This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 0ef232f88b CEP-15: (Accord) SyncPoint timeouts become a Exhausted 
rather than a Timeout and doesn’t get retried
0ef232f88b is described below

commit 0ef232f88b83a8d1bba6e406c9075e9e051d0241
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Tue Jun 18 12:20:22 2024 -0700

    CEP-15: (Accord) SyncPoint timeouts become a Exhausted rather than a 
Timeout and doesn’t get retried
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-19718
---
 modules/accord                                     |   2 +-
 src/java/org/apache/cassandra/net/Verb.java        |   4 +-
 .../cassandra/service/accord/AccordService.java    |  78 ++++++---
 .../accord/exceptions/ReadExhaustedException.java  |  39 +++++
 .../apache/cassandra/streaming/StreamSession.java  |  10 +-
 src/java/org/apache/cassandra/utils/Blocking.java  |  42 +++++
 .../test/accord/AccordBootstrapTest.java           |   2 +-
 .../apache/cassandra/repair/LocalSyncTaskTest.java |   2 +-
 .../service/accord/AccordServiceTest.java          | 185 +++++++++++++++++++++
 9 files changed, 338 insertions(+), 26 deletions(-)

diff --git a/modules/accord b/modules/accord
index 37c957c719..f1f5ea5ccb 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 37c957c719491634f081b39900ebf708079ef3ee
+Subproject commit f1f5ea5ccbd6e0a8abf579a4331fa84a1b3d9f95
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index eb244f1bc6..151b59c8df 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -335,8 +335,8 @@ public enum Verb
     ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE,         
 () -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop, 
ACCORD_GET_EPHMRL_READ_DEPS_RSP),
     ACCORD_GET_MAX_CONFLICT_RSP     (163, P2, writeTimeout, IMMEDIATE,         
 () -> GetMaxConflictSerializers.reply,      RESPONSE_HANDLER                   
                                         ),
     ACCORD_GET_MAX_CONFLICT_REQ     (164, P2, writeTimeout, IMMEDIATE,         
 () -> GetMaxConflictSerializers.request,    AccordService::verbHandlerOrNoop, 
ACCORD_GET_MAX_CONFLICT_RSP),
-    ACCORD_FETCH_DATA_RSP           (145, P2, repairTimeout,IMMEDIATE,         
 () -> FetchSerializers.reply,               RESPONSE_HANDLER                   
                                         ),
-    ACCORD_FETCH_DATA_REQ           (146, P2, repairTimeout,IMMEDIATE,         
 () -> FetchSerializers.request,             AccordService::verbHandlerOrNoop, 
ACCORD_FETCH_DATA_RSP                     ),
+    ACCORD_FETCH_DATA_RSP           (145, P2, writeTimeout,IMMEDIATE,          
 () -> FetchSerializers.reply,               RESPONSE_HANDLER                   
                                         ),
+    ACCORD_FETCH_DATA_REQ           (146, P2, writeTimeout,IMMEDIATE,          
 () -> FetchSerializers.request,             AccordService::verbHandlerOrNoop, 
ACCORD_FETCH_DATA_RSP                     ),
     ACCORD_SET_SHARD_DURABLE_REQ    (147, P2, writeTimeout, IMMEDIATE,         
 () -> SetDurableSerializers.shardDurable,   AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
     ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE,         
 () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
     ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE,         
 () -> QueryDurableBeforeSerializers.reply,  RESPONSE_HANDLER                   
                                         ),
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 8764addbf5..d3954a208a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -35,10 +35,14 @@ import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Ints;
 
 import accord.coordinate.Barrier;
 import accord.coordinate.CoordinateSyncPoint;
+import accord.coordinate.Exhausted;
+import accord.coordinate.FailureAccumulator;
 import accord.coordinate.TopologyMismatch;
 import accord.impl.CoordinateDurabilityScheduling;
 import accord.primitives.SyncPoint;
@@ -47,6 +51,7 @@ import 
org.apache.cassandra.cql3.statements.RequestValidations;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.accord.exceptions.ReadExhaustedException;
 import 
org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.service.accord.repair.RepairSyncPointAdapter;
@@ -392,6 +397,13 @@ public class AccordService implements IAccordService, 
Shutdownable
                 // Protocol also doesn't have a way to denote "unknown" 
outcome, so using a timeout as the closest match
                 throw newBarrierPreempted(txnId, barrierType.global);
             }
