Published on May 11, 2025

Implementing a Mutex for ActiveJob

At Chatwoot, we rely heavily on background jobs to process incoming messages and events from various platforms like Facebook. A while back we noticed a strange issue, two messages intended for the same conversation thread would lead into two different threads al-together. The problem was that sometimes webhooks from Facebook would arrive too close together.

These webhook events were picked by the backend and queued for processing with Sidekiq. Our background workers would pick up these jobs simultaneously, each of these threads would check for an existing conversation. Since neither found one (because neither had created it yet), both workers ended up creating separate conversations for the same user message. This race condition resulted in duplicate conversation threads for a single interaction.

Here’s a diagram illustrating the problem:

+----------+                     +----------+
| Facebook | === Webhook 1 ====> |          |
+----------+  (Msg A, Time T)    |          |    +-----------+     +-------------------+
                                 | Chatwoot |===>| Worker 1  |===> | Find/Create Conv? |
                                 | Server   |    +-----------+     +-------------------+
                                 |          |                      (Finds None) |
                                 |          |                                   V
                                 |          |                      +-------------------+
                                 |          |                      |  Create Conv #1   |
                                 |          |                      +-------------------+
+----------+                     |          |
| Facebook | === Webhook 2 ====> |          |    +-----------+     +-------------------+
+----------+  (Msg B, Time T+ε)  |          |===>| Worker 2  |===> | Find/Create Conv? |
                                 +----------+    +-----------+     +-------------------+
                                                                   (Finds None) |
                                                                                V
                                                                   +-------------------+
                                                                   |  Create Conv #2   | <--- Duplicate!
                                                                   +-------------------+

This is a classic race condition, to which there’s a boring solution. We need a way to ensure these events that belong to single conversations are not processed together; a shared mutex is a great way to do this.

Shared lock with Redis SETNX

Redis is a great place to hold a shared key. All processes have access to it, and it’s fast.

The SETNX command (SET if Not eXists) lets us set a key only if it doesn’t already exist — crucially, this check-and-set is atomic. Redis guarantees that no other client can sneak in between the check and the set, so only one process can acquire the lock at a time.

This atomicity is exactly what we need for a distributed mutex. With SETNX, we can safely attempt to acquire a lock in a single step. All that remained was to wrap this logic in a simple, ergonomic interface. That’s where our Redis::LockManager comes in, it’s a small class dedicated to locking, unlocking, and checking the status of locks.

class Redis::LockManager
  LOCK_TIMEOUT = 1.second

  def lock(key, timeout = LOCK_TIMEOUT)
    value = Time.now.to_f.to_s
    Redis::Alfred.set(key, value, nx: true, ex: timeout) ? true : false
  end

  def unlock(key)
    Redis::Alfred.delete(key)
    true
  end

  def locked?(key)
    Redis::Alfred.exists?(key)
  end
end

We have a default timeout of 1 second; this is enough for us to nudge the jobs far enough to avoid any race conditions. We can override the timeout by passing a custom timeout value to the lock method.

The usage was simple as well:

class Webhooks::FacebookEventsJob < ApplicationJob
  class LockAcquisitionError < StandardError; end
  retry_on LockAcquisitionError, wait: 2.seconds, attempts: 5

  def perform(message)
    lock_manager = Redis::LockManager.new
    lock_key = get_key

    raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}" if lock_manager.locked?(lock_key)

    begin
      lock_manager.lock(lock_key)
      # process the webhook
    ensure
      # Ensure that the lock is released even if there's an error in processing
      lock_manager.unlock(lock_key)
    end
  end
end

ActiveJob provided all the syntax sugar we needed to handle the lock acquisition failure. Whenever a LockAcquisitionError is raised, ActiveJob retries the job for the number of attempts specified in the retry_on method. This is exactly what we implemented here: chatwoot/chatwoot#7701. You can read more about the ActiveJob retry mechanism here.

We merged this PR and called it a day.

More race conditions

A few days later, we hit a similar snag—this time with our Slack integration, which started sending duplicate messages. It was immediately clear: another race condition, now triggered when processing integration hooks after an agent sent a message.

While the problem was identical to the Facebook issue, our solution needed refinement. The previous implementation—with mutex code awkwardly bolted onto a single job class—felt inelegant. Looking at the core mutex logic, the pattern was clear:

  1. Attempt to acquire a lock
  2. If acquisition fails, raise LockAcquisitionError and retry after a timeout
  3. If successful, execute the protected code block
  4. Release the lock when finished

