This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12000406c3 Follow-up to CASSANDRA-20906: Fix Simulator
12000406c3 is described below

commit 12000406c34748b6ce21d97bbcf5020c313c5672
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Sep 18 13:49:28 2025 +0100

    Follow-up to CASSANDRA-20906: Fix Simulator
---
 modules/accord                                     |  2 +-
 .../cassandra/service/accord/AccordExecutor.java   |  4 +--
 .../service/accord/ImmediateAsyncExecutor.java     | 30 ++--------------------
 .../accord/interop/AccordInteropExecution.java     |  6 ++---
 .../service/accord/interop/AccordInteropRead.java  |  4 +--
 .../accord/interop/AccordInteropReadRepair.java    |  9 ++++---
 .../apache/cassandra/utils/concurrent/Future.java  |  2 +-
 .../distributed/impl/AbstractCluster.java          |  5 ++++
 .../systems/InterceptorOfGlobalMethods.java        |  2 +-
 .../SimulatedAccordCommandStoreTestBase.java       |  4 +--
 10 files changed, 24 insertions(+), 44 deletions(-)

diff --git a/modules/accord b/modules/accord
index 78b84b08e1..520818a004 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 78b84b08e13530722cb785a3b748fd2075c1c449
+Subproject commit 520818a004a89217cf86efa6c8fa2968401968ec
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 3f91882051..637b158177 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -40,8 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
-import accord.api.AsyncExecutor;
 import accord.api.RoutingKey;
+import accord.impl.AbstractAsyncExecutor;
 import accord.local.Command;
 import accord.local.PreLoadContext;
 import accord.local.SequentialAsyncExecutor;
@@ -95,7 +95,7 @@ import static org.apache.cassandra.service.accord.AccordTask.State.WAITING_TO_RU
  * NOTE: We assume that NO BLOCKING TASKS are submitted to this executor AND WAITED ON by another task executing on this executor.
  *  (as we do not immediately schedule additional threads for submitted tasks, but schedule new threads only if necessary when the submitting execution completes)
  */
-public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable, AsyncExecutor
+public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable, AbstractAsyncExecutor
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordExecutor.class);
     public interface AccordExecutorFactory
diff --git a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
index 7626ee9b97..0f1e82a0d3 100644
--- a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
@@ -18,34 +18,15 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.concurrent.Callable;
-import java.util.function.BiConsumer;
-
 import javax.annotation.Nonnull;
 
-import accord.api.AsyncExecutor;
-import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncChains;
+import accord.impl.AbstractAsyncExecutor;
 import org.apache.cassandra.service.accord.api.AccordAgent;
 
-public class ImmediateAsyncExecutor implements AsyncExecutor, BiConsumer<Object, Throwable>
+public class ImmediateAsyncExecutor implements AbstractAsyncExecutor
 {
     public static final ImmediateAsyncExecutor INSTANCE = new ImmediateAsyncExecutor();
 
-    @Override
-    public <T> AsyncChain<T> chain(Callable<T> call)
-    {
-        try
-        {
-            return AsyncChains.success(call.call());
-        }
-        catch (Throwable t)
-        {
-            AccordAgent.handleUncaughtException(t);
-            return AsyncChains.failure(t);
-        }
-    }
-
     @Override
     public void execute(@Nonnull Runnable command)
     {
@@ -58,11 +39,4 @@ public class ImmediateAsyncExecutor implements AsyncExecutor, BiConsumer<Object,
             AccordAgent.handleUncaughtException(t);
         }
     }
-
-    @Override
-    public void accept(Object o, Throwable throwable)
-    {
-        if (throwable != null)
-            AccordAgent.handleUncaughtException(throwable);
-    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index 597294cb5b..6f6ff278d0 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -258,7 +258,7 @@ public class AccordInteropExecution implements ReadCoordinator
                              }
 
                              Group group = Group.one(command);
