Implementing a Mutex for ActiveJob
At Chatwoot, we rely heavily on background jobs to process incoming messages and events from various platforms like Facebook. A while back, we noticed a strange issue: two messages intended for the same conversation thread would end up in two different threads altogether. The problem was that sometimes webhooks from Facebook would arrive too close together.
These webhook events were picked up by the backend and queued for processing with Sidekiq. Our background workers would pick up these jobs simultaneously, and each thread would check for an existing conversation. Since neither found one (because neither had created it yet), both workers ended up creating separate conversations for the same user message. This race condition resulted in duplicate conversation threads for a single interaction.
Here’s a diagram illustrating the problem:
+----------+                       +----------+
| Facebook |=== Webhook 1 ========>|          |     +----------+     +-------------------+
+----------+  (Msg A, Time T)      | Chatwoot |====>| Worker 1 |====>| Find/Create Conv? |
                                   |  Server  |     +----------+     +-------------------+
                                   |          |                          (Finds None)
                                   |          |                                |
                                   |          |                                V
                                   |          |                      +-------------------+
                                   |          |                      |  Create Conv #1   |
                                   |          |                      +-------------------+
+----------+                       |          |
| Facebook |=== Webhook 2 ========>|          |     +----------+     +-------------------+
+----------+  (Msg B, Time T+ε)    |          |====>| Worker 2 |====>| Find/Create Conv? |
                                   +----------+     +----------+     +-------------------+
                                                                          (Finds None)
                                                                                |
                                                                                V
                                                                      +-------------------+
                                                                      |  Create Conv #2   | <--- Duplicate!
                                                                      +-------------------+
This is a classic race condition, and there is a boring solution to it. We need a way to ensure that events belonging to the same conversation are not processed concurrently; a shared mutex is a great way to do this.
Shared lock with Redis SETNX
Redis is a great place to hold a shared key. All processes have access to it, and it’s fast.
The SETNX command (SET if Not eXists) lets us set a key only if it doesn't already exist; crucially, this check-and-set is atomic. Redis guarantees that no other client can sneak in between the check and the set, so only one process can acquire the lock at a time.
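As a quick sketch of this behaviour with the redis gem (the key name and client setup here are purely illustrative, not our actual code):

require "redis"

redis = Redis.new

# First caller sets the key and wins the lock; EX gives it a 1-second expiry.
redis.set("lock:conversation:123", Time.now.to_f.to_s, nx: true, ex: 1) # => true

# While the key exists, every other caller gets false back.
redis.set("lock:conversation:123", Time.now.to_f.to_s, nx: true, ex: 1) # => false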
This atomicity is exactly what we need for a distributed mutex. With SETNX, we can safely attempt to acquire a lock in a single step. All that remained was to wrap this logic in a simple, ergonomic interface. That's where our Redis::LockManager comes in: a small class dedicated to locking, unlocking, and checking the status of locks.
class Redis::LockManager
  LOCK_TIMEOUT = 1.second

  # Set the key only if it doesn't already exist (NX) and give it an expiry (EX),
  # so a crashed worker can never hold the lock forever.
  def lock(key, timeout = LOCK_TIMEOUT)
    value = Time.now.to_f.to_s
    Redis::Alfred.set(key, value, nx: true, ex: timeout) ? true : false
  end

  def unlock(key)
    Redis::Alfred.delete(key)
    true
  end

  def locked?(key)
    Redis::Alfred.exists?(key)
  end
end
We have a default timeout of 1 second; this is enough to nudge the jobs far enough apart to avoid any race conditions. We can override the timeout by passing a custom value to the lock method.
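For example, a hypothetical caller protecting a slower critical section could hold the lock for longer (the key below is made up for illustration):

lock_manager = Redis::LockManager.new

# Hypothetical: hold the lock for 5 seconds instead of the default 1 second.
if lock_manager.lock("SLOW_IMPORT_MUTEX::42", 5.seconds)
  # ... do the slower work ...
  lock_manager.unlock("SLOW_IMPORT_MUTEX::42")
end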
The usage was simple as well:
class Webhooks::FacebookEventsJob < ApplicationJob
  class LockAcquisitionError < StandardError; end

  retry_on LockAcquisitionError, wait: 2.seconds, attempts: 5

  def perform(message)
    lock_manager = Redis::LockManager.new
    lock_key = get_key

    raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" if lock_manager.locked?(lock_key)

    begin
      lock_manager.lock(lock_key)
      # process the webhook
    ensure
      # Ensure that the lock is released even if there's an error in processing
      lock_manager.unlock(lock_key)
    end
  end
end
ActiveJob provided all the syntactic sugar we needed to handle lock acquisition failure. Whenever a LockAcquisitionError is raised, ActiveJob retries the job for the number of attempts specified in the retry_on method. This is exactly what we implemented here: chatwoot/chatwoot#7701. You can read more about the ActiveJob retry mechanism here.
We merged this PR and called it a day.
More race conditions
A few days later, we hit a similar snag—this time with our Slack integration, which started sending duplicate messages. It was immediately clear: another race condition, now triggered when processing integration hooks after an agent sent a message.
While the problem was identical to the Facebook issue, our solution needed refinement. The previous implementation—with mutex code awkwardly bolted onto a single job class—felt inelegant. Looking at the core mutex logic, the pattern was clear:
- Attempt to acquire a lock
- If acquisition fails, raise LockAcquisitionError and retry after a timeout
- If successful, execute the protected code block
- Release the lock when finished
Since the lock would expire automatically or be released after the job, it was safe from deadlocks. This pattern was crying out to be abstracted. With a bit of object-oriented design, we could extract this logic into a reusable base class that any job requiring mutex protection could inherit from.
Final implementation
After some iterations, we created MutexApplicationJob, which serves as a base class for jobs that require distributed locking. It abstracts the locking logic using Redis and ensures that a block of code is executed with mutual exclusion.
At its heart is the with_lock method, which takes a lock key and wraps any code block in a mutex. The method handles the entire lifecycle: it attempts to acquire the lock, executes the protected code if successful, and ensures proper cleanup afterward.
When a lock can't be acquired, it raises a LockAcquisitionError, triggering ActiveJob's retry mechanism to try again later, which provides a simple yet effective way to handle contention.
class MutexApplicationJob < ApplicationJob
  class LockAcquisitionError < StandardError; end

  def with_lock(lock_key, timeout = Redis::LockManager::LOCK_TIMEOUT)
    lock_manager = Redis::LockManager.new

    begin
      if lock_manager.lock(lock_key, timeout)
        log_attempt(lock_key, executions)
        yield
        # release the lock after the block has been executed
        lock_manager.unlock(lock_key)
      else
        handle_failed_lock_acquisition(lock_key)
      end
    rescue StandardError => e
      handle_error(e, lock_manager, lock_key)
    end
  end

  private

  def log_attempt(lock_key, executions)
    Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}"
  end

  def handle_error(err, lock_manager, lock_key)
    lock_manager.unlock(lock_key) unless err.is_a?(LockAcquisitionError)
    raise err
  end

  def handle_failed_lock_acquisition(lock_key)
    Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}"
    raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}"
  end
end
The handle_error method handles any StandardError exceptions that may occur while the block executes within the mutex. It ensures that the lock is released before re-raising the error. We log the execution attempt too; ActiveJob exposes the executions attribute, which is the number of times this job has been executed and increments on every retry (for example, after an exception).
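Since executions is also what retry_on passes to a wait lambda in recent Rails versions, a hypothetical subclass could even back off a little longer on every failed attempt. This is only a sketch under that assumption, not what we shipped:

class SomeMutexProtectedJob < MutexApplicationJob
  # Hypothetical: wait progressively longer after each failed lock attempt.
  retry_on LockAcquisitionError, wait: ->(executions) { (executions * 2).seconds }, attempts: 5

  def perform(record_id)
    with_lock("SOME_MUTEX::#{record_id}") do
      # protected work goes here
    end
  end
end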
The handle_failed_lock_acquisition method logs a warning when lock acquisition fails and raises a LockAcquisitionError. This approach resulted in much cleaner implementation code. Here is how our refactored Facebook job looked:
class Webhooks::FacebookEventsJob < MutexApplicationJob
  queue_as :default

  retry_on LockAcquisitionError, wait: 1.second, attempts: 8

  def perform(message)
    response = ::Integrations::Facebook::MessageParser.new(message)
    key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id)

    with_lock(key) do
      ::Integrations::Facebook::MessageCreator.new(response).perform
    end
  end
end
We implemented this in chatwoot/chatwoot#7783. Over the last two years, this class has been used across multiple mission-critical jobs. All we needed for these jobs was a key that is granular enough, and it works like a charm (unless it doesn’t).
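As a rough sketch of what another such job can look like (the job class and key format below are hypothetical, not actual Chatwoot code):

class HypotheticalSlackEventsJob < MutexApplicationJob
  queue_as :default

  retry_on LockAcquisitionError, wait: 1.second, attempts: 8

  def perform(event)
    # Scope the key to the conversation so unrelated events never contend for the lock.
    key = format('SLACK_MESSAGE_MUTEX::%<conversation_id>s', conversation_id: event[:conversation_id])

    with_lock(key) do
      # process the Slack event
    end
  end
end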
Some Limitations
Although it works, the solution is far from perfect and has some inherent flaws, all of which come down to fairness:
- The mutex mechanism doesn’t preserve execution sequence, potentially causing messages to arrive out of order. Fortunately, this happens rarely enough to be tolerable in our use case.
- With a maximum retry limit, during peak concurrency some jobs might never acquire the lock. These edge cases are rare but do occur.
In the context of concurrency and mutexes, fairness refers to the property that ensures threads waiting to acquire a mutex lock will eventually get it, typically in the order they requested it (First-In, First-Out or FIFO).
We have fine-tuned the timeout and retry configuration several times to improve reliability. But a new version of this class is due soon to address these limitations. If you have any suggestions or feedback, feel free to start a discussion on GitHub.