This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38f7789534 Fix flaky InProgressSequenceCoordinationTest by increasing
request_timeout and ensuring background threads are joined before test cleanup
38f7789534 is described below
commit 38f77895346900090933975beee2fd62adfef2bd
Author: samlightfoot <[email protected]>
AuthorDate: Wed Feb 25 08:42:55 2026 +0000
Fix flaky InProgressSequenceCoordinationTest by increasing request_timeout
and ensuring background threads are joined before test cleanup
Patch by Sam Lightfoot; reviewed by Dmitry Konstantinov and Sam
Tunnicliffe for CASSANDRA-21189
---
.../log/InProgressSequenceCoordinationTest.java | 196 +++++++++++++--------
1 file changed, 118 insertions(+), 78 deletions(-)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
index c23bd3d751..94558ac66e 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
@@ -70,7 +70,7 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
{
try (Cluster cluster = builder().withNodes(3)
.appendConfig(cfg ->
cfg.set("progress_barrier_timeout", "5000ms")
-
.set("request_timeout", "1000ms")
+
.set("request_timeout", "5000ms")
.set("progress_barrier_backoff", "100ms")
.with(Feature.NETWORK, Feature.GOSSIP))
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
@@ -100,25 +100,33 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
// The StartJoin event has no prerequisite, so we should see a
CONTINUING state, but execution of the
// MidJoin should be BLOCKED as nodes 2 & 3 can't ack the
StartJoin.
Callable<Void> progressBlocked = waitForListener(newInstance,
continuable(), blocked());
- new Thread(() -> newInstance.startup()).start();
- progressBlocked.call();
-
- // Remove the partition between nodes 2 & 3 and the CMS and have
them catch up.
- cluster.filters().reset();
- cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
- cluster.get(3).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
- // Unpause the joining node and have it retry the bootstrap
sequence from MidJoin, this time the
- // expectation is that it will succeed, so we can just clear out
the listener.
- newInstance.runOnInstance(() -> {
- TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
- listener.restorePrevious();
- listener.releaseAndRetry();
- });
-
- // Wait for the cluster to all witness the finish join event.
- Epoch finalEpoch = finishJoinEpoch.call();
- ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+ Thread startupThread = new Thread(newInstance::startup);
+ startupThread.start();
+ try
+ {
+ progressBlocked.call();
+
+ // Remove the partition between nodes 2 & 3 and the CMS and
have them catch up.
+ cluster.filters().reset();
+ cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+ cluster.get(3).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+ // Unpause the joining node and have it retry the bootstrap
sequence from MidJoin, this time the
+ // expectation is that it will succeed, so we can just clear
out the listener.
+ newInstance.runOnInstance(() -> {
+ TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
+ listener.restorePrevious();
+ listener.releaseAndRetry();
+ });
+
+ // Wait for the cluster to all witness the finish join event.
+ Epoch finalEpoch = finishJoinEpoch.call();
+ ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+ }
+ finally
+ {
+ releaseAndJoin(newInstance, startupThread);
+ }
}
}
@@ -128,7 +136,7 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
try (Cluster cluster = builder().withNodes(4)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
.appendConfig(cfg ->
cfg.set("progress_barrier_timeout", "5000ms")
-
.set("request_timeout", "1000ms")
+
.set("request_timeout", "5000ms")
.set("progress_barrier_backoff", "100ms")
.with(Feature.NETWORK, Feature.GOSSIP))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
@@ -151,44 +159,51 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
Callable<Void> progressBlocked = waitForListener(leavingInstance,
blocked());
Thread t = new Thread(() -> leavingInstance.runOnInstance(() ->
StorageService.instance.decommission(true)));
t.start();
- progressBlocked.call();
- // Remove the partition between nodes 2 & 3 and the CMS and have
them catch up.
- cluster.filters().reset();
- cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
- cluster.get(3).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
- // Now re-partition nodes 2 & 3 so that the MidLeave event cannot
be submitted by node 1 as 2 & 3 won't
- // receive/ack the StartLeave.
- cluster.filters().allVerbs().from(1).to(2,3).drop();
- cluster.filters().verbs(TCM_FETCH_PEER_LOG_RSP.id).from(4).to(2,
3).drop();
-
- // Unpause the leaving node and have it retry the StartLeave,
which should now be able to proceed as 2 & 3
- // will ack the PrepareJoin. Its progress should be blocked as it
comes to submit the next event, its
- // MidJoin, so set a new BLOCKED expectation.
- progressBlocked = waitForExistingListener(leavingInstance,
blocked());
- leavingInstance.runOnInstance(() -> {
- TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
- listener.releaseAndRetry();
- });
- progressBlocked.call();
-
- // Heal the partition again and force a catch up.
- cluster.filters().reset();
- cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
- cluster.get(3).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
- // Unpause the leaving node and have it retry the MidLeave, which
will now be able to proceed as 2 & 3 will
- // ack the StartJoin. This time the expectation is that the
remaining events will be successfully acked, so
- // we can just clear out the listener.
- leavingInstance.runOnInstance(() -> {
- TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
- listener.restorePrevious();
- listener.releaseAndRetry();
- });
-
- // Wait for the cluster to all witness the finish join event.
- Epoch finalEpoch = finishLeaveEpoch.call();
- ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+ try
+ {
+ progressBlocked.call();
+ // Remove the partition between nodes 2 & 3 and the CMS and
have them catch up.
+ cluster.filters().reset();
+ cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+ cluster.get(3).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+ // Now re-partition nodes 2 & 3 so that the MidLeave event
cannot be submitted by node 1 as 2 & 3 won't
+ // receive/ack the StartLeave.
+ cluster.filters().allVerbs().from(1).to(2,3).drop();
+
cluster.filters().verbs(TCM_FETCH_PEER_LOG_RSP.id).from(4).to(2, 3).drop();
+
+            // Unpause the leaving node and have it retry the StartLeave,
which should now be able to proceed as 2 & 3
+            // will ack the PrepareLeave. Its progress should be blocked as
it comes to submit the next event, its
+            // MidLeave, so set a new BLOCKED expectation.
+ progressBlocked = waitForExistingListener(leavingInstance,
blocked());
+ leavingInstance.runOnInstance(() -> {
+ TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
+ listener.releaseAndRetry();
+ });
+ progressBlocked.call();
+
+ // Heal the partition again and force a catch up.
+ cluster.filters().reset();
+ cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+ cluster.get(3).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+            // Unpause the leaving node and have it retry the MidLeave,
which will now be able to proceed as 2 & 3 will
+            // ack the StartLeave. This time the expectation is that the
remaining events will be successfully acked, so
+ // we can just clear out the listener.
+ leavingInstance.runOnInstance(() -> {
+ TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
+ listener.restorePrevious();
+ listener.releaseAndRetry();
+ });
+
+            // Wait for the cluster to all witness the finish leave event.
+ Epoch finalEpoch = finishLeaveEpoch.call();
+ ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+ }
+ finally
+ {
+ releaseAndJoin(leavingInstance, t);
+ }
}
}
@@ -197,7 +212,7 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
{
try (Cluster cluster = builder().withNodes(3)
.appendConfig(cfg ->
cfg.set("progress_barrier_timeout", "5000ms")
-
.set("request_timeout", "1000ms")
+
.set("request_timeout", "5000ms")
.set("progress_barrier_backoff", "100ms")
.with(Feature.NETWORK, Feature.GOSSIP))
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
@@ -226,31 +241,39 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
// Have the joining node pause when the StartReplace event fails
due to ack timeout.
Callable<Void> progressBlocked = waitForListener(replacement,
continuable(), blocked());
- new Thread(() -> {
+ Thread startupThread = new Thread(() -> {
try (WithProperties replacementProps = new WithProperties())
{
replacementProps.set(REPLACE_ADDRESS_FIRST_BOOT,
toReplace.config().broadcastAddress().getAddress().getHostAddress());
replacement.startup();
}
- }).start();
- progressBlocked.call();
-
- // Remove the partition between node 2 and the CMS and have it
catch up.
- cluster.filters().reset();
- cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
- // Unpause the joining node and have it retry the bootstrap
sequence from StartReplace, this time the
- // expectation is that it will succeed, so we can just clear out
the listener.
- replacement.runOnInstance(() -> {
- TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
- listener.restorePrevious();
- listener.releaseAndRetry();
});
-
- // Wait for the cluster to all witness the finish join event.
- Epoch finalEpoch = finishReplaceEpoch.call();
- ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+ startupThread.start();
+ try
+ {
+ progressBlocked.call();
+
+ // Remove the partition between node 2 and the CMS and have it
catch up.
+ cluster.filters().reset();
+ cluster.get(2).runOnInstance(() ->
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+ // Unpause the joining node and have it retry the bootstrap
sequence from StartReplace, this time the
+ // expectation is that it will succeed, so we can just clear
out the listener.
+ replacement.runOnInstance(() -> {
+ TestExecutionListener listener = (TestExecutionListener)
InProgressSequences.listener;
+ listener.restorePrevious();
+ listener.releaseAndRetry();
+ });
+
+            // Wait for the cluster to all witness the finish replace event.
+ Epoch finalEpoch = finishReplaceEpoch.call();
+ ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+ }
+ finally
+ {
+ releaseAndJoin(replacement, startupThread);
+ }
}
}
@@ -338,6 +361,16 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
return remoteCallable::call;
}
+ // Unblock the thread if it's still waiting on the listener barrier, then
join to ensure clean shutdown.
+ private void releaseAndJoin(IInvokableInstance instance, Thread thread)
throws InterruptedException
+ {
+ instance.runOnInstance(() -> {
+ if (InProgressSequences.listener instanceof TestExecutionListener)
+ ((TestExecutionListener)
InProgressSequences.listener).releaseWithoutRetry();
+ });
+ thread.join();
+ }
+
public static class TestExecutionListener implements
BiFunction<MultiStepOperation<?>, SequenceState, SequenceState>
{
volatile boolean retry = true;
@@ -405,5 +438,12 @@ public class InProgressSequenceCoordinationTest extends
FuzzTestBase
retry = true;
barrier.signal();
}
+
+ public void releaseWithoutRetry()
+ {
+ retry = false;
+ if (barrier != null)
+ barrier.signal();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]