-                             results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
+                             results.add(AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
                                  TxnData result = new TxnData();
                                  // Enforcing limits is redundant since we only have a group of size 1, but checking anyways
                                  // documents the requirement here
@@ -294,7 +294,7 @@ public class AccordInteropExecution implements ReadCoordinator
 
                 // TODO (required): To make migration work we need to validate that the range is all on Accord
 
-                results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
+                results.add(AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
                     TxnData result = new TxnData();
                     try (PartitionIterator iterator = StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
                     {
@@ -396,7 +396,7 @@ public class AccordInteropExecution implements ReadCoordinator
 
     private AsyncChain<Data> executeUnrecoverableRepairUpdate()
     {
-        return AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
+        return AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
             UnrecoverableRepairUpdate repairUpdate = (UnrecoverableRepairUpdate)txn.update();
             // TODO (expected): We should send the read in the same message as the commit. This requires refactor ReadData.Kind so that it doesn't specify the ordinal encoding
             // and can be extended similar to MessageType which allows additional types not from Accord to be added
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
index 51f9360829..cb53c27a31 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
@@ -283,7 +283,7 @@ public class AccordInteropRead extends ReadData
                 return AsyncChains.success(new LocalReadData(new ArrayList<>(), readCommand));
 
             ReadCommand submit = readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()), nowInSeconds);
-            return AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
+            return AsyncChains.chain(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command));
         }
 
         // This path can have a subrange we have never seen before provided by short read protection or read repair so we need to
@@ -298,7 +298,7 @@ public class AccordInteropRead extends ReadData
                 continue;
             ReadCommand submit = TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command, intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
             TokenKey routingKey = ((TokenRange)r).start();
-            chains.add(AsyncExecutor.chain(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
+            chains.add(AsyncChains.chain(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command)));
         }
 
         if (chains.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
index d080386ba0..1a661215cc 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
@@ -35,6 +35,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadRepairVerbHandler;
 import org.apache.cassandra.db.TypeSizes;
@@ -145,10 +146,10 @@ public class AccordInteropReadRepair extends ReadData
     protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute)
     {
         // TODO (required): subtract unavailable ranges, either from read or from response (or on coordinator)
-        return AsyncExecutor.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
-                                          ReadRepairVerbHandler.instance.applyMutation(mutation);
-                                          return Data.NOOP_DATA;
-                                      });
+        return AsyncChains.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
+            ReadRepairVerbHandler.instance.applyMutation(mutation);
+            return Data.NOOP_DATA;
+        });
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index b8bf62ecb5..629c2f1093 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -42,7 +42,7 @@ import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
  * A Future that integrates several different (but equivalent) APIs used within Cassandra into a single concept,
  * integrating also with our {@link Awaitable} abstraction, to overall improve coherency and clarity in the codebase.
  */
-@Shared(scope = SIMULATION, ancestors = INTERFACES)
+@Shared(scope = SIMULATION, ancestors = INTERFACES, members = INTERFACES)
 public interface Future<V> extends io.netty.util.concurrent.Future<V>, ListenableFuture<V>, Awaitable, AsyncResult<V>
 {
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 6cd9c8e60a..805d15665a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1344,6 +1344,11 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
             return;
 
         forEach.accept(cur);
+        {
+            Shared overrideShared = cur.getAnnotation(Shared.class);
+            if (overrideShared != null)
+                shared = new SharedParams(overrideShared);
+        }
 
         switch (shared.ancestors)
         {
diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
index eca4ebb2b4..503c73b702 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
@@ -43,7 +43,7 @@ import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
 import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
 
 @SuppressWarnings("unused")
-@Shared(scope = SIMULATION, inner = INTERFACES)
+@Shared(scope = SIMULATION, ancestors = INTERFACES)
 public interface InterceptorOfGlobalMethods extends InterceptorOfSystemMethods, Closeable
 {
     Semaphore newSemaphore(int count);
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index 0fd249a3a5..6bf0e569da 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Maps;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
-import accord.api.AsyncExecutor;
 import accord.api.RoutingKey;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.local.Node;
@@ -57,6 +56,7 @@ import accord.topology.Topologies;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -286,7 +286,7 @@ public abstract class SimulatedAccordCommandStoreTestBase extends CQLTester
             assertDeps(success.txnId, success.deps, cloneKeyConflicts, cloneRangeConflicts);
             return success;
         });
-        var delay = preAcceptAsync.flatMap(ignore -> AsyncExecutor.chain(instance.unorderedScheduled, () -> {
+        var delay = preAcceptAsync.flatMap(ignore -> AsyncChains.chain(instance.unorderedScheduled, () -> {
             Ballot ballot = Ballot.fromValues(instance.storeService.epoch(), instance.storeService.now(), nodeId);
             return new BeginRecovery(nodeId, new Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, instance.topology), txnId, null, false, txn, route, ballot);
         }).beginAsResult());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to