+            if (cause instanceof Exhausted)
+            {
+                // this case happens when a non-timeout exception is seen, and 
we are unable to move forward
+                metrics.failures.mark();
+                throw newBarrierExhausted(txnId, barrierType.global);
+            }
+            // unknown error
             metrics.failures.mark();
             throw new RuntimeException(cause);
         }
@@ -430,35 +442,40 @@ public class AccordService implements IAccordService, 
Shutdownable
         return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, 
barrierType, isForWrite, repairSyncPoint(allNodes));
     }
 
-    private static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean 
global)
+    @VisibleForTesting
+    static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean global)
     {
         return new ReadTimeoutException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
     }
 
-    private static ReadTimeoutException newBarrierPreempted(TxnId txnId, 
boolean global)
+    @VisibleForTesting
+    static ReadTimeoutException newBarrierPreempted(TxnId txnId, boolean 
global)
     {
         return new ReadPreemptedException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
     }
 
-    private long doWithRetries(LongSupplier action, int retryAttempts, long 
initialBackoffMillis, long maxBackoffMillis) throws InterruptedException
+    @VisibleForTesting
+    static ReadExhaustedException newBarrierExhausted(TxnId txnId, boolean 
global)
+    {
+        //TODO (usability): not being able to show the txn is a bad UX, this 
becomes harder to trace back in logs
+        return new ReadExhaustedException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, ImmutableMap.of());
+    }
+
+    @VisibleForTesting
+    static boolean isTimeout(Throwable t)
+    {
+        return t instanceof Timeout || t instanceof ReadTimeoutException || t 
instanceof Preempted || t instanceof ReadPreemptedException;
+    }
+
+    @VisibleForTesting
+    static long doWithRetries(Blocking blocking, LongSupplier action, int 
retryAttempts, long initialBackoffMillis, long maxBackoffMillis) throws 
InterruptedException
     {
         // Since we could end up having the barrier transaction or the 
transaction it listens to invalidated
-        RuntimeException existingFailures = null;
+        Throwable existingFailures = null;
         Long success = null;
-        long backoffMillis = 0;
+        long backoffMillis = initialBackoffMillis;
         for (int attempt = 0; attempt < retryAttempts; attempt++)
         {
-            try
-            {
-                Thread.sleep(backoffMillis);
-            }
-            catch (InterruptedException e)
-            {
-                if (existingFailures != null)
-                    e.addSuppressed(existingFailures);
-                throw e;
-            }
-            backoffMillis = backoffMillis == 0 ? initialBackoffMillis : 
Math.min(backoffMillis * 2, maxBackoffMillis);
             try
             {
                 success = action.getAsLong();
@@ -466,13 +483,34 @@ public class AccordService implements IAccordService, 
Shutdownable
             }
             catch (RequestExecutionException | CoordinationFailed newFailures)
             {
-                existingFailures = Throwables.merge(existingFailures, 
newFailures);
+                existingFailures = FailureAccumulator.append(existingFailures, 
newFailures, AccordService::isTimeout);
+
+                try
+                {
+                    blocking.sleep(backoffMillis);
+                }
+                catch (InterruptedException e)
+                {
+                    if (existingFailures != null)
+                        e.addSuppressed(existingFailures);
+                    throw e;
+                }
+                backoffMillis = Math.min(backoffMillis * 2, maxBackoffMillis);
+            }
+            catch (Throwable t)
+            {
+                // if an unknown/unexpected error happens, retrying stops 
right away
+                if (existingFailures != null)
+                    t.addSuppressed(existingFailures);
+                existingFailures = t;
+                break;
             }
         }
         if (success == null)
         {
             checkState(existingFailures != null, "Didn't have success, but 
also didn't have failures");
-            throw existingFailures;
+            Throwables.throwIfUnchecked(existingFailures);
+            throw new RuntimeException(existingFailures);
         }
         return success;
     }
@@ -480,7 +518,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     @Override
     public long barrierWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite) throws InterruptedException
     {
-        return doWithRetries(() -> 
AccordService.instance().barrier(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite),
+        return doWithRetries(Blocking.Default.instance, () -> 
AccordService.instance().barrier(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite),
                       DatabaseDescriptor.getAccordBarrierRetryAttempts(),
                       
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
                       
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
@@ -489,7 +527,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     @Override
     public long repairWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite, List<InetAddressAndPort> 
allEndpoints) throws InterruptedException
     {
-        return doWithRetries(() -> 
AccordService.instance().repair(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite, allEndpoints),
+        return doWithRetries(Blocking.Default.instance, () -> 
AccordService.instance().repair(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite, allEndpoints),
                              
DatabaseDescriptor.getAccordBarrierRetryAttempts(),
                              
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
                              
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
diff --git 
a/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
 
b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
new file mode 100644
index 0000000000..4ebfc8fdb0
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.exceptions;
+
+import java.util.Map;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public class ReadExhaustedException extends ReadFailureException
+{
+    public ReadExhaustedException(ConsistencyLevel consistency, int received, 
int blockFor, boolean dataPresent, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
+    {
+        super(consistency, received, blockFor, dataPresent, 
failureReasonByEndpoint);
+    }
+
+    protected ReadExhaustedException(String msg, ConsistencyLevel consistency, 
int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
+    {
+        super(msg, consistency, received, blockFor, dataPresent, 
failureReasonByEndpoint);
+    }
+}
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 050e37c749..447621c78c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -1306,8 +1306,16 @@ public class StreamSession
         return requests.size();
     }
 
-    @VisibleForTesting
     public int getNumTransfers()
+    {
+        return transfers.size();
+    }
+
+    //TODO (now, review): there were 2 tests that use this (nothing else) and 
both assert on its value... but in both cases they are checking if there 
are transfer tasks, and there aren't any as the range doesn't have data...
+    // This looks like AccordBootstrapTest and LocalSyncTaskTest have a test 
bug, so rather than fixing them, this method was created to keep the old 
semantics...
+    @Deprecated(since = "5.1")
+    @VisibleForTesting
+    public int getNumKeyspaceTransfers()
     {
         return transferredRangesPerKeyspace.size();
     }
diff --git a/src/java/org/apache/cassandra/utils/Blocking.java 
b/src/java/org/apache/cassandra/utils/Blocking.java
new file mode 100644
index 0000000000..e04e53b090
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Blocking.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public interface Blocking
+{
+    default void sleep(long millis) throws InterruptedException
+    {
+        sleep(millis, TimeUnit.MILLISECONDS);
+    }
+
+    void sleep(long value, TimeUnit unit) throws InterruptedException;
+
+    enum Default implements Blocking
+    {
+        instance;
+
+        @Override
+        public void sleep(long value, TimeUnit unit) throws 
InterruptedException
+        {
+            unit.sleep(value);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index 2241a8c911..14ae7d1cde 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -267,7 +267,7 @@ public class AccordBootstrapTest extends TestBaseImpl
                     StreamListener.listener.forSession(session -> {
                         Assert.assertEquals(node3Addr, 
session.peer.getAddress());
                         Assert.assertEquals(0, session.getNumRequests());
-                        Assert.assertTrue(session.getNumTransfers() > 0);
+                        Assert.assertTrue(session.getNumKeyspaceTransfers() > 
0);
                     });
 
                     
awaitUninterruptiblyAndRethrow(service().node().commandStores().forEach(safeStore
 -> {
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 95f630dc05..c03fe2aac7 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -171,7 +171,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         StreamCoordinator coordinator = plan.getCoordinator();
         StreamSession session = 
Iterables.getOnlyElement(coordinator.getAllStreamSessions());
         assertEquals(expectedIncoming, session.getNumRequests());
-        assertEquals(expectedOutgoing, session.getNumTransfers());
+        assertEquals(expectedOutgoing, session.getNumKeyspaceTransfers());
     }
 
     @Test
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
new file mode 100644
index 0000000000..ea88d11977
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.LongSupplier;
+
+import org.junit.Test;
+
+import accord.coordinate.Exhausted;
+import accord.coordinate.Preempted;
+import accord.coordinate.Timeout;
+import accord.primitives.TxnId;
+import org.apache.cassandra.utils.Blocking;
+import org.assertj.core.api.Condition;
+import org.mockito.Mockito;
+
+import static accord.utils.Property.qt;
+import static org.apache.cassandra.service.accord.AccordService.doWithRetries;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class AccordServiceTest
+{
+    @Test
+    public void retryExpectedFailures() throws InterruptedException
+    {
+        Blocking blocking = Mockito.mock(Blocking.class);
+        class Task implements LongSupplier
+        {
+            private int attempts = 0;
+
+            @Override
+            public long getAsLong()
+            {
+                switch (attempts)
+                {
+                    case 0:
+                        attempts++;
+                        throw new Timeout(null, null);
+                    case 1:
+                        attempts++;
+                        throw AccordService.newBarrierTimeout(TxnId.NONE, 
true);
+                    case 2:
+                        attempts++;
+                        throw new Preempted(null, null);
+                    case 3:
+                        attempts++;
+                        throw AccordService.newBarrierPreempted(TxnId.NONE, 
true);
+                    case 4:
+                        attempts++;
+                        throw new Exhausted(null, null);
+                    case 5:
+                        attempts++;
+                        throw AccordService.newBarrierExhausted(TxnId.NONE, 
true);
+                    default:
+                        return 42;
+                }
+            }
+        }
+        Task failing = new Task();
+        assertThat(doWithRetries(blocking, failing, Integer.MAX_VALUE, 100, 
1000)).isEqualTo(42);
+        verify(blocking).sleep(100);
+        verify(blocking).sleep(200);
+        verify(blocking).sleep(400);
+        verify(blocking).sleep(800);
+        verify(blocking, times(2)).sleep(1000); // hit max backoff, so stays 
at 1k
+    }
+
+    @Test
+    public void retryThrowsTimeout()
+    {
+        Blocking blocking = Mockito.mock(Blocking.class);
+        qt().check(rs -> {
+            List<Runnable> timeoutFailures = new ArrayList<>(4);
+            timeoutFailures.add(() -> {throw new Timeout(null, null);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw new Preempted(null, null);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+            Collections.shuffle(timeoutFailures, rs.asJdkRandom());
+            Iterator<Runnable> it = timeoutFailures.iterator();
+            LongSupplier failing = () -> {
+                if (!it.hasNext()) throw new IllegalStateException("Called too 
many times");
+                it.next().run(); // this throws...
+                return 42;
+            };
+            assertThatThrownBy(() -> doWithRetries(blocking, failing, 
timeoutFailures.size(), 100, 1000)).is(new 
Condition<>(AccordService::isTimeout, "timeout"));
+            assertThat(it).isExhausted();
+        });
+    }
+
+    @Test
+    public void retryThrowsNonTimeout()
+    {
+        Blocking blocking = Mockito.mock(Blocking.class);
+        qt().check(rs -> {
+            List<Runnable> timeoutFailures = new ArrayList<>(5);
+            timeoutFailures.add(() -> {throw new Timeout(null, null);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw new Preempted(null, null);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw new Exhausted(null, null);});
+            Collections.shuffle(timeoutFailures, rs.asJdkRandom());
+            Iterator<Runnable> it = timeoutFailures.iterator();
+            LongSupplier failing = () -> {
+                if (!it.hasNext()) throw new IllegalStateException("Called too 
many times");
+                it.next().run(); // this throws...
+                return 42;
+            };
+            assertThatThrownBy(() -> doWithRetries(blocking, failing, 
timeoutFailures.size(), 100, 1000)).isInstanceOf(Exhausted.class);
+            assertThat(it).isExhausted();
+        });
+    }
+
+    @Test
+    public void retryShortCircuitError()
+    {
+        class Unexpected implements Runnable
+        {
+            final boolean isError;
+
+            Unexpected(boolean isError)
+            {
+                this.isError = isError;
+            }
+
+            @Override
+            public void run()
+            {
+                if (isError) throw new AssertionError();
+                throw new NullPointerException();
+            }
+        }
+        qt().check(rs -> {
+            List<Runnable> failures = new ArrayList<>(6);
+            failures.add(() -> {throw new Timeout(null, null);});
+            failures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+            failures.add(() -> {throw new Preempted(null, null);});
+            failures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+            failures.add(() -> {throw new Exhausted(null, null);});
+            boolean isError = rs.nextBoolean();
+            failures.add(new Unexpected(isError));
+            Collections.shuffle(failures, rs.asJdkRandom());
+            int unexpectedIndex = -1;
+            for (int i = 0; i < failures.size(); i++)
+            {
+                if (failures.get(i) instanceof Unexpected)
+                {
+                    unexpectedIndex = i;
+                    break;
+                }
+            }
+            Iterator<Runnable> it = failures.iterator();
+            LongSupplier failing = () -> {
+                if (!it.hasNext()) throw new IllegalStateException("Called too 
many times");
+                it.next().run(); // this throws...
+                return 42;
+            };
+            Blocking blocking = Mockito.mock(Blocking.class);
+            assertThatThrownBy(() -> doWithRetries(blocking, failing, 
failures.size(), 100, 1000)).isInstanceOf(isError ? AssertionError.class : 
NullPointerException.class);
+            verify(blocking, times(unexpectedIndex)).sleep(Mockito.anyLong());
+        });
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to