
I am currently running a chaos test on a system (essentially starting nodes
that process something and randomly knocking them out). It appeared to work
fine with regular tests, but I am seeing occasional duplicate key value
violations of a uniqueness constraint on one of the more complex CTE-based
queries. It only happens under concurrency, when nodes restart and there is
ample load.
I cannot reproduce it by taking out the query and running it manually in
pgAdmin; there it behaves fine and does exactly what I expect.

The query looks like this (it uses Rust SQLX which is why there is some
unnesting happening on the parameters).

WITH unnested_inputs AS (
    SELECT * FROM (
        SELECT
            unnest($1::uuid[]) AS event_id,
            unnest($2::varchar[]) AS type,
            unnest($3::int[]) AS version,
            unnest($4::uuid[]) AS causation_id,
            unnest($5::uuid[]) AS correlation_id,
            unnest($6::text[]) AS idempotency_key,
            unnest($7::jsonb[]) AS data,
            unnest($8::jsonb[]) AS metadata,
            unnest($9::text[]) AS subscription_id,
            unnest($10::text[]) AS subscription_instance_identifier,
            unnest($11::bigint[]) AS applied_order_id
    ) AS inputs
),
to_update_subscription_logs AS (
    SELECT as subscription_log_id, sl.node_id, sl.status, ui.*
    FROM subscription_log sl
    JOIN unnested_inputs ui
    ON sl.event_id = ui.causation_id
    AND sl.node_id = $12
    AND sl.status = 'assigned'
    AND sl.subscription_id = ui.subscription_id
    AND sl.subscription_instance_identifier = ui.subscription_instance_identifier
    FOR UPDATE NOWAIT -- if something is updating it, we probably shouldn't touch it anymore.
),
updated_logs AS (
    UPDATE subscription_log sl
    SET status = 'processed',
    updated_at = CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
    WHERE = usl.subscription_log_id
        AND usl.node_id = $12
),
inserted_event_log AS (
    INSERT INTO event_log (
        event_id, type, version, causation_id, correlation_id,
        idempotency_key, data, metadata, created_at
    )
    SELECT
        event_id, type, version, usl.causation_id, correlation_id,
        idempotency_key, data, metadata, CURRENT_TIMESTAMP
    FROM to_update_subscription_logs usl
),
inserted_output_routing_info AS (
    INSERT INTO output_event_routing (event_id, subscription_id,
        subscription_instance_identifier, applied_order_id)
    SELECT event_id, subscription_id, subscription_instance_identifier,
        applied_order_id
    FROM to_update_subscription_logs usl
)

SELECT * FROM to_update_subscription_logs
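
For context on the array parameters ($1 to $11) in the query above: each one carries a single column of the batch, so the Rust side has to turn a list of events into parallel vectors before binding them. A minimal sketch of that transposition follows, using only the standard library. The struct names, the reduced set of fields, and the use of String in place of real UUID types are all assumptions made for illustration; this is not the actual application code and it does not call SQLx.

```rust
/// One output event to persist. Field types are simplified stand-ins
/// (assumption): the real code presumably uses UUID and JSON types.
#[derive(Debug, Clone)]
pub struct OutputEvent {
    pub event_id: String,
    pub event_type: String,
    pub version: i32,
    pub causation_id: String,
}

/// Column-oriented view of a batch: one Vec per SQL array parameter.
/// Only four of the eleven columns are modelled here.
#[derive(Debug, Default, PartialEq)]
pub struct EventColumns {
    pub event_ids: Vec<String>,
    pub types: Vec<String>,
    pub versions: Vec<i32>,
    pub causation_ids: Vec<String>,
}

/// Transpose a slice of rows into parallel columns. Every column ends up
/// with the same length, so element i of each array belongs to row i.
pub fn to_columns(events: &[OutputEvent]) -> EventColumns {
    let mut cols = EventColumns::default();
    for e in events {
        cols.event_ids.push(e.event_id.clone());
        cols.types.push(e.event_type.clone());
        cols.versions.push(e.version);
        cols.causation_ids.push(e.causation_id.clone());
    }
    cols
}
```

Each vector would then be bound as one array parameter, and the unnest calls in the first CTE turn them back into rows.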

The tables look as follows:

CREATE TABLE event_log (
    event_id UUID PRIMARY KEY,
    event_order_id BIGINT REFERENCES event(order_id),
    type varchar NOT NULL,
    version int NOT NULL,
    causation_id UUID,
    correlation_id UUID,
    idempotency_key TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB,
    CONSTRAINT constraint_event_log_unique_idempotency_key
        UNIQUE(idempotency_key) -- idempotent writes.
);

CREATE TABLE output_event_routing (
    event_id UUID REFERENCES event_log(event_id),
    subscription_id TEXT NOT NULL,
    subscription_instance_identifier TEXT,
    applied_order_id BIGINT,
    CONSTRAINT constraint_output_event_routing_uniqueness
        UNIQUE(subscription_id, subscription_instance_identifier, applied_order_id)
);

CREATE TABLE subscription_log (
    event_id UUID NOT NULL,
    event_order_id BIGINT NOT NULL,
    event_correlation_id UUID NOT NULL,
    subscription_instance_identifier TEXT NOT NULL,
    subscription_id TEXT NOT NULL REFERENCES subscription(name),
    status processing_status NOT NULL DEFAULT 'enqueued',
    node_id UUID references node(id),     -- is null until assigned.

Since I was asked to avoid PL/pgSQL, I tried to achieve the following
behaviour in CTEs:
- For given events, update the subscription log to 'processed' only if we
still are the node that is processing these and the status is still
'assigned'.
- Only for the events where the previous step succeeded, continue processing
by inserting into event_log and into output_event_routing (the
inserted_output_routing_info CTE).
The mechanism aims to make sure we don't insert results of event processing
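
To make the intended semantics concrete, here is a small in-memory model of the two steps listed above, written as a sketch in plain Rust. It is purely sequential and says nothing about how PostgreSQL interleaves concurrent statements; all type and function names are hypothetical, and integers stand in for the UUID columns.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Enqueued,
    Assigned,
    Processed,
}

/// Stand-in for one subscription_log row (integers instead of UUIDs).
#[derive(Debug, Clone)]
pub struct LogEntry {
    pub event_id: u64,
    pub node_id: Option<u32>, // None until assigned
    pub status: Status,
}

/// Stand-in for the database: the log plus the unique idempotency keys.
#[derive(Debug, Default)]
pub struct Store {
    pub subscription_log: Vec<LogEntry>,
    pub event_log_keys: HashSet<String>,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Committed,
    NotOwner,     // no matching 'assigned' row for this node: nothing to do
    DuplicateKey, // unique violation: modelled as a rollback, nothing changes
}

impl Store {
    /// Mark the entry processed and record the output event, but only if
    /// `node` still owns the entry and its status is still Assigned.
    pub fn complete(&mut self, node: u32, causation_id: u64, idempotency_key: &str) -> Outcome {
        let entry = match self.subscription_log.iter_mut().find(|e| {
            e.event_id == causation_id && e.node_id == Some(node) && e.status == Status::Assigned
        }) {
            Some(e) => e,
            None => return Outcome::NotOwner,
        };
        if self.event_log_keys.contains(idempotency_key) {
            return Outcome::DuplicateKey;
        }
        entry.status = Status::Processed;
        self.event_log_keys.insert(idempotency_key.to_string());
        Outcome::Committed
    }
}
```

In this model a second node that submits the same values after the first one has committed gets NotOwner, which is the outcome I expected from the query. DuplicateKey corresponds to the error actually observed.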

When logging the input values, we can see that exactly the same set of
values is indeed passed twice, by different nodes. That is to be expected
and is exactly what this logic has to catch: same values, but another node.
What we see is that one node succeeds and the other node fails due to the
uniqueness violation. That is actually fine from a business perspective,
since rolling back has the same effect, albeit with an error that I didn't
expect. However, I would love to understand this: how can one node succeed,
set the status of the log to 'processed' and continue to insert the values,
while the other is apparently also able to continue inserting (which means
that both nodes saw 'assigned' in the select when it locked the row for
update)? Is there something I do not fully understand about how CTEs work
in combination with locks?
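
Since a rollback is acceptable here, one option on the application side is to classify the error by its SQLSTATE code and treat the expected ones as benign. The sketch below assumes the code is available as a string (SQLx exposes it on database errors, but that call is left out to keep the example dependency-free). The enum, the function names, and the decision about which classes count as benign are hypothetical; the SQLSTATE values themselves are the standard PostgreSQL ones.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorClass {
    DuplicateWrite,   // 23505 unique_violation
    LockNotAvailable, // 55P03 lock_not_available (raised by NOWAIT)
    Retryable,        // 40001 serialization_failure, 40P01 deadlock_detected
    Other,
}

/// Map a PostgreSQL SQLSTATE code to a coarse error class.
pub fn classify_sqlstate(code: &str) -> ErrorClass {
    match code {
        "23505" => ErrorClass::DuplicateWrite,
        "55P03" => ErrorClass::LockNotAvailable,
        "40001" | "40P01" => ErrorClass::Retryable,
        _ => ErrorClass::Other,
    }
}

/// Assumption: when completing an event, losing to another node is fine,
/// so a duplicate write or an unavailable lock can be logged and skipped.
pub fn is_benign_for_completion(class: ErrorClass) -> bool {
    matches!(class, ErrorClass::DuplicateWrite | ErrorClass::LockNotAvailable)
}
```

This only hides the symptom; it does not explain why both nodes got past the lock.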

Things I tried:
1. Whether I go for regular FOR UPDATE, SKIP LOCKED or NOWAIT makes no
difference.
2. I do return the to_update_subscription_logs at the end to be sure the
lock is held (I'm aware of that CTE behaviour for selects), even though it
is already used by the next CTE.

3. Changing it to UPDATE ... RETURNING (which was my original logic):
updated_subscription AS (
     UPDATE subscription_log sl
     SET status = 'processed',
         updated_at = CURRENT_TIMESTAMP
     FROM unnested_inputs ui
     WHERE sl.event_id = ui.causation_id
     AND sl.node_id = $12
     AND sl.status = 'assigned' -- Assuming you're updating from 'assigned'
     RETURNING ui.causation_id
)

then doing a DISTINCT on the causation ID and only allowing inserts for
values whose causation ID was successfully updated to 'processed' yields
exactly the same behaviour: everything goes fine, but once in a while things
go wrong when a node dies and another takes over.

4. With both approaches I also tried adding more explicit checks in the
joins of the insert CTEs, to make sure the node is still the same and so
on. These checks shouldn't be necessary, and they also didn't change
anything.

5. After the error we can see that the succeeding node successfully set the
status to 'processed' and that it 'owns' the subscription_log entry for that
processed event. That means the other node should not have been able to
get past the lock.

If anyone could provide some insight into what I'm missing about how CTEs
work, that would be amazing. Sorry that it's a rather complex case, which
makes it hard to come up with something 'smaller' that still reproduces it
under the same tests.
