This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12000406c3 Follow-up to CASSANDRA-20906: Fix Simulator
12000406c3 is described below
commit 12000406c34748b6ce21d97bbcf5020c313c5672
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Sep 18 13:49:28 2025 +0100
Follow-up to CASSANDRA-20906: Fix Simulator
---
modules/accord | 2 +-
.../cassandra/service/accord/AccordExecutor.java | 4 +--
.../service/accord/ImmediateAsyncExecutor.java | 30 ++--------------------
.../accord/interop/AccordInteropExecution.java | 6 ++---
.../service/accord/interop/AccordInteropRead.java | 4 +--
.../accord/interop/AccordInteropReadRepair.java | 9 ++++---
.../apache/cassandra/utils/concurrent/Future.java | 2 +-
.../distributed/impl/AbstractCluster.java | 5 ++++
.../systems/InterceptorOfGlobalMethods.java | 2 +-
.../SimulatedAccordCommandStoreTestBase.java | 4 +--
10 files changed, 24 insertions(+), 44 deletions(-)
diff --git a/modules/accord b/modules/accord
index 78b84b08e1..520818a004 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 78b84b08e13530722cb785a3b748fd2075c1c449
+Subproject commit 520818a004a89217cf86efa6c8fa2968401968ec
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 3f91882051..637b158177 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -40,8 +40,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.api.Agent;
-import accord.api.AsyncExecutor;
import accord.api.RoutingKey;
+import accord.impl.AbstractAsyncExecutor;
import accord.local.Command;
import accord.local.PreLoadContext;
import accord.local.SequentialAsyncExecutor;
@@ -95,7 +95,7 @@ import static
org.apache.cassandra.service.accord.AccordTask.State.WAITING_TO_RU
* NOTE: We assume that NO BLOCKING TASKS are submitted to this executor AND
WAITED ON by another task executing on this executor.
* (as we do not immediately schedule additional threads for submitted tasks,
but schedule new threads only if necessary when the submitting execution
completes)
*/
-public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable, AsyncExecutor
+public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable,
AbstractAsyncExecutor
{
private static final Logger logger =
LoggerFactory.getLogger(AccordExecutor.class);
public interface AccordExecutorFactory
diff --git a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
index 7626ee9b97..0f1e82a0d3 100644
--- a/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/ImmediateAsyncExecutor.java
@@ -18,34 +18,15 @@
package org.apache.cassandra.service.accord;
-import java.util.concurrent.Callable;
-import java.util.function.BiConsumer;
-
import javax.annotation.Nonnull;
-import accord.api.AsyncExecutor;
-import accord.utils.async.AsyncChain;
-import accord.utils.async.AsyncChains;
+import accord.impl.AbstractAsyncExecutor;
import org.apache.cassandra.service.accord.api.AccordAgent;
-public class ImmediateAsyncExecutor implements AsyncExecutor,
BiConsumer<Object, Throwable>
+public class ImmediateAsyncExecutor implements AbstractAsyncExecutor
{
public static final ImmediateAsyncExecutor INSTANCE = new
ImmediateAsyncExecutor();
- @Override
- public <T> AsyncChain<T> chain(Callable<T> call)
- {
- try
- {
- return AsyncChains.success(call.call());
- }
- catch (Throwable t)
- {
- AccordAgent.handleUncaughtException(t);
- return AsyncChains.failure(t);
- }
- }
-
@Override
public void execute(@Nonnull Runnable command)
{
@@ -58,11 +39,4 @@ public class ImmediateAsyncExecutor implements
AsyncExecutor, BiConsumer<Object,
AccordAgent.handleUncaughtException(t);
}
}
-
- @Override
- public void accept(Object o, Throwable throwable)
- {
- if (throwable != null)
- AccordAgent.handleUncaughtException(throwable);
- }
}
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
index 597294cb5b..6f6ff278d0 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropExecution.java
@@ -258,7 +258,7 @@ public class AccordInteropExecution implements
ReadCoordinator
}
Group group = Group.one(command);
-
results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
+
results.add(AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
TxnData result = new TxnData();
// Enforcing limits is redundant since we
only have a group of size 1, but checking anyways
// documents the requirement here
@@ -294,7 +294,7 @@ public class AccordInteropExecution implements
ReadCoordinator
// TODO (required): To make migration work we need to validate
that the range is all on Accord
-
results.add(AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
+
results.add(AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
TxnData result = new TxnData();
try (PartitionIterator iterator =
StorageProxy.getRangeSlice(command, consistencyLevel, this, requestTime))
{
@@ -396,7 +396,7 @@ public class AccordInteropExecution implements
ReadCoordinator
private AsyncChain<Data> executeUnrecoverableRepairUpdate()
{
- return AsyncExecutor.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
+ return AsyncChains.chain(Stage.ACCORD_MIGRATION.executor(), () -> {
UnrecoverableRepairUpdate repairUpdate =
(UnrecoverableRepairUpdate)txn.update();
// TODO (expected): We should send the read in the same message as
the commit. This requires refactor ReadData.Kind so that it doesn't specify the
ordinal encoding
// and can be extended similar to MessageType which allows
additional types not from Accord to be added
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
index 51f9360829..cb53c27a31 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
@@ -283,7 +283,7 @@ public class AccordInteropRead extends ReadData
return AsyncChains.success(new LocalReadData(new
ArrayList<>(), readCommand));
ReadCommand submit =
readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()),
nowInSeconds);
- return AsyncExecutor.chain(Stage.READ.executor(), () -> new
LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false),
command));
+ return AsyncChains.chain(Stage.READ.executor(), () -> new
LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false),
command));
}
// This path can have a subrange we have never seen before provided by
short read protection or read repair so we need to
@@ -298,7 +298,7 @@ public class AccordInteropRead extends ReadData
continue;
ReadCommand submit =
TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command,
intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
TokenKey routingKey = ((TokenRange)r).start();
- chains.add(AsyncExecutor.chain(Stage.READ.executor(), () -> new
LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit,
false), command)));
+ chains.add(AsyncChains.chain(Stage.READ.executor(), () -> new
LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit,
false), command)));
}
if (chains.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
index d080386ba0..1a661215cc 100644
--- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
@@ -35,6 +35,7 @@ import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.topology.Topologies;
import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadRepairVerbHandler;
import org.apache.cassandra.db.TypeSizes;
@@ -145,10 +146,10 @@ public class AccordInteropReadRepair extends ReadData
protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp
executeAt, PartialTxn txn, Participants<?> execute)
{
// TODO (required): subtract unavailable ranges, either from read or
from response (or on coordinator)
- return AsyncExecutor.chain(Verb.READ_REPAIR_REQ.stage.executor(), ()
-> {
-
ReadRepairVerbHandler.instance.applyMutation(mutation);
- return Data.NOOP_DATA;
- });
+ return AsyncChains.chain(Verb.READ_REPAIR_REQ.stage.executor(), () -> {
+ ReadRepairVerbHandler.instance.applyMutation(mutation);
+ return Data.NOOP_DATA;
+ });
}
@Override
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Future.java b/src/java/org/apache/cassandra/utils/concurrent/Future.java
index b8bf62ecb5..629c2f1093 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Future.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Future.java
@@ -42,7 +42,7 @@ import static
org.apache.cassandra.utils.Shared.Scope.SIMULATION;
* A Future that integrates several different (but equivalent) APIs used
within Cassandra into a single concept,
* integrating also with our {@link Awaitable} abstraction, to overall improve
coherency and clarity in the codebase.
*/
-@Shared(scope = SIMULATION, ancestors = INTERFACES)
+@Shared(scope = SIMULATION, ancestors = INTERFACES, members = INTERFACES)
public interface Future<V> extends io.netty.util.concurrent.Future<V>,
ListenableFuture<V>, Awaitable, AsyncResult<V>
{
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 6cd9c8e60a..805d15665a 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1344,6 +1344,11 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
return;
forEach.accept(cur);
+ {
+ Shared overrideShared = cur.getAnnotation(Shared.class);
+ if (overrideShared != null)
+ shared = new SharedParams(overrideShared);
+ }
switch (shared.ancestors)
{
diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
index eca4ebb2b4..503c73b702 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/systems/InterceptorOfGlobalMethods.java
@@ -43,7 +43,7 @@ import static
org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
@SuppressWarnings("unused")
-@Shared(scope = SIMULATION, inner = INTERFACES)
+@Shared(scope = SIMULATION, ancestors = INTERFACES)
public interface InterceptorOfGlobalMethods extends
InterceptorOfSystemMethods, Closeable
{
Semaphore newSemaphore(int count);
diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index 0fd249a3a5..6bf0e569da 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -32,7 +32,6 @@ import com.google.common.collect.Maps;
import org.junit.Before;
import org.junit.BeforeClass;
-import accord.api.AsyncExecutor;
import accord.api.RoutingKey;
import accord.impl.SizeOfIntersectionSorter;
import accord.local.Node;
@@ -57,6 +56,7 @@ import accord.topology.Topologies;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -286,7 +286,7 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
assertDeps(success.txnId, success.deps, cloneKeyConflicts,
cloneRangeConflicts);
return success;
});
- var delay = preAcceptAsync.flatMap(ignore ->
AsyncExecutor.chain(instance.unorderedScheduled, () -> {
+ var delay = preAcceptAsync.flatMap(ignore ->
AsyncChains.chain(instance.unorderedScheduled, () -> {
Ballot ballot = Ballot.fromValues(instance.storeService.epoch(),
instance.storeService.now(), nodeId);
return new BeginRecovery(nodeId, new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, instance.topology), txnId,
null, false, txn, route, ballot);
}).beginAsResult());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]