This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38f7789534 Fix flaky InProgressSequenceCoordinationTest by increasing 
request_timeout and ensuring background threads are joined before test cleanup
38f7789534 is described below

commit 38f77895346900090933975beee2fd62adfef2bd
Author: samlightfoot <[email protected]>
AuthorDate: Wed Feb 25 08:42:55 2026 +0000

    Fix flaky InProgressSequenceCoordinationTest by increasing request_timeout 
and ensuring background threads are joined before test cleanup
    
    Patch by Sam Lightfoot; reviewed by Dmitry Konstantinov and Sam
    Tunnicliffe for CASSANDRA-21189
---
 .../log/InProgressSequenceCoordinationTest.java    | 196 +++++++++++++--------
 1 file changed, 118 insertions(+), 78 deletions(-)

diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
index c23bd3d751..94558ac66e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
@@ -70,7 +70,7 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
     {
         try (Cluster cluster = builder().withNodes(3)
                                         .appendConfig(cfg -> 
cfg.set("progress_barrier_timeout", "5000ms")
-                                                                
.set("request_timeout", "1000ms")
+                                                                
.set("request_timeout", "5000ms")
                                                                 
.set("progress_barrier_backoff", "100ms")
                                                                 
.with(Feature.NETWORK, Feature.GOSSIP))
                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
@@ -100,25 +100,33 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
             // The StartJoin event has no prerequisite, so we should see a 
CONTINUING state, but execution of the
             // MidJoin should be BLOCKED as nodes 2 & 3 can't ack the 
StartJoin.
             Callable<Void> progressBlocked = waitForListener(newInstance, 
continuable(), blocked());
-            new Thread(() -> newInstance.startup()).start();
-            progressBlocked.call();
-
-            // Remove the partition between nodes 2 & 3 and the CMS and have 
them catch up.
-            cluster.filters().reset();
-            cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-            cluster.get(3).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
-            // Unpause the joining node and have it retry the bootstrap 
sequence from MidJoin, this time the
-            // expectation is that it will succeed, so we can just clear out 
the listener.
-            newInstance.runOnInstance(() -> {
-                TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
-                listener.restorePrevious();
-                listener.releaseAndRetry();
-            });
-
-            // Wait for the cluster to all witness the finish join event.
-            Epoch finalEpoch = finishJoinEpoch.call();
-            ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+            Thread startupThread = new Thread(newInstance::startup);
+            startupThread.start();
+            try
+            {
+                progressBlocked.call();
+
+                // Remove the partition between nodes 2 & 3 and the CMS and 
have them catch up.
+                cluster.filters().reset();
+                cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+                cluster.get(3).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+                // Unpause the joining node and have it retry the bootstrap 
sequence from MidJoin, this time the
+                // expectation is that it will succeed, so we can just clear 
out the listener.
+                newInstance.runOnInstance(() -> {
+                    TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
+                    listener.restorePrevious();
+                    listener.releaseAndRetry();
+                });
+
+                // Wait for the cluster to all witness the finish join event.
+                Epoch finalEpoch = finishJoinEpoch.call();
+                ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+            }
+            finally
+            {
+                releaseAndJoin(newInstance, startupThread);
+            }
         }
     }
 
@@ -128,7 +136,7 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
         try (Cluster cluster = builder().withNodes(4)
                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
                                         .appendConfig(cfg -> 
cfg.set("progress_barrier_timeout", "5000ms")
-                                                                
.set("request_timeout", "1000ms")
+                                                                
.set("request_timeout", "5000ms")
                                                                 
.set("progress_barrier_backoff", "100ms")
                                                                 
.with(Feature.NETWORK, Feature.GOSSIP))
                                         
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
@@ -151,44 +159,51 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
             Callable<Void> progressBlocked = waitForListener(leavingInstance, 
blocked());
             Thread t = new Thread(() -> leavingInstance.runOnInstance(() -> 
StorageService.instance.decommission(true)));
             t.start();
-            progressBlocked.call();
-            // Remove the partition between nodes 2 & 3 and the CMS and have 
them catch up.
-            cluster.filters().reset();
-            cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-            cluster.get(3).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
-            // Now re-partition nodes 2 & 3 so that the MidLeave event cannot 
be submitted by node 1 as 2 & 3 won't
-            // receive/ack the StartLeave.
-            cluster.filters().allVerbs().from(1).to(2,3).drop();
-            cluster.filters().verbs(TCM_FETCH_PEER_LOG_RSP.id).from(4).to(2, 
3).drop();
-
-            // Unpause the leaving node and have it retry the StartLeave, 
which should now be able to proceed as 2 & 3
-            // will ack the PrepareJoin. Its progress should be blocked as it 
comes to submit the next event, its
-            // MidJoin, so set a new BLOCKED expectation.
-            progressBlocked = waitForExistingListener(leavingInstance, 
blocked());
-            leavingInstance.runOnInstance(() -> {
-                TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
-                listener.releaseAndRetry();
-            });
-            progressBlocked.call();
-
-            // Heal the partition again and force a catch up.
-            cluster.filters().reset();
-            cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-            cluster.get(3).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
-            // Unpause the leaving node and have it retry the MidLeave, which 
will now be able to proceed as 2 & 3 will
-            // ack the StartJoin. This time the expectation is that the 
remaining events will be successfully acked, so
-            // we can just clear out the listener.
-            leavingInstance.runOnInstance(() -> {
-                TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
-                listener.restorePrevious();
-                listener.releaseAndRetry();
-            });
-
-            // Wait for the cluster to all witness the finish join event.
-            Epoch finalEpoch = finishLeaveEpoch.call();
-            ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+            try
+            {
+                progressBlocked.call();
+                // Remove the partition between nodes 2 & 3 and the CMS and 
have them catch up.
+                cluster.filters().reset();
+                cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+                cluster.get(3).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+                // Now re-partition nodes 2 & 3 so that the MidLeave event 
cannot be submitted by node 1 as 2 & 3 won't
+                // receive/ack the StartLeave.
+                cluster.filters().allVerbs().from(1).to(2,3).drop();
+                
cluster.filters().verbs(TCM_FETCH_PEER_LOG_RSP.id).from(4).to(2, 3).drop();
+
+                // Unpause the leaving node and have it retry the StartLeave, 
which should now be able to proceed as 2 & 3
+                // will ack the PrepareLeave. Its progress should be blocked as 
it comes to submit the next event, its
+                // MidLeave, so set a new BLOCKED expectation.
+                progressBlocked = waitForExistingListener(leavingInstance, 
blocked());
+                leavingInstance.runOnInstance(() -> {
+                    TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
+                    listener.releaseAndRetry();
+                });
+                progressBlocked.call();
+
+                // Heal the partition again and force a catch up.
+                cluster.filters().reset();
+                cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+                cluster.get(3).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+                // Unpause the leaving node and have it retry the MidLeave, 
which will now be able to proceed as 2 & 3 will
+                // ack the StartLeave. This time the expectation is that the 
remaining events will be successfully acked, so
+                // we can just clear out the listener.
+                leavingInstance.runOnInstance(() -> {
+                    TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
+                    listener.restorePrevious();
+                    listener.releaseAndRetry();
+                });
+
+                // Wait for the cluster to all witness the finish leave event.
+                Epoch finalEpoch = finishLeaveEpoch.call();
+                ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+            }
+            finally
+            {
+                releaseAndJoin(leavingInstance, t);
+            }
         }
     }
 
