This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1f5ea5c CEP-15: (Accord) SyncPoint timeouts become an Exhausted rather than a Timeout and don’t get retried (#99)
f1f5ea5c is described below

commit f1f5ea5ccbd6e0a8abf579a4331fa84a1b3d9f95
Author: dcapwell <dcapw...@apache.org>
AuthorDate: Tue Jun 18 16:25:54 2024 -0700

    CEP-15: (Accord) SyncPoint timeouts become an Exhausted rather than a Timeout and don’t get retried (#99)
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-19718
---
 .../java/accord/coordinate/ExecuteSyncPoint.java   |  4 +-
 .../java/accord/coordinate/FailureAccumulator.java | 70 ++++++++++++++++++++++
 .../java/accord/coordinate/FetchCoordinator.java   | 14 ++---
 .../accord/coordinate/FailureAccumulatorTest.java  | 63 +++++++++++++++++++
 .../src/test/java/accord/impl/list/ListStore.java  |  2 +-
 5 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index ded97010..2bb8f800 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -71,6 +71,7 @@ public abstract class ExecuteSyncPoint<S extends Seekables<?, ?>> extends Settab
     final Node node;
     final AbstractSimpleTracker<?> tracker;
     final SyncPoint<S> syncPoint;
+    private Throwable failures = null;
 
     ExecuteSyncPoint(Node node, AbstractSimpleTracker<?> tracker, SyncPoint<S> 
syncPoint)
     {
@@ -123,8 +124,9 @@ public abstract class ExecuteSyncPoint<S extends Seekables<?, ?>> extends Settab
     public synchronized void onFailure(Node.Id from, Throwable failure)
     {
         if (isDone()) return;
+        failures = FailureAccumulator.append(failures, failure);
         if (tracker.recordFailure(from) == RequestStatus.Failed)
-            tryFailure(new Exhausted(syncPoint.syncId, syncPoint.homeKey));
+            tryFailure(FailureAccumulator.createFailure(failures, 
syncPoint.syncId, syncPoint.homeKey));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/FailureAccumulator.java b/accord-core/src/main/java/accord/coordinate/FailureAccumulator.java
new file mode 100644
index 00000000..c5b537da
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/FailureAccumulator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
+import accord.api.RoutingKey;
+import accord.primitives.TxnId;
+
+public class FailureAccumulator
+{
+    private FailureAccumulator() {}
+
+    public static Throwable append(@Nullable Throwable current, Throwable next)
+    {
+        return append(current, next, FailureAccumulator::isTimeout);
+    }
+
+    public static Throwable append(@Nullable Throwable current, Throwable 
next, Predicate<Throwable> isTimeout)
+    {
+        if (current == null) return next;
+        // when a non-timeout is seen make sure it shows up in current rather than timeout
+        // this is so checking if the cause is a timeout is able to do a single check rather
+        // than walk the whole chain
+        if (isTimeout.test(current) && !(isTimeout.test(next)))
+        {
+            Throwable tmp = current;
+            current = next;
+            next = tmp;
+        }
+        current.addSuppressed(next);
+        return current;
+    }
+
+    public static boolean isTimeout(@Nullable Throwable current)
+    {
+        return current instanceof Timeout;
+    }
+
+    public static CoordinationFailed createFailure(@Nullable Throwable 
current, TxnId txnId, @Nullable RoutingKey homeKey)
+    {
+        return createFailure(current, txnId, homeKey, null);
+    }
+
+    public static CoordinationFailed createFailure(@Nullable Throwable 
current, TxnId txnId, @Nullable RoutingKey homeKey, @Nullable String 
exhaustedMsg)
+    {
+        if (current == null) return new Exhausted(txnId, homeKey, 
exhaustedMsg);
+        if (isTimeout(current)) return new Timeout(txnId, homeKey);
+        Exhausted e = new Exhausted(txnId, homeKey, exhaustedMsg);
+        e.initCause(current);
+        return e;
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
index 9b9ec06a..b33192aa 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java
@@ -134,7 +134,7 @@ public abstract class FetchCoordinator
     // provided to us, and manages the safe-to-read state
     private final FetchRanges fetchRanges;
     private boolean isDone;
-    private Throwable failure;
+    private Throwable failures = null;
 
     final Map<Node.Id, State> stateMap = new HashMap<>();
     final List<State> states = new ArrayList<>();
@@ -204,10 +204,8 @@ public abstract class FetchCoordinator
 
         // some portion of the range is completely unavailable
         isDone = true;
-        Exhausted exhausted = new Exhausted(syncPoint.syncId, null, "No more 
nodes to contact for " + needed);
-        if (failure != null) exhausted.addSuppressed(failure);
-        failure = exhausted;
-        onDone(success, failure);
+        failures = FailureAccumulator.createFailure(failures, 
syncPoint.syncId, null, "No more nodes to contact for " + needed);
+        onDone(success, failures);
     }
 
     protected void exhausted(Ranges exhausted)
@@ -310,8 +308,7 @@ public abstract class FetchCoordinator
         if (isDone)
             return;
 
-        if (this.failure == null) this.failure = failure;
-        else this.failure.addSuppressed(failure);
+        failures = FailureAccumulator.append(failures, failure);
 
         unavailable(to, ranges);
     }
@@ -320,8 +317,7 @@ public abstract class FetchCoordinator
         if (isDone)
             return;
 
-        if (this.failure == null) this.failure = failure;
-        else this.failure.addSuppressed(failure);
+        failures = FailureAccumulator.append(failures, failure);
 
         State state = stateMap.get(to);
 
diff --git a/accord-core/src/test/java/accord/coordinate/FailureAccumulatorTest.java b/accord-core/src/test/java/accord/coordinate/FailureAccumulatorTest.java
new file mode 100644
index 00000000..d3e82fe9
--- /dev/null
+++ b/accord-core/src/test/java/accord/coordinate/FailureAccumulatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class FailureAccumulatorTest
+{
+    @Test
+    public void allTimeout()
+    {
+        Throwable accum = null;
+        for (int i = 0; i < 3; i++)
+            accum = FailureAccumulator.append(accum, new Timeout(null, null));
+        assertThat(accum).isInstanceOf(Timeout.class);
+        assertThat(accum.getSuppressed()).hasSize(2);
+        for (Throwable t : accum.getSuppressed())
+            assertThat(t).isInstanceOf(Timeout.class);
+    }
+
+    @Test
+    public void firstTimeout()
+    {
+        testNonTimeout(true);
+    }
+
+    @Test
+    public void firstNotTimeout()
+    {
+        testNonTimeout(false);
+    }
+
+    private static void testNonTimeout(boolean firstTimeout)
+    {
+        Throwable accum = firstTimeout ? new Timeout(null, null) : new 
IllegalStateException();
+        accum = FailureAccumulator.append(accum, new IllegalStateException());
+        accum = FailureAccumulator.append(accum, new Timeout(null, null));
+
+        assertThat(accum).isInstanceOf(IllegalStateException.class);
+        assertThat(accum.getSuppressed()).hasSize(2);
+        Throwable[] sup = accum.getSuppressed();
+        assertThat(sup[0]).isInstanceOf(firstTimeout ? Timeout.class : 
IllegalStateException.class);
+        assertThat(sup[1]).isInstanceOf(Timeout.class);
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java
index d4a67dfc..1a5865a3 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -333,7 +333,7 @@ public class ListStore implements DataStore
                     if (!success.isEmpty())
                         fetchCompletes.add(new FetchComplete(storeId, 
syncPoint, success));
                 }
-                delegate.fail(ranges, new Throwable());
+                delegate.fail(ranges, new Throwable("Failed Fetch", failure));
             }
         };
         ListFetchCoordinator coordinator = new ListFetchCoordinator(node, 
ranges, syncPoint, hook, safeStore.commandStore(), this);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to