I recently added a deduplication safeguard to a queue-driven system. The problem was simple to describe but harder to solve well; most of the obvious answers came with bad trade-offs.
The setup
The system is event-driven. Workers pull messages off an SQS FIFO queue, where each message is grouped by an entity ID. Processing a message may create an “incident” for the entity. The processing flow looks like this:
1. The worker reads existing open incidents for the entity from Postgres.
2. It runs a slow evaluation. This usually means catalog lookups, calls to remote services, sometimes more than one hop.
3. If the evaluation says the entity is in violation of some policy, the worker inserts a new incident row.
The read in step 1 and the insert in step 3 are in two separate database transactions, with potentially several seconds of network I/O in between. That gap opens the door to check-then-act race conditions that can be hard to debug.
In the common case, FIFO message grouping prevents two workers from ever processing the same entity at the same time. The queue guarantees per-group ordering, so worker A finishes entity X before worker B starts on entity X. That guarantee is the primary defense and it works really well.
But that guarantee has edge cases. A visibility timeout can expire while a worker is still processing, and a worker that is slow enough will have its message redelivered to a different worker. These edge cases are not very common, but they can happen, and I wanted absolute correctness for one specific invariant: for certain policies, there must never be two open incidents for the same entity.
In concrete terms, consider this sequence of events:
[T00] -> W1: receives message for entity X.
[T01] -> W1: BEGIN read-only txn; SELECT open incidents for X = []; COMMIT.
[T02] -> W1: starts evaluation. Network calls in flight.
[T03] -> (visibility timeout on the message expires; SQS marks it for redelivery)
[T04] -> W2: receives the same message, redelivered.
[T05] -> W2: BEGIN read-only txn; SELECT open incidents for X = []; COMMIT.
[T06] -> W2: starts evaluation.
[T07] -> W1: evaluation completes. Decides "violation".
[T08] -> W1: BEGIN write txn; INSERT new incident; COMMIT.
[T09] -> W2: evaluation completes. Decides "violation".
[T10] -> W2: BEGIN write txn; INSERT new incident; COMMIT.
Two open incidents on entity X. Both workers were individually correct given what they could see, but the system invariant is now broken. The check (“is there an existing incident?”) is in a different transaction from the write (“create a new incident”). By the time W2 writes, its check is stale - W1’s incident is already there, but W2’s read happened before W1 wrote.
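The timeline above can be replayed deterministically in a few lines. This is only an in-memory sketch: `IncidentTable` and `RaceReplay` are hypothetical names, and a plain list stands in for the Postgres table, with each method call modeling one short transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for the incidents table; each call models one short transaction. */
final class IncidentTable {
    private final List<String> openIncidents = new ArrayList<>();

    /** Read transaction: returns a copy of the open incidents for entity X. */
    synchronized List<String> readOpen() {
        return List.copyOf(openIncidents);
    }

    /** Write transaction: unconditional insert, as in the original flow. */
    synchronized void insert(String incidentId) {
        openIncidents.add(incidentId);
    }
}

final class RaceReplay {
    /** Replays T00..T10 in order on one thread and returns the final open incidents. */
    static List<String> replay() {
        IncidentTable table = new IncidentTable();

        List<String> seenByW1 = table.readOpen(); // T01: W1 sees no open incidents
        List<String> seenByW2 = table.readOpen(); // T05: redelivery, W2 sees the same empty list

        // T07/T08: W1 finishes evaluating and writes.
        if (seenByW1.isEmpty()) {
            table.insert("incident-from-W1");
        }
        // T09/T10: W2 finishes evaluating and writes, relying on a read that is now stale.
        if (seenByW2.isEmpty()) {
            table.insert("incident-from-W2");
        }
        return table.readOpen();
    }
}
```

`RaceReplay.replay()` returns both incidents, even though neither worker did anything wrong in isolation.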
First instinct: a unique constraint
If you have a clean uniqueness rule, this is the cheapest answer in the world. A partial unique index does the job:
CREATE UNIQUE INDEX one_open_incident_per_entity_policy
ON incidents (policy_id, entity_id)
WHERE lifecycle_state = 'OPEN';
The second insert would fail with a constraint violation. The application catches it and treats it as “another worker won the race.” Done.
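On the JVM, "catching it" could look roughly like the sketch below. The class, interface, and method names are hypothetical; the one fixed fact is that Postgres reports `unique_violation` with SQLSTATE `23505`.

```java
import java.sql.SQLException;

final class UniqueViolations {
    /** SQLSTATE that Postgres reports for unique_violation. */
    static final String UNIQUE_VIOLATION = "23505";

    /** Hypothetical hook: runs the INSERT in its own short transaction. */
    @FunctionalInterface
    interface Insert {
        void run() throws SQLException;
    }

    /** True if the failure, or any exception chained to it, is a unique-constraint violation. */
    static boolean isUniqueViolation(SQLException e) {
        for (SQLException cur = e; cur != null; cur = cur.getNextException()) {
            if (UNIQUE_VIOLATION.equals(cur.getSQLState())) {
                return true;
            }
        }
        return false;
    }

    /** Returns true if this worker created the incident, false if another worker won the race. */
    static boolean insertUnlessDuplicate(Insert insert) throws SQLException {
        try {
            insert.run();
            return true;
        } catch (SQLException e) {
            if (isUniqueViolation(e)) {
                return false; // another worker already holds the open incident
            }
            throw e; // anything else is a real failure
        }
    }
}
```

One caveat with this approach: in Postgres a failed statement aborts the surrounding transaction, so the insert should either be its own transaction or sit behind a savepoint.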
This is what I wanted to use, and it is what I would have used in a simpler system. But the “only one open incident per entity” rule is not universal here. Some policies legitimately allow multiple open incidents on the same entity. The singleton-ness varies per policy and, for some policies, even per entity depending on runtime state.
You can sometimes encode that into a richer partial index. In this case the rule was dynamic enough that there was no clean static encoding.
Second instinct: stronger isolation
If a unique constraint isn’t an option, the next natural fix is to put the read and the write into a single transaction with stronger isolation. In Postgres, the first candidate is REPEATABLE READ, but it does not help here because Postgres implements it as snapshot isolation. Each transaction sees a consistent snapshot of the database, taken when its first query runs. That prevents non-repeatable reads. It also prevents phantom reads in the strict sense: re-running the same query in the same transaction returns the same results.
What it does not prevent is write skew: two transactions reading from a set of rows, then each writing new rows that would have changed each other’s read if they had been visible.
In our case there is no row-level conflict. W1 and W2 are not modifying the same row. They are both inserting brand-new rows. Snapshot isolation has no objection.
SERIALIZABLE does catch this, but in Postgres it works by aborting one of the conflicting transactions with a serialization failure, and the application is expected to retry. Adopting it for one code path means every other transaction touching the same tables also has to deal with abort-and-retry, or you get unexpected aborts somewhere unrelated. It is the right answer in some systems but it is a heavy commitment that I was not eager to make right now.
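To make the "abort-and-retry" commitment concrete, here is a minimal sketch of the wrapper every SERIALIZABLE call path would need. `SerializableRetry` and `TxBody` are hypothetical names; Postgres signals a serialization failure with SQLSTATE `40001`. The sketch assumes the body opens a fresh transaction on each attempt and rolls back before rethrowing.

```java
import java.sql.SQLException;

final class SerializableRetry {
    /** SQLSTATE that Postgres reports for serialization_failure. */
    static final String SERIALIZATION_FAILURE = "40001";

    /** Hypothetical unit of work: one complete transaction, begin to commit. */
    @FunctionalInterface
    interface TxBody<T> {
        T run() throws SQLException;
    }

    /** Re-runs the whole transaction body while it fails with a serialization failure. */
    static <T> T withRetry(int maxAttempts, TxBody<T> body) throws SQLException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return body.run();
            } catch (SQLException e) {
                boolean retryable = SERIALIZATION_FAILURE.equals(e.getSQLState());
                if (!retryable || attempt >= maxAttempts) {
                    throw e; // not a serialization failure, or out of attempts
                }
                // Otherwise loop around and run the transaction again from the start.
            }
        }
    }
}
```

The wrapper itself is small; the cost is that every transaction touching the same tables has to go through something like it.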
Third instinct: hold a lock across the evaluation
The next thought was to take a per-entity lock at the start of message processing and release it at the end. While this worker is processing entity X, no other worker can.
Postgres supports session-level advisory locks, which are exactly this:
SELECT pg_advisory_lock(hashtextextended('entity:' || $1, 0));
-- ... do all your work, including the slow evaluation, network calls etc ...
SELECT pg_advisory_unlock(hashtextextended('entity:' || $1, 0));
These locks are essentially logical locks that the application can just acquire without needing an actual row to exist in the database. While this is correct, it is also operationally risky in a way that only shows up under higher load.
Today, the system holds a connection only during short transactions, on the order of tens of milliseconds. With a connection pool of around 30 connections, the hundred or so worker threads polling from SQS share them comfortably because no individual worker monopolizes a connection for too long.
If I switched to session-level locks held across the whole evaluation, the situation changes. If processing takes three to five seconds end to end, a worker now occupies a connection for that entire window. The number of concurrent in-flight messages is hard-capped by the pool size.
That trade-off is fine while traffic and evaluation latencies stay steady and the pool is sized per worker instance to match. When latencies spike, in-flight messages pile up behind the pool and I run the risk of exhausting it.
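A back-of-the-envelope calculation shows how sharp the difference is. The sketch below assumes a single pool of 30 connections and treats "tens of milliseconds" as 20 ms purely for illustration; `PoolCapacity` is a hypothetical helper, and the formula is just pool size divided by how long each message holds a connection.

```java
final class PoolCapacity {
    /**
     * Upper bound on messages completed per second when every message
     * holds exactly one pooled connection for holdMillis.
     */
    static double maxMessagesPerSecond(int poolSize, long holdMillis) {
        if (poolSize < 1 || holdMillis < 1) {
            throw new IllegalArgumentException("poolSize and holdMillis must be positive");
        }
        return poolSize * 1000.0 / holdMillis;
    }
}
```

With short transactions, `maxMessagesPerSecond(30, 20)` gives a ceiling of 1500 messages per second. Holding the connection for the whole evaluation drops that to 10 per second at three seconds and 6 per second at five seconds, and any latency spike lowers it further.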
What actually fit: a short lock at the moment of the write
In hindsight this is obvious, but it took me a while to see: the slow part of the evaluation does not need to be inside the lock. The only thing that needs to be inside the lock is the moment where we look at the existing state and decide whether our pending decision is still correct.
Postgres has a sibling to the session-level advisory lock: pg_advisory_xact_lock, scoped to the current transaction and released automatically at COMMIT or ROLLBACK.
So the structure became:
1. Run the evaluation outside any transaction. No DB connection held during network calls. Same as today.
2. If the evaluation produces a "create incident" decision, open a short write transaction. Inside that transaction:
   - Take a per-entity advisory lock - but the *transaction-scoped* variant, `pg_advisory_xact_lock`.
   - Re-read the current set of open incident IDs for `(policy_id, entity_id)`.
   - Compare it to what the evaluator saw earlier.
   - If they match, insert the new incident.
   - If they differ, throw a `ConcurrentIncidentModificationException`. The caller logs it, increments a metric counter, and yields nothing. The next message for this entity will re-evaluate with the now-updated state.
3. COMMIT releases the lock.
The lock window is small: one indexed read and one insert. A few milliseconds at most in steady state. The connection is checked out only during the short write transaction, exactly like before.
Postgres deliberately gives you no pg_advisory_xact_unlock function - these locks can only be released by COMMIT or ROLLBACK. Lock leaks are impossible by construction. A worker that crashes mid-write rolls back and releases the lock. A worker that throws unexpectedly rolls back and releases the lock. There is no path where the lock survives the transaction.
The high-level shape of the new persistence method is:
// called inside a transaction
createIncidentIfUnchanged(incident, seenOpenIncidents):
    acquire_xact_lock("entity:" + entityId)
    freshIds = SELECT incident_id FROM incidents
               WHERE policy_id = ?
                 AND entity_id = ?
                 AND lifecycle_state = 'OPEN'
    seenIds = ids of seenOpenIncidents filtered to this policy
    if seenIds != freshIds:
        increment counter "dedup-check-failed"
        throw ConcurrentIncidentModificationException
    saveIncident(incident)
seenOpenIncidents is the list the evaluator saw at the start of step 1. The comparison is intentionally simple: just the set of IDs. If anything in that set changed (a new incident appeared, or an existing one transitioned out of OPEN), we just abort. That is conservative: it throws even when the change may not have invalidated the decision. I am fine with that. The cost of an unnecessary abort is one re-evaluation on the next message, which is cheap.
What this actually buys
This pattern serializes only the narrow critical section that matters for the invariant. The slow evaluation is outside the lock, so two workers can still evaluate the same entity concurrently - they just cannot both write. Whoever writes first wins; the loser sees a changed read set under the lock and aborts.
Operationally, the connection-pool footprint is unchanged. The worker fleet remains elastic. Postgres-level concurrency is unaffected for any other call path.
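The "first writer wins, second aborts" behavior can be replayed in memory for the same T00..T10 timeline. This is a sketch, not the production code: `GuardedIncidentStore` and `GuardedReplay` are hypothetical, the store models a single (policy, entity) pair, and a `ReentrantLock` stands in for `pg_advisory_xact_lock`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Thrown when the open-incident set changed between evaluation and write. */
final class ConcurrentIncidentModificationException extends RuntimeException {
    ConcurrentIncidentModificationException(String message) {
        super(message);
    }
}

/** In-memory model of the open incidents for one (policy, entity) pair. */
final class GuardedIncidentStore {
    private final Set<String> openIncidentIds = ConcurrentHashMap.newKeySet();
    // Stand-in for the advisory lock: held only while the write "transaction" runs.
    private final ReentrantLock entityLock = new ReentrantLock();

    /** Unlocked read, like the evaluator's initial SELECT. */
    Set<String> readOpenIds() {
        return Set.copyOf(openIncidentIds);
    }

    /** Inserts only if the open-incident set still matches what the evaluator saw. */
    void createIfUnchanged(String newIncidentId, Set<String> seenOpenIds) {
        entityLock.lock(); // models pg_advisory_xact_lock at the start of the write transaction
        try {
            Set<String> freshIds = Set.copyOf(openIncidentIds); // re-read under the lock
            if (!freshIds.equals(seenOpenIds)) {
                throw new ConcurrentIncidentModificationException(
                        "open incidents changed: saw " + seenOpenIds + ", now " + freshIds);
            }
            openIncidentIds.add(newIncidentId);
        } finally {
            entityLock.unlock(); // models COMMIT or ROLLBACK releasing the lock
        }
    }
}

final class GuardedReplay {
    /** Same interleaving as the original race, but with the guarded write. */
    static Set<String> replay() {
        GuardedIncidentStore store = new GuardedIncidentStore();
        Set<String> seenByW1 = store.readOpenIds(); // T01
        Set<String> seenByW2 = store.readOpenIds(); // T05

        store.createIfUnchanged("incident-from-W1", seenByW1); // T08: W1 wins
        try {
            store.createIfUnchanged("incident-from-W2", seenByW2); // T10: stale read detected
        } catch (ConcurrentIncidentModificationException e) {
            // The real caller would log, bump the counter, and yield nothing.
        }
        return store.readOpenIds();
    }
}
```

`GuardedReplay.replay()` ends with only W1's incident in the set; W2's write is rejected because its earlier read no longer matches.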
Failure modes
There are exactly two failure modes worth thinking about.
Acquired outside a transaction. If pg_advisory_xact_lock is called outside an open transaction, Postgres acquires it within the implicit single-statement transaction and immediately releases it when that statement returns. The lock effectively no-ops. The dedup check still runs, but it is unprotected.
Concurrent collision. Two workers race. The second one’s read sees the first one’s just-inserted incident, the comparison fails, and the second one throws. The exception is caught at the bootstrap level, logged at WARN, and the metrics are tagged with concurrent-modification=true.
The dedup-failure counter is an important operational signal. A non-zero rate after rollout means the safeguard is catching real concurrent writes. A very high rate would point at something wrong upstream, like FIFO grouping not working as intended.
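The first failure mode, taking the lock outside a transaction, is silent, so a cheap assertion in front of the lock call can help. The sketch below assumes plain JDBC, where a connection in auto-commit mode runs each statement as its own transaction. `TransactionGuard` is a hypothetical helper, and the check would give a false alarm in code that issues explicit BEGIN statements while auto-commit is on.

```java
import java.sql.Connection;
import java.sql.SQLException;

final class TransactionGuard {
    /**
     * Fails fast if the connection is in auto-commit mode, where a
     * transaction-scoped advisory lock would be released as soon as
     * the locking statement returns.
     */
    static void requireOpenTransaction(Connection conn) {
        boolean autoCommit;
        try {
            autoCommit = conn.getAutoCommit();
        } catch (SQLException e) {
            throw new IllegalStateException("could not inspect connection state", e);
        }
        if (autoCommit) {
            throw new IllegalStateException(
                    "pg_advisory_xact_lock requested in auto-commit mode; the lock would not protect the dedup check");
        }
    }
}
```

Calling `TransactionGuard.requireOpenTransaction(conn)` just before acquiring the lock turns an unprotected dedup check into a loud failure.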
Rollout
I gated the entire new path behind a runtime property defaulting to 0%. The old code path is untouched. The plan was to increase the rollout percentage while watching the latency metrics for the processing code and, if anything went wrong, flip it back to 0 without redeploying. I am a little paranoid (one of my old colleagues used to call it healthy paranoia; hi AM!) about this change so I felt I needed to make it conditional on a property that I can turn off easily.
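One way such a percentage gate could be wired is sketched below. It assumes the percentage is applied per entity through a stable hash bucket, so a given entity stays on one path at a given percentage; the post does not say how the property is actually evaluated, and `RolloutGate` is a hypothetical name.

```java
final class RolloutGate {
    /** Maps an entity ID to a stable bucket in the range 0..99. */
    static int bucketOf(String entityId) {
        return Math.floorMod(entityId.hashCode(), 100);
    }

    /**
     * True if this entity should take the new guarded write path.
     * rolloutPercent = 0 disables the new path for everyone; 100 enables it for everyone.
     */
    static boolean useNewPath(String entityId, int rolloutPercent) {
        return bucketOf(entityId) < rolloutPercent;
    }
}
```

Because the bucket is derived from the entity ID, raising the percentage only ever adds entities to the new path, and setting it back to 0 sends everything through the old one.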
This was the main thing I watched during rollout:

Processing latencies as well as create-incident latencies remained bounded.
Takeaway
The fix came down to a principle I use when writing concurrent code on the JVM: minimize the critical section. You wouldn’t wrap a five-second network call in a synchronized block - you’d do the work, then use a lock only to safely update the shared state. The same logic applied here.
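The JVM analogy looks like this in miniature. `LatestValueCache` and its `slowFetch` function are hypothetical; the point is only where the `synchronized` block starts and ends.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class LatestValueCache {
    private final Map<String, Long> latest = new HashMap<>();
    private final Function<String, Long> slowFetch; // stands in for the slow network call

    LatestValueCache(Function<String, Long> slowFetch) {
        this.slowFetch = slowFetch;
    }

    /** Does the slow work without holding any lock, then locks only to publish the result. */
    void refresh(String key) {
        Long value = slowFetch.apply(key); // slow part: other threads are not blocked
        synchronized (latest) {            // critical section: a single map update
            latest.put(key, value);
        }
    }

    /** Reads take the same lock, but only for the lookup. */
    Long get(String key) {
        synchronized (latest) {
            return latest.get(key);
        }
    }
}
```

The advisory-lock version has the same shape: the evaluation plays the role of `slowFetch`, and the short write transaction plays the role of the `synchronized` block.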
If you can’t use a unique constraint, find the smallest window where the invariant is actually at risk and lock only that.