@@ -197,7 +212,7 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
     {
         try (Cluster cluster = builder().withNodes(3)
                                         .appendConfig(cfg -> 
cfg.set("progress_barrier_timeout", "5000ms")
-                                                                
.set("request_timeout", "1000ms")
+                                                                
.set("request_timeout", "5000ms")
                                                                 
.set("progress_barrier_backoff", "100ms")
                                                                 
.with(Feature.NETWORK, Feature.GOSSIP))
                                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
@@ -226,31 +241,39 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
 
             // Have the joining node pause when the StartReplace event fails 
due to ack timeout.
             Callable<Void> progressBlocked = waitForListener(replacement, 
continuable(), blocked());
-            new Thread(() -> {
+            Thread startupThread = new Thread(() -> {
                 try (WithProperties replacementProps = new WithProperties())
                 {
                     replacementProps.set(REPLACE_ADDRESS_FIRST_BOOT,
                                          
toReplace.config().broadcastAddress().getAddress().getHostAddress());
                     replacement.startup();
                 }
-            }).start();
-            progressBlocked.call();
-
-            // Remove the partition between node 2 and the CMS and have it 
catch up.
-            cluster.filters().reset();
-            cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
-
-            // Unpause the joining node and have it retry the bootstrap 
sequence from StartReplace, this time the
-            // expectation is that it will succeed, so we can just clear out 
the listener.
-            replacement.runOnInstance(() -> {
-                TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
-                listener.restorePrevious();
-                listener.releaseAndRetry();
             });
-
-            // Wait for the cluster to all witness the finish join event.
-            Epoch finalEpoch = finishReplaceEpoch.call();
-            ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+            startupThread.start();
+            try
+            {
+                progressBlocked.call();
+
+                // Remove the partition between node 2 and the CMS and have it 
catch up.
+                cluster.filters().reset();
+                cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.instance().processor().fetchLogAndWait());
+
+                // Unpause the joining node and have it retry the bootstrap 
sequence from StartReplace, this time the
+                // expectation is that it will succeed, so we can just clear 
out the listener.
+                replacement.runOnInstance(() -> {
+                    TestExecutionListener listener = (TestExecutionListener) 
InProgressSequences.listener;
+                    listener.restorePrevious();
+                    listener.releaseAndRetry();
+                });
+
+                // Wait for the cluster to all witness the finish replace event.
+                Epoch finalEpoch = finishReplaceEpoch.call();
+                ClusterUtils.waitForCMSToQuiesce(cluster, finalEpoch);
+            }
+            finally
+            {
+                releaseAndJoin(replacement, startupThread);
+            }
         }
     }
 
@@ -338,6 +361,16 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
         return remoteCallable::call;
     }
 
+    // Unblock the thread if it's still waiting on the listener barrier, then 
join to ensure clean shutdown.
+    private void releaseAndJoin(IInvokableInstance instance, Thread thread) 
throws InterruptedException
+    {
+        instance.runOnInstance(() -> {
+            if (InProgressSequences.listener instanceof TestExecutionListener)
+                ((TestExecutionListener) 
InProgressSequences.listener).releaseWithoutRetry();
+        });
+        thread.join();
+    }
+
     public static class TestExecutionListener implements 
BiFunction<MultiStepOperation<?>, SequenceState, SequenceState>
     {
         volatile boolean retry = true;
@@ -405,5 +438,12 @@ public class InProgressSequenceCoordinationTest extends 
FuzzTestBase
             retry = true;
             barrier.signal();
         }
+
+        public void releaseWithoutRetry()
+        {
+            retry = false;
+            if (barrier != null)
+                barrier.signal();
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to