Since the lock would expire automatically or be released after the job, it was safe from deadlocks. This pattern was crying out to be abstracted. With a bit of object-oriented design, we could extract this logic into a reusable base class that any job requiring mutex protection could inherit from.

Final implementation

After some iterations, we created MutexApplicationJob, which serves as a base class for jobs that require distributed locking mechanisms. It abstracts the locking logic using Redis and ensures that a block of code can be executed with mutual exclusion.

At its heart is the with_lock method, which takes a lock key and wraps any code block in mutex. The method handles the entire lifecycle: it attempts to acquire the lock, executes the protected code if successful, and ensures proper cleanup afterward.

When a lock can’t be acquired, it raises a LockAcquisitionError, triggering ActiveJob’s retry mechanism to try again later providing a simple yet effective way to handle contention.

class MutexApplicationJob < ApplicationJob
  class LockAcquisitionError < StandardError; end

  def with_lock(lock_key, timeout = Redis::LockManager::LOCK_TIMEOUT)
    lock_manager = Redis::LockManager.new

    begin
      if lock_manager.lock(lock_key, timeout)
        log_attempt(lock_key, executions)
        yield
        # release the lock after the block has been executed
        lock_manager.unlock(lock_key)
      else
        handle_failed_lock_acquisition(lock_key)
      end
    rescue StandardError => e
      handle_error(e, lock_manager, lock_key)
    end
  end

  private

  def log_attempt(lock_key, executions)
    Rails.logger.info "[#{self.class.name}] Acquired lock for: #{lock_key} on attempt #{executions}"
  end

  def handle_error(err, lock_manager, lock_key)
    lock_manager.unlock(lock_key) unless err.is_a?(LockAcquisitionError)
    raise err
  end

  def handle_failed_lock_acquisition(lock_key)
    Rails.logger.warn "[#{self.class.name}] Failed to acquire lock on attempt #{executions}: #{lock_key}"
    raise LockAcquisitionError, "Failed to acquire lock for key: #{lock_key}"
  end
end

The handle_error method handles any StandardError exceptions that may occur during the execution of the block within the mutex. It ensures that the lock is released before re-raising the error. We log the execution attempt too; ActiveJob exposes the executions attribute, which is the number of times this job has been executed (which increments on every retry, like after an exception).

The handle_failed_lock_acquisition method logs a warning when lock acquisition fails and raises a LockAcquisitionError. This approach resulted in much cleaner implementation code. Here is how our refactored Facebook job looked:

class Webhooks::FacebookEventsJob < MutexApplicationJob
  queue_as :default
  retry_on LockAcquisitionError, wait: 1.second, attempts: 8

  def perform(message)
    response = ::Integrations::Facebook::MessageParser.new(message)

    key = format(::Redis::Alfred::FACEBOOK_MESSAGE_MUTEX, sender_id: response.sender_id, recipient_id: response.recipient_id)
    with_lock(key) do
      ::Integrations::Facebook::MessageCreator.new(response).perform
    end
  end
end

We implemented this in chatwoot/chatwoot#7783. Over the last two years, this class has been used across multiple mission-critical jobs. All we needed for these jobs was a key that is granular enough, and it works like a charm (unless it doesn’t).

Some Limitations

Although it works, the solution is far from perfect, and has a lot of inherent flaws. All the limitations of this come down to fairness:

  • The mutex mechanism doesn’t preserve execution sequence, potentially causing messages to arrive out of order. Fortunately, this happens rarely enough to be tolerable in our use case.
  • With a maximum retry limit, during peak concurrency some jobs might never acquire the lock. These edge cases are rare but do occur.

In the context of concurrency and mutexes, fairness refers to the property that ensures threads waiting to acquire a mutex lock will eventually get it, typically in the order they requested it (First-In, First-Out or FIFO).

We have fine-tuned the timeout and retry configuration several times to improve reliability. But a next version of this class is due soon, solving all these limitations. If you have any suggestions or feedback, feel free to start a discussion on GitHub.

📫  Get updates!

I'll send an email only when I publish something on my blog, which is usually once month at best, and you can unsubscribe instantly at any time.

No spam, unsubscribe at any time. Scout's honor.

Built using Picoletter.