[
https://issues.apache.org/jira/browse/CASSANDRA-21181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ariel Weisberg updated CASSANDRA-21181:
---------------------------------------
Description:
MultiNodeTableWalkWithWitnessesTest fails with read timeouts when
reconciliation hangs.
ShortPaxosTrackingSimulationTest fails with:
{code:java}
Caused by: java.lang.RuntimeException: Missing mutation ShortMutationId{2, 1, 0}
    at org.apache.cassandra.service.reads.tracked.PartialTrackedRead.augment(PartialTrackedRead.java:261)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.cassandra.service.reads.tracked.PartialTrackedRead.augment(PartialTrackedRead.java:247)
    at org.apache.cassandra.service.reads.tracked.TrackedLocalReads$Coordinator.lambda$acknowledgeReconcile$0(TrackedLocalReads.java:256)
    at org.apache.cassandra.concurrent.FutureTask$2.call(FutureTask.java:124)
    at org.apache.cassandra.concurrent.FutureTask.call(FutureTask.java:61)
    at org.apache.cassandra.concurrent.FutureTask.run(FutureTask.java:71)
    at org.apache.cassandra.simulator.systems.InterceptingExecutor$InterceptingPooledExecutor$WaitingThread.lambda$new$1(InterceptingExecutor.java:284)
{code}
The read reconciliation protocol requires each node to verify that it has all mutations the other nodes have. When a node discovers it is missing a mutation, it registers a callback ("notify me when this mutation arrives") and sends a pull request to fetch it. Each callback decrements a remaining-mutations counter, and the read can only complete once all such counters reach zero.
A race condition in the duplicate-mutation handling causes the callback to never fire, leaving the counter permanently stuck at 1 and the read hanging until timeout.
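To make the counter mechanism concrete, here is a minimal sketch of the register-then-notify pattern described above. The class and method names ({{MutationListeners}}, {{register()}}) are illustrative stand-ins, not Cassandra's actual API; only {{invokeListeners()}} mirrors a name from the description.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the pending-listener registry described above.
class MutationListeners
{
    private final Map<String, Runnable> listeners = new ConcurrentHashMap<>();

    // Register interest in a mutation id; called by the reconciling read.
    void register(String mutationId, Runnable callback)
    {
        listeners.put(mutationId, callback);
    }

    // Called when a mutation arrives; fires and removes any pending listener.
    // A no-op if nobody registered interest in this mutation id.
    void invokeListeners(String mutationId)
    {
        Runnable callback = listeners.remove(mutationId);
        if (callback != null)
            callback.run();
    }
}

public class ReconcileSketch
{
    public static void main(String[] args)
    {
        MutationListeners listeners = new MutationListeners();
        AtomicInteger remaining = new AtomicInteger(0);

        // The read discovers a missing mutation: bump the counter and register.
        remaining.incrementAndGet();
        listeners.register("{2, 20}:1", remaining::decrementAndGet);

        // The pulled mutation arrives: the callback fires, the counter returns
        // to zero, and the read can complete.
        listeners.invokeListeners("{2, 20}:1");
        System.out.println(remaining.get()); // prints 0
    }
}
```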
h2. The race
Setup: Node2 is a summary node for a tracked read. Node3 is the data node.
# Node2 creates its mutation summary for the read. At this point, coordinator log \{2, 20} has offsets [0,0] — node2 knows about offset 0 only.
# A write happens on node2 (normal replication, unrelated to the read). This write gets offset 1 in coordinator log \{2, 20}. The write goes through {{{}Keyspace.applyInternalTracked(){}}}: {{startWriting()}} returns true (first time seeing this offset), the mutation is applied to memtables, and {{finishWriting()}} calls {{invokeListeners(mutationId)}} — but nobody is listening yet, so it's a no-op.
# Node3 sends its summary to node2. Node3's summary has [0,1] for log \{2, 20} — it knows about both offsets 0 and 1.
# Node2 processes node3's summary in {{{}acceptRemoteSummary(){}}}. {{collectLocallyMissingMutations()}} compares node3's summary against node2's current local state. Even though the mutation arrived in step 2, there is a window where the collection logic still identifies offset 1 as missing: {{collectLocallyMissingMutations()}} takes a read lock on the {{CoordinatorLog}} and computes the difference against {{{}witnessedOffsets{}}}, but this check can race with the write in step 2. It finds offset 1 missing, calls {{registerMutationCallback()}}, which succeeds (first listener — the earlier {{invokeListeners()}} found no listeners and was a no-op), increments the remaining-mutations counter by 1, and calls {{pull()}}, which sends {{PULL_MUTATIONS_REQ}} to node2 itself (since log \{2, 20} is node2's own coordinator log).
# Node2 receives its own pull request. The {{PullMutationsRequest}} handler calls {{{}requestMissingMutations(){}}}, which schedules the mutation for delivery via the {{{}ActiveLogReconciler{}}}.
# The {{ActiveLogReconciler}} looks up the mutation in the journal (it's there), reads it, and sends a {{PUSH_MUTATION_REQ}} back to node2.
# Node2 receives the pushed mutation. The {{PushMutationRequest}} handler calls {{{}mutation.applyFuture(){}}}, which goes through {{{}Keyspace.applyInternalTracked(){}}}. {{MutationTrackingService.startWriting()}} is called, but {{CoordinatorLog.startWriting()}} checks {{witnessedOffsets}}, finds offset 1 already witnessed from step 2, and returns false. Because {{started}} is false, {{finishWriting()}} is never called, {{invokeListeners()}} is never called, the callback registered in step 4 is never invoked, and the remaining-mutations counter stays at 1 forever.
# Node2 never completes reconciliation, so it never sends {{READ_RECONCILE_ACK}} to node3. Node3 waits for node2's syncAck, which never arrives. After 60 seconds, the client times out with {{{}OperationTimedOutException{}}}.
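The lost wakeup can be reduced to a few lines. This is a single-threaded sketch of the interleaving, assuming nothing beyond what the steps above describe; {{witnessedOffsets}} and {{invokeListeners()}} mirror names from the description, while the surrounding class is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

public class LostWakeupSketch
{
    // Stand-in for CoordinatorLog.witnessedOffsets.
    static final Set<Integer> witnessedOffsets = new HashSet<>();
    static Runnable pendingListener = null;
    static int remaining = 0;

    // Mirrors CoordinatorLog.startWriting(): false for an already-witnessed offset.
    static boolean startWriting(int offset)
    {
        return witnessedOffsets.add(offset);
    }

    // Fires and clears the pending listener, if one is registered.
    static void invokeListeners()
    {
        if (pendingListener != null)
        {
            pendingListener.run();
            pendingListener = null;
        }
    }

    public static void main(String[] args)
    {
        // Step 2: the original write. startWriting() returns true, the mutation
        // is applied, and finishWriting() invokes listeners — a no-op, none yet.
        if (startWriting(1))
            invokeListeners();

        // Step 4: reconciliation compared against a stale snapshot, so it
        // registers a listener and bumps the remaining-mutations counter.
        remaining++;
        pendingListener = () -> remaining--;

        // Step 7: the pulled duplicate arrives. startWriting() returns false,
        // so finishWriting() — and with it invokeListeners() — never runs.
        if (startWriting(1))
            invokeListeners();

        System.out.println(remaining); // prints 1: the counter is stuck, the read hangs
    }
}
```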
h2. The fix
In {{{}MutationTrackingService.startWriting(){}}}, when the underlying
{{CoordinatorLog.startWriting()}} returns false (duplicate mutation already
witnessed), we now call
{{{}incomingMutations.invokeListeners(mutation.id()){}}}. This fires any
pending callbacks even for duplicate mutations. The callback fires, the counter
decrements to 0, the summary node completes reconciliation and sends its
syncAck, and the read succeeds.
This is safe because {{startWriting()}} returning false means
{{finishWriting()}} already completed for this offset (it's the only code path
that adds to {{{}witnessedOffsets{}}}), so the mutation data is fully written
and visible.
{code:java}
// MutationTrackingService.startWriting()
boolean started = getOrCreateShards(mutation.getKeyspaceName()).startWriting(mutation);
// If this is a duplicate mutation (already witnessed), notify any pending read
// reconciliation listeners. A listener can be registered between the first write's
// invokeListeners() call (which found no listeners) and this duplicate's arrival,
// causing the listener to never fire and the read to hang.
if (!started)
    incomingMutations.invokeListeners(mutation.id());
return started;
{code}
h2. Alternatives
An alternative would be a "check-after-register" in {{ReadReconciliations.pull()}} that avoids the unnecessary pull round-trip entirely: after registering the callback, immediately check whether the mutation is already in {{{}witnessedOffsets{}}}, and if so fire the callback directly without sending a pull request.
It might also work to leverage the locking to make the processing of {{collectLocallyMissingMutations()}} atomic with the registration and notification of listeners, but that wasn't as straightforward a fix.
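The check-after-register alternative follows the standard "register interest, then check whether you missed the event" pattern. A hypothetical sketch, assuming a {{witnessedOffsets}}-style set is queryable from the pull path (Cassandra exposes no such method today; every name here besides {{witnessedOffsets}} and {{pull()}} is illustrative):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CheckAfterRegisterSketch
{
    // Stand-ins for the coordinator log's witnessed set and the listener registry.
    static final Set<Integer> witnessedOffsets = ConcurrentHashMap.newKeySet();
    static final Map<Integer, Runnable> listeners = new ConcurrentHashMap<>();

    // Alternative pull(): register FIRST, then re-check, so an arrival in
    // between registration and the check can never be missed.
    static void pull(int offset, Runnable onArrival)
    {
        listeners.put(offset, onArrival);
        // Re-check after registering: if the mutation raced in before the
        // registration, fire the callback directly instead of sending a pull.
        if (witnessedOffsets.contains(offset))
        {
            Runnable callback = listeners.remove(offset);
            if (callback != null)
                callback.run();
        }
        // else: send PULL_MUTATIONS_REQ as before and wait for the push.
    }

    public static void main(String[] args)
    {
        witnessedOffsets.add(1); // the write already landed locally
        final int[] remaining = { 1 };
        pull(1, () -> remaining[0]--);
        System.out.println(remaining[0]); // prints 0: no pull round-trip needed
    }
}
```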
> Fix ShortPaxosTrackingSimulationTest and MultiNodeTableWalkWithWitnessesTest
> ----------------------------------------------------------------------------
>
> Key: CASSANDRA-21181
> URL: https://issues.apache.org/jira/browse/CASSANDRA-21181
> Project: Apache Cassandra
> Issue Type: Sub-task
> Components: Consistency/Coordination
> Reporter: Ariel Weisberg
> Assignee: Ariel Weisberg
> Priority: Normal
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)