This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new ff755a191b Accord: Need to simulate Cassandra Journal in Accord BurnTest to detect issues earlier before they are seen in Cassandra ff755a191b is described below commit ff755a191b3650c99d98a978de894561177b831f Author: David Capwell <dcapw...@apache.org> AuthorDate: Fri May 3 12:57:46 2024 -0700 Accord: Need to simulate Cassandra Journal in Accord BurnTest to detect issues earlier before they are seen in Cassandra patch by Benedict Elliott Smith, David Capwell; reviewed by Benedict Elliott Smith, David Capwell for CASSANDRA-19618 --- .gitmodules | 2 +- modules/accord | 2 +- .../org/apache/cassandra/config/AccordSpec.java | 60 ++++++ .../cassandra/config/DatabaseDescriptor.java | 5 + src/java/org/apache/cassandra/net/Verb.java | 8 +- .../cassandra/service/accord/AccordJournal.java | 124 +++++++----- .../cassandra/service/accord/AccordKeyspace.java | 12 +- .../cassandra/service/accord/AccordService.java | 7 +- .../service/accord/async/AsyncOperation.java | 4 +- .../service/accord/async/ExecutionOrder.java | 127 +++++++------ .../migration/ConsensusRequestRouter.java | 2 +- .../test/AccordJournalSimulationTest.java | 3 +- .../config/DatabaseDescriptorRefTest.java | 4 + .../compaction/CompactionAccordIteratorsTest.java | 8 +- .../cassandra/service/accord/AccordTestUtils.java | 3 +- .../cassandra/service/accord/MockJournal.java | 28 +++ .../accord/SimulatedAccordCommandStore.java | 13 +- .../accord/async/SimulatedAsyncOperationTest.java | 207 +++++++++++++++++++++ 18 files changed, 497 insertions(+), 122 deletions(-) diff --git a/.gitmodules b/.gitmodules index 60a9510e7a..616dacf610 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = ../cassandra-accord.git + url = https://github.com/apache/cassandra-accord.git branch = trunk diff --git a/modules/accord b/modules/accord index 202e673583..256b35e27d 160000 --- a/modules/accord 
+++ b/modules/accord @@ -1 +1 @@ -Subproject commit 202e67358396a1e413e29498bea71047bd586d06 +Subproject commit 256b35e27d170db9fcd8024d5678b4f6e9d3a956 diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index d6fb1a5011..b035b0b9b5 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -18,6 +18,8 @@ package org.apache.cassandra.config; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.cassandra.journal.Params; import org.apache.cassandra.service.consensus.TransactionalMode; public class AccordSpec @@ -71,4 +73,62 @@ public class AccordSpec public TransactionalMode default_transactional_mode = TransactionalMode.off; public boolean ephemeralReadEnabled = false; public boolean state_cache_listener_jfr_enabled = true; + public final JournalSpec journal = new JournalSpec(); + + public static class JournalSpec implements Params + { + public int segmentSize = 32 << 20; + public FailurePolicy failurePolicy = FailurePolicy.STOP; + public FlushMode flushMode = FlushMode.BATCH; + public DurationSpec.IntMillisecondsBound flushPeriod; // pulls default from 'commitlog_sync_period' + public DurationSpec.IntMillisecondsBound periodicFlushLagBlock = new DurationSpec.IntMillisecondsBound("1500ms"); + + @Override + public int segmentSize() + { + return segmentSize; + } + + @Override + public FailurePolicy failurePolicy() + { + return failurePolicy; + } + + @Override + public FlushMode flushMode() + { + return flushMode; + } + + @JsonIgnore + @Override + public int flushPeriodMillis() + { + return flushPeriod == null ? 
DatabaseDescriptor.getCommitLogSyncPeriod() + : flushPeriod.toMilliseconds(); + } + + @JsonIgnore + @Override + public int periodicFlushLagBlock() + { + return periodicFlushLagBlock.toMilliseconds(); + } + + /** + * This is required by the journal, but we don't have multiple versions, so block it from showing up, so we don't need to worry about maintaining it + */ + @JsonIgnore + @Override + public int userVersion() + { + /* + * NOTE: when accord journal version gets bumped, expose it via yaml. + * This way operators can force previous version on upgrade, temporarily, + * to allow easier downgrades if something goes wrong. + */ + return 1; + } + } } diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 384872684b..285d7308b7 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3302,6 +3302,11 @@ public class DatabaseDescriptor return conf.paxos_topology_repair_strict_each_quorum; } + public static AccordSpec getAccord() + { + return conf.accord; + } + public static AccordSpec.TransactionalRangeMigration getTransactionalRangeMigration() { return conf.accord.range_migration; diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index 500ca80943..eb244f1bc6 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -344,15 +344,15 @@ public enum Verb ACCORD_SYNC_NOTIFY_REQ (151, P2, writeTimeout, IMMEDIATE, () -> Notification.listSerializer, () -> AccordSyncPropagator.verbHandler, ACCORD_SIMPLE_RSP ), - ACCORD_APPLY_AND_WAIT_REQ (152, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.readData, () -> AccordService.instance().verbHandler(), ACCORD_READ_RSP), + ACCORD_APPLY_AND_WAIT_REQ (152, P2, writeTimeout, IMMEDIATE, () -> ReadDataSerializers.readData, AccordService::verbHandlerOrNoop, 
ACCORD_READ_RSP), CONSENSUS_KEY_MIGRATION (153, P1, writeTimeout, MUTATION, () -> ConsensusKeyMigrationFinished.serializer,() -> ConsensusKeyMigrationState.consensusKeyMigrationFinishedHandler), ACCORD_INTEROP_READ_RSP (154, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.replySerializer, RESPONSE_HANDLER), - ACCORD_INTEROP_READ_REQ (155, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.requestSerializer, () -> AccordService.instance().verbHandler(), ACCORD_INTEROP_READ_RSP), - ACCORD_INTEROP_COMMIT_REQ (156, P2, writeTimeout, IMMEDIATE, () -> AccordInteropCommit.serializer, () -> AccordService.instance().verbHandler(), ACCORD_INTEROP_READ_RSP), + ACCORD_INTEROP_READ_REQ (155, P2, writeTimeout, IMMEDIATE, () -> AccordInteropRead.requestSerializer, AccordService::verbHandlerOrNoop, ACCORD_INTEROP_READ_RSP), + ACCORD_INTEROP_COMMIT_REQ (156, P2, writeTimeout, IMMEDIATE, () -> AccordInteropCommit.serializer, AccordService::verbHandlerOrNoop, ACCORD_INTEROP_READ_RSP), ACCORD_INTEROP_READ_REPAIR_RSP (157, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.replySerializer, RESPONSE_HANDLER), - ACCORD_INTEROP_READ_REPAIR_REQ (158, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.requestSerializer, () -> AccordService.instance().verbHandler(), ACCORD_INTEROP_READ_REPAIR_RSP), + ACCORD_INTEROP_READ_REPAIR_REQ (158, P2, writeTimeout, IMMEDIATE, () -> AccordInteropReadRepair.requestSerializer, AccordService::verbHandlerOrNoop, ACCORD_INTEROP_READ_REPAIR_RSP), ACCORD_INTEROP_APPLY_REQ (160, P2, writeTimeout, IMMEDIATE, () -> AccordInteropApply.serializer, AccordService::verbHandlerOrNoop, ACCORD_APPLY_RSP), // generic failure response diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index b659cf4733..ce90b26747 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ 
-42,6 +42,7 @@ import com.google.common.collect.Multimap; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import accord.messages.ApplyThenWaitUntilApplied; import org.agrona.collections.Long2ObjectHashMap; import org.agrona.collections.LongArrayList; import org.agrona.collections.ObjectHashSet; @@ -156,50 +157,6 @@ public class AccordJournal implements IJournal, Shutdownable private static final ThreadLocal<byte[]> keyCRCBytes = ThreadLocal.withInitial(() -> new byte[21]); - static final Params PARAMS = new Params() - { - @Override - public int segmentSize() - { - return 32 << 20; - } - - @Override - public FailurePolicy failurePolicy() - { - return FailurePolicy.STOP; - } - - @Override - public FlushMode flushMode() - { - return FlushMode.BATCH; - } - - @Override - public int flushPeriodMillis() - { - return DatabaseDescriptor.getCommitLogSyncPeriod(); - } - - @Override - public int periodicFlushLagBlock() - { - return 1500; - } - - @Override - public int userVersion() - { - /* - * NOTE: when accord journal version gets bumped, expose it via yaml. - * This way operators can force previous version on upgrade, temporarily, - * to allow easier downgrades if something goes wrong. 
- */ - return 1; - } - }; - private final File directory; private final Journal<Key, Object> journal; private final AccordEndpointMapper endpointMapper; @@ -219,10 +176,10 @@ public class AccordJournal implements IJournal, Shutdownable private final FrameApplicator frameApplicator = new FrameApplicator(); @VisibleForTesting - public AccordJournal(AccordEndpointMapper endpointMapper) + public AccordJournal(AccordEndpointMapper endpointMapper, Params params) { this.directory = new File(DatabaseDescriptor.getAccordJournalDirectory()); - this.journal = new Journal<>("AccordJournal", directory, PARAMS, new JournalCallbacks(), Key.SUPPORT, RECORD_SERIALIZER); + this.journal = new Journal<>("AccordJournal", directory, params, new JournalCallbacks(), Key.SUPPORT, RECORD_SERIALIZER); this.endpointMapper = endpointMapper; } @@ -969,6 +926,22 @@ public class AccordJournal implements IJournal, Shutdownable } } msgTypeToSynonymousTypesMap = ImmutableListMultimap.copyOf(msgTypeToSynonymousTypes); + + //TODO (now): enable as this shows we are currently missing a message +// IllegalStateException e = null; +// for (MessageType t : MessageType.values) +// { +// if (!t.hasSideEffects()) continue; +// Type matches = msgTypeToTypeMap.get(t); +// if (matches == null) +// { +// IllegalStateException ise = new IllegalStateException("Missing MessageType " + t); +// if (e == null) e = ise; +// else e.addSuppressed(ise); +// } +// } +// if (e != null) +// throw e; } static Type fromId(int id) @@ -1164,7 +1137,7 @@ public class AccordJournal implements IJournal, Shutdownable while (null != (request = unframedRequests.poll())) { long waitForEpoch = request.waitForEpoch; - if (!node.topology().hasEpoch(waitForEpoch)) + if (waitForEpoch != 0 && !node.topology().hasEpoch(waitForEpoch)) { delayedRequests.computeIfAbsent(waitForEpoch, ignore -> new ArrayList<>()).add(request); if (!waitForEpochs.containsLong(waitForEpoch)) @@ -1394,6 +1367,12 @@ public class AccordJournal implements IJournal, 
Shutdownable this.txnId = txnId; } + @Override + public TxnId txnId() + { + return txnId; + } + @Override public Set<MessageType> test(Set<MessageType> messages) { @@ -1464,6 +1443,12 @@ public class AccordJournal implements IJournal, Shutdownable return readMessage(txnId, STABLE_FAST_PATH_REQ, Commit.class); } + @Override + public Commit stableSlowPath() + { + return readMessage(txnId, STABLE_SLOW_PATH_REQ, Commit.class); + } + @Override public Commit stableMaximal() { @@ -1493,6 +1478,18 @@ public class AccordJournal implements IJournal, Shutdownable { return readMessage(txnId, PROPAGATE_APPLY_MSG, Propagate.class); } + + @Override + public Propagate propagateOther() + { + return readMessage(txnId, PROPAGATE_OTHER_MSG, Propagate.class); + } + + @Override + public ApplyThenWaitUntilApplied applyThenWaitUntilApplied() + { + return readMessage(txnId, APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, ApplyThenWaitUntilApplied.class); + } } private final class LoggingMessageProvider implements SerializerSupport.MessageProvider @@ -1506,6 +1503,12 @@ public class AccordJournal implements IJournal, Shutdownable this.provider = provider; } + @Override + public TxnId txnId() + { + return txnId; + } + @Override public Set<MessageType> test(Set<MessageType> messages) { @@ -1587,6 +1590,15 @@ public class AccordJournal implements IJournal, Shutdownable return commit; } + @Override + public Commit stableSlowPath() + { + logger.debug("Fetching {} message for {}", STABLE_SLOW_PATH_REQ, txnId); + Commit commit = provider.stableSlowPath(); + logger.debug("Fetched {} message for {}: {}", STABLE_SLOW_PATH_REQ, txnId, commit); + return commit; + } + @Override public Commit stableMaximal() { @@ -1631,5 +1643,23 @@ public class AccordJournal implements IJournal, Shutdownable logger.debug("Fetched {} message for {}: {}", PROPAGATE_APPLY_MSG, txnId, propagate); return propagate; } + + @Override + public Propagate propagateOther() + { + logger.debug("Fetching {} message for {}", PROPAGATE_OTHER_MSG, 
txnId); + Propagate propagate = provider.propagateOther(); + logger.debug("Fetched {} message for {}: {}", PROPAGATE_OTHER_MSG, txnId, propagate); + return propagate; + } + + @Override + public ApplyThenWaitUntilApplied applyThenWaitUntilApplied() + { + logger.debug("Fetching {} message for {}", APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, txnId); + ApplyThenWaitUntilApplied apply = provider.applyThenWaitUntilApplied(); + logger.debug("Fetched {} message for {}: {}", APPLY_THEN_WAIT_UNTIL_APPLIED_REQ, txnId, apply); + return apply; + } } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index ee5c0ae619..ab6701d6d6 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -249,6 +249,7 @@ public class AccordKeyspace + format("execute_at %s,", TIMESTAMP_TUPLE) + format("promised_ballot %s,", TIMESTAMP_TUPLE) + format("accepted_ballot %s,", TIMESTAMP_TUPLE) + + format("execute_atleast %s,", TIMESTAMP_TUPLE) + "waiting_on blob," + "listeners set<blob>, " + "PRIMARY KEY((store_id, domain, txn_id))" @@ -297,6 +298,7 @@ public class AccordKeyspace public static final ColumnMetadata execute_at = getColumn(Commands, "execute_at"); static final ColumnMetadata promised_ballot = getColumn(Commands, "promised_ballot"); static final ColumnMetadata accepted_ballot = getColumn(Commands, "accepted_ballot"); + static final ColumnMetadata execute_atleast = getColumn(Commands, "execute_atleast"); static final ColumnMetadata waiting_on = getColumn(Commands, "waiting_on"); static final ColumnMetadata listeners = getColumn(Commands, "listeners"); @@ -856,6 +858,8 @@ public class AccordKeyspace addCellIfModified(CommandsColumns.execute_at, Command::executeAt, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, command); addCellIfModified(CommandsColumns.promised_ballot, 
Command::promised, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, command); addCellIfModified(CommandsColumns.accepted_ballot, Command::acceptedOrCommitted, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, command); + if (command.txnId().kind().awaitsOnlyDeps()) + addCellIfModified(CommandsColumns.execute_atleast, Command::executesAtLeast, AccordKeyspace::serializeTimestamp, builder, timestampMicros, nowInSeconds, original, command); if (command.isStable() && !command.isTruncated()) { @@ -1229,11 +1233,12 @@ public class AccordKeyspace Timestamp executeAt = deserializeExecuteAtOrNull(row); Ballot promised = deserializePromisedOrNull(row); Ballot accepted = deserializeAcceptedOrNull(row); + Timestamp executeAtLeast = status.is(Status.Truncated) && txnId.kind().awaitsOnlyDeps() ? deserializeExecuteAtLeastOrNull(row) : null; WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row); MessageProvider messages = commandStore.makeMessageProvider(txnId); - return SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, status, executeAt, promised, accepted, waitingOn, messages); + return SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, status, executeAt, executeAtLeast, promised, accepted, waitingOn, messages); } catch (Throwable t) { @@ -1306,6 +1311,11 @@ public class AccordKeyspace return deserializeTimestampOrNull(row, "execute_at", Timestamp::fromBits); } + public static Timestamp deserializeExecuteAtLeastOrNull(UntypedResultSet.Row row) + { + return deserializeTimestampOrNull(row, "execute_atleast", Timestamp::fromBits); + } + public static Ballot deserializePromisedOrNull(UntypedResultSet.Row row) { return deserializeTimestampOrNull(row.getBlob("promised_ballot"), Ballot::fromBits); diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 
54f8c369ff..01331bb2d6 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -43,6 +43,7 @@ import accord.impl.CoordinateDurabilityScheduling; import accord.primitives.SyncPoint; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.statements.RequestValidations; +import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.accord.interop.AccordInteropAdapter.AccordInteropFactory; @@ -310,7 +311,7 @@ public class AccordService implements IAccordService, Shutdownable this.scheduler = new AccordScheduler(); this.dataStore = new AccordDataStore(); this.configuration = new AccordConfiguration(DatabaseDescriptor.getRawConfig()); - this.journal = new AccordJournal(configService); + this.journal = new AccordJournal(configService, DatabaseDescriptor.getAccord().journal); this.node = new Node(localId, messageSink, this::handleLocalRequest, @@ -441,7 +442,7 @@ public class AccordService implements IAccordService, Shutdownable private long doWithRetries(LongSupplier action, int retryAttempts, long initialBackoffMillis, long maxBackoffMillis) throws InterruptedException { // Since we could end up having the barrier transaction or the transaction it listens to invalidated - CoordinationFailed existingFailures = null; + RuntimeException existingFailures = null; Long success = null; long backoffMillis = 0; for (int attempt = 0; attempt < retryAttempts; attempt++) @@ -462,7 +463,7 @@ public class AccordService implements IAccordService, Shutdownable success = action.getAsLong(); break; } - catch (CoordinationFailed newFailures) + catch (RequestExecutionException | CoordinationFailed newFailures) { existingFailures = Throwables.merge(existingFailures, newFailures); } diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index 51041aab58..72e673f48b 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -215,7 +215,7 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R commandStore.abortCurrentOperation(); case LOADING: context.releaseResources(commandStore); - commandStore.executionOrder().unregister(this); + commandStore.executionOrder().unregisterOutOfOrder(this); case INITIALIZED: break; // nothing to clean up, call callback } @@ -239,6 +239,8 @@ public abstract class AsyncOperation<R> extends AsyncChains.Head<R> implements R default: throw new IllegalStateException("Unexpected state " + state); case INITIALIZED: canRun = commandStore.executionOrder().register(this); + if (Invariants.isParanoid()) + Invariants.checkState(canRun.booleanValue() == commandStore.executionOrder().canRun(this), "Register of %s returned canRun=%s but canRun returned %s!", this, canRun, !canRun); state(LOADING); case LOADING: if (null == canRun) diff --git a/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java b/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java index 1ef3ff6c8d..19cf170966 100644 --- a/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java +++ b/src/java/org/apache/cassandra/service/accord/async/ExecutionOrder.java @@ -21,6 +21,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.List; +import java.util.function.Consumer; import accord.api.Key; import accord.api.RoutingKey; @@ -82,31 +83,9 @@ public class ExecutionOrder } } - Conflicts remove(AsyncOperation<?> operation) + Conflicts remove(AsyncOperation<?> operation, boolean allowOutOfOrder) { - if (operationOrQueue instanceof 
AsyncOperation<?>) - { - Invariants.checkState(operationOrQueue == operation); - rangeQueues.remove(range); - } - else - { - @SuppressWarnings("unchecked") - ArrayDeque<AsyncOperation<?>> queue = (ArrayDeque<AsyncOperation<?>>) operationOrQueue; - AsyncOperation<?> head = queue.poll(); - Invariants.checkState(head == operation); - - if (queue.isEmpty()) - { - rangeQueues.remove(range); - } - else - { - head = queue.peek(); - if (canRun(head)) - head.onUnblocked(); - } - } + unregister("range", range, operationOrQueue, operation, allowOutOfOrder, () -> rangeQueues.remove(range)); return operationToConflicts.remove(operation); } @@ -182,20 +161,12 @@ public class ExecutionOrder result.rangeConflicts.add(e.getKey()); } RangeState state = e.getValue(); - Object operationOrQueue = state.operationOrQueue; - if (operationOrQueue instanceof AsyncOperation) - { - ArrayDeque<AsyncOperation<?>> queue = new ArrayDeque<>(4); - queue.add((AsyncOperation<?>) operationOrQueue); - queue.add(operation); - state.operationOrQueue = queue; - } - else - { - @SuppressWarnings("unchecked") - ArrayDeque<AsyncOperation<?>> queue = (ArrayDeque<AsyncOperation<?>>) operationOrQueue; - queue.add(operation); - } + // a single range could conflict with multiple other ranges, so it is possible that the operation + // exists in the queue already due to another range in the txn... 
simple example is + // keys = (0, 10], (12, 15] + // e.getKey() == (-100, 100] + // in this case the operation would attempt to double add since it has 2 keys that conflict with this single range + register(state.operationOrQueue, operation, q -> state.operationOrQueue = q); }); if (result.sameRange != null) { @@ -205,7 +176,7 @@ public class ExecutionOrder { rangeQueues.add(range, new RangeState(range, keyConflicts, result.rangeConflicts, operation)); } - return keyConflicts == null && result.rangeConflicts == null; + return keyConflicts == null && result.rangeConflicts == null && result.sameRange == null; } /** @@ -221,12 +192,19 @@ public class ExecutionOrder return true; } + register(operationOrQueue, operation, q -> queues.put(keyOrTxnId, q)); + return false; + } + + private void register(Object operationOrQueue, AsyncOperation<?> operation, Consumer<ArrayDeque<AsyncOperation<?>>> onCreateQueue) + { if (operationOrQueue instanceof AsyncOperation) { + Invariants.checkState(operationOrQueue != operation, "Attempted to double register operation %s", operation); ArrayDeque<AsyncOperation<?>> queue = new ArrayDeque<>(4); queue.add((AsyncOperation<?>) operationOrQueue); queue.add(operation); - queues.put(keyOrTxnId, queue); + onCreateQueue.accept(queue); } else { @@ -234,23 +212,35 @@ public class ExecutionOrder ArrayDeque<AsyncOperation<?>> queue = (ArrayDeque<AsyncOperation<?>>) operationOrQueue; queue.add(operation); } - return false; + } + + /** + * Unregister the operation as being a dependency for its keys and TxnIds, but do so even if it is unable to run now. 
+ */ + void unregisterOutOfOrder(AsyncOperation<?> operation) + { + unregister(operation, true); } /** * Unregister the operation as being a dependency for its keys and TxnIds */ void unregister(AsyncOperation<?> operation) + { + unregister(operation, false); + } + + private void unregister(AsyncOperation<?> operation, boolean allowOutOfOrder) { for (Seekable seekable : operation.keys()) { switch (seekable.domain()) { case Key: - unregister(seekable.asKey(), operation); + unregister(seekable.asKey(), operation, allowOutOfOrder); break; case Range: - unregister(seekable.asRange(), operation); + unregister(seekable.asRange(), operation, allowOutOfOrder); break; default: throw new AssertionError("Unexpected domain: " + seekable.domain()); @@ -259,48 +249,69 @@ public class ExecutionOrder } TxnId primaryTxnId = operation.primaryTxnId(); if (null != primaryTxnId) - unregister(primaryTxnId, operation); + unregister(primaryTxnId, operation, allowOutOfOrder); } - private void unregister(Range range, AsyncOperation<?> operation) + private void unregister(Range range, AsyncOperation<?> operation, boolean allowOutOfOrder) { var state = state(range); - var conflicts = state.remove(operation); + var conflicts = state.remove(operation, allowOutOfOrder); if (conflicts.rangeConflicts != null) - conflicts.rangeConflicts.forEach(r -> state(r).remove(operation)); + conflicts.rangeConflicts.forEach(r -> state(r).remove(operation, allowOutOfOrder)); if (conflicts.keyConflicts != null) - conflicts.keyConflicts.forEach(k -> unregister(k, operation)); + conflicts.keyConflicts.forEach(k -> unregister(k, operation, allowOutOfOrder)); } /** * Unregister the operation as being a dependency for key or TxnId */ - private void unregister(Object keyOrTxnId, AsyncOperation<?> operation) + private void unregister(Object keyOrTxnId, AsyncOperation<?> operation, boolean allowOutOfOrder) { Object operationOrQueue = queues.get(keyOrTxnId); Invariants.nonNull(operationOrQueue); + unregister("Key or 
TxnId", keyOrTxnId, operationOrQueue, operation, allowOutOfOrder, () -> queues.remove(keyOrTxnId)); + } + + private void unregister(String name, Object key, Object operationOrQueue, AsyncOperation<?> operation, boolean allowOutOfOrder, Runnable onEmpty) + { if (operationOrQueue instanceof AsyncOperation<?>) { - Invariants.checkState(operationOrQueue == operation); - queues.remove(keyOrTxnId); + Invariants.checkState(operationOrQueue == operation, "Only single operation present and was not %s; %s %s", name, key); + onEmpty.run(); } else { @SuppressWarnings("unchecked") ArrayDeque<AsyncOperation<?>> queue = (ArrayDeque<AsyncOperation<?>>) operationOrQueue; - AsyncOperation<?> head = queue.poll(); - Invariants.checkState(head == operation); + if (allowOutOfOrder) + { + Invariants.checkState(queue.remove(operation), "Operation %s was not found in queue: %s; %s %s", operation, queue, name, key); + } + else + { + Invariants.checkState(queue.peek() == operation, "Operation %s is not at the top of the queue; %s; %s %s", operation, queue, name, key); + queue.poll(); + } if (queue.isEmpty()) { - queues.remove(keyOrTxnId); + onEmpty.run(); } else { - head = queue.peek(); - if (canRun(head)) - head.onUnblocked(); + AsyncOperation<?> next = queue.peek(); + if (next == operation) + { + // a single range could conflict with multiple other ranges, so it is possible that the operation + // exists in the queue already due to another range in the txn... 
simple example is + // keys = (0, 10], (12, 15] + // e.getKey() == (-100, 100] + // in this case the operation would attempt to double add since it has 2 keys that conflict with this single range + return; + } + if (canRun(next)) + next.onUnblocked(); } } } @@ -357,7 +368,7 @@ public class ExecutionOrder private RangeState state(Range range) { var list = rangeQueues.get(range); - assert list.size() == 1 : String.format("Expected 1 element but saw list %s", list); + assert list.size() == 1 : String.format("Expected 1 element for range %s but saw list %s", range, list); return list.get(0); } diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java index fb6232c560..3c2422dfa4 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java @@ -120,7 +120,7 @@ public class ConsensusRequestRouter ClusterMetadata cm = ClusterMetadata.current(); TableMetadata metadata = cm.schema.getTableMetadata(tableId); if (metadata == null) - throw new IllegalStateException("Can't route consensus request for nonexistent table %s".format(tableId.toString())); + throw new IllegalStateException(String.format("Can't route consensus request for nonexistent table %s", tableId)); if (!mayWriteThroughAccord(metadata)) return false; diff --git a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java index cd98c71f18..22dd55bb31 100644 --- a/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java +++ b/test/simulator/test/org/apache/cassandra/simulator/test/AccordJournalSimulationTest.java @@ -26,6 +26,7 @@ import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; 
import accord.topology.TopologyUtils; +import org.apache.cassandra.config.AccordSpec; import org.apache.cassandra.schema.*; import org.junit.Ignore; import org.junit.Test; @@ -166,7 +167,7 @@ public class AccordJournalSimulationTest extends SimulationTestBase } } private static final ExecutorPlus executor = ExecutorFactory.Global.executorFactory().pooled("name", 10); - private static final AccordJournal journal = new AccordJournal(null); + private static final AccordJournal journal = new AccordJournal(null, new AccordSpec.JournalSpec()); private static final int events = 100; private static final CountDownLatch eventsWritten = CountDownLatch.newCountDownLatch(events); private static final CountDownLatch eventsDurable = CountDownLatch.newCountDownLatch(events); diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 7fbfc89cc8..fb47302904 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -78,6 +78,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.auth.INetworkAuthorizer", "org.apache.cassandra.auth.IRoleManager", "org.apache.cassandra.config.AccordSpec", + "org.apache.cassandra.config.AccordSpec$JournalSpec", "org.apache.cassandra.config.AccordSpec$TransactionalRangeMigration", "org.apache.cassandra.config.CassandraRelevantProperties", "org.apache.cassandra.config.CassandraRelevantProperties$PropertyConverter", @@ -267,6 +268,9 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.io.util.PathUtils$IOToLongFunction", "org.apache.cassandra.io.util.RebufferingInputStream", "org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy", + "org.apache.cassandra.journal.Params", + "org.apache.cassandra.journal.Params$FailurePolicy", + "org.apache.cassandra.journal.Params$FlushMode", "org.apache.cassandra.locator.Endpoint", 
"org.apache.cassandra.locator.IEndpointSnitch", "org.apache.cassandra.locator.InetAddressAndPort", diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index ee38ffceb9..5cbf0915b5 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -91,6 +91,7 @@ import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.assertj.core.api.Assertions; import static accord.impl.TimestampsForKey.NO_LAST_EXECUTED_HLC; import static accord.local.KeyHistory.COMMANDS; @@ -369,9 +370,14 @@ public class CompactionAccordIteratorsTest assertEquals(1, Iterators.size(partition.unfilteredIterator())); ByteBuffer[] partitionKeyComponents = CommandRows.splitPartitionKey(partition.partitionKey()); Row row = (Row)partition.unfilteredIterator().next(); - assertEquals(commands.metadata().regularColumns().size(), row.columnCount()); + + // execute_atleast is null, so when we read from the scanner the column won't be present in the partition + Assertions.assertThat(new ArrayList<>(row.columns())).isEqualTo(commands.metadata().regularColumns().stream().filter(c -> !c.name.toString().equals("execute_atleast")).collect(Collectors.toList())); for (ColumnMetadata cm : commands.metadata().regularColumns()) + { + if (cm.name.toString().equals("execute_atleast")) continue; assertNotNull(row.getColumnData(cm)); + } assertEquals(TXN_ID, CommandRows.getTxnId(partitionKeyComponents)); assertEquals(SaveStatus.Applied, AccordKeyspace.CommandRows.getStatus(row)); }; diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index a608f41f8d..0d3b3d005c 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -76,6 +76,7 @@ import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.concurrent.ImmediateExecutor; import org.apache.cassandra.concurrent.ManualExecutor; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.AccordSpec; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.TransactionStatement; @@ -399,7 +400,7 @@ public class AccordTestUtils public long unix(TimeUnit timeUnit) { return NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, this::now).applyAsLong(timeUnit); } }; - AccordJournal journal = new AccordJournal(null); + AccordJournal journal = new AccordJournal(null, new AccordSpec.JournalSpec()); journal.start(null); SingleEpochRanges holder = new SingleEpochRanges(topology.rangesForNode(node)); diff --git a/test/unit/org/apache/cassandra/service/accord/MockJournal.java b/test/unit/org/apache/cassandra/service/accord/MockJournal.java index 8a68163ede..dc22f540ed 100644 --- a/test/unit/org/apache/cassandra/service/accord/MockJournal.java +++ b/test/unit/org/apache/cassandra/service/accord/MockJournal.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import accord.local.SerializerSupport; import accord.messages.Accept; import accord.messages.Apply; +import accord.messages.ApplyThenWaitUntilApplied; import accord.messages.BeginRecovery; import accord.messages.Commit; import accord.messages.Message; @@ -43,15 +44,18 @@ import org.apache.cassandra.service.accord.AccordJournal.Type; import static accord.messages.MessageType.ACCEPT_REQ; import static accord.messages.MessageType.APPLY_MAXIMAL_REQ; import static accord.messages.MessageType.APPLY_MINIMAL_REQ; +import 
static accord.messages.MessageType.APPLY_THEN_WAIT_UNTIL_APPLIED_REQ; import static accord.messages.MessageType.BEGIN_RECOVER_REQ; import static accord.messages.MessageType.COMMIT_MAXIMAL_REQ; import static accord.messages.MessageType.COMMIT_SLOW_PATH_REQ; import static accord.messages.MessageType.PRE_ACCEPT_REQ; import static accord.messages.MessageType.PROPAGATE_APPLY_MSG; +import static accord.messages.MessageType.PROPAGATE_OTHER_MSG; import static accord.messages.MessageType.PROPAGATE_PRE_ACCEPT_MSG; import static accord.messages.MessageType.PROPAGATE_STABLE_MSG; import static accord.messages.MessageType.STABLE_FAST_PATH_REQ; import static accord.messages.MessageType.STABLE_MAXIMAL_REQ; +import static accord.messages.MessageType.STABLE_SLOW_PATH_REQ; public class MockJournal implements IJournal { @@ -61,6 +65,12 @@ public class MockJournal implements IJournal { return new SerializerSupport.MessageProvider() { + @Override + public TxnId txnId() + { + return txnId; + } + @Override public Set<MessageType> test(Set<MessageType> messages) { @@ -146,6 +156,12 @@ public class MockJournal implements IJournal return get(STABLE_FAST_PATH_REQ); } + @Override + public Commit stableSlowPath() + { + return get(STABLE_SLOW_PATH_REQ); + } + @Override public Commit stableMaximal() { @@ -175,6 +191,18 @@ public class MockJournal implements IJournal { return get(PROPAGATE_APPLY_MSG); } + + @Override + public Propagate propagateOther() + { + return get(PROPAGATE_OTHER_MSG); + } + + @Override + public ApplyThenWaitUntilApplied applyThenWaitUntilApplied() + { + return get(APPLY_THEN_WAIT_UNTIL_APPLIED_REQ); + } }; } diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index bf28d696c8..1a1b7f98d2 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.ToLongFunction; import accord.impl.SizeOfIntersectionSorter; @@ -76,7 +77,7 @@ import static org.apache.cassandra.db.ColumnFamilyStore.FlushReason.UNIT_TESTS; import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME; import static org.apache.cassandra.utils.AccordGenerators.fromQT; -class SimulatedAccordCommandStore implements AutoCloseable +public class SimulatedAccordCommandStore implements AutoCloseable { private final List<Throwable> failures = new ArrayList<>(); private final SimulatedExecutorFactory globalExecutor; @@ -90,8 +91,9 @@ class SimulatedAccordCommandStore implements AutoCloseable public final MockJournal journal; public final ScheduledExecutorPlus unorderedScheduled; public final List<String> evictions = new ArrayList<>(); + public Predicate<Throwable> ignoreExceptions = ignore -> false; - SimulatedAccordCommandStore(RandomSource rs) + public SimulatedAccordCommandStore(RandomSource rs) { globalExecutor = new SimulatedExecutorFactory(rs.fork(), fromQT(Generators.TIMESTAMP_GEN.map(java.sql.Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs), failures::add); this.unorderedScheduled = globalExecutor.scheduled("ignored"); @@ -151,6 +153,13 @@ class SimulatedAccordCommandStore implements AutoCloseable { return false; } + + @Override + public void onUncaughtException(Throwable t) + { + if (ignoreExceptions.test(t)) return; + super.onUncaughtException(t); + } }, null, ignore -> AccordTestUtils.NOOP_PROGRESS_LOG, diff --git a/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java new file mode 100644 index 0000000000..6e216ff56d --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/async/SimulatedAsyncOperationTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.service.accord.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import accord.api.Key;
+import accord.impl.basic.SimulatedFault;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.primitives.Keys;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Seekables;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.RandomSource;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordKeyspace;
+import org.apache.cassandra.service.accord.SimulatedAccordCommandStore;
+import org.apache.cassandra.service.accord.SimulatedAccordCommandStoreTestBase;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+/**
+ * Randomised test that runs {@link AsyncOperation} instances against a
+ * {@link SimulatedAccordCommandStore}.
+ *
+ * Each run starts a batch of operations over generated key or range contexts, optionally with
+ * a failing operation body or a failing / delayed loader, drains the simulated store with
+ * {@code processAll()}, and asserts that the completion callback was invoked as many times as
+ * operations were started. The token helpers in this class build {@code Murmur3Partitioner.LongToken}
+ * values directly.
+ */
+public class SimulatedAsyncOperationTest extends SimulatedAccordCommandStoreTestBase
+{
+    @Before
+    public void precondition() // asserts that intTbl is partitioned with Murmur3Partitioner
+    {
+        Assertions.assertThat(intTbl.partitioner).isEqualTo(Murmur3Partitioner.instance);
+    }
+    /**
+     * Runs 100 examples of 100 operations each in which every operation is given
+     * {@code Action.SUCCESS}, i.e. no simulated fault is requested.
+     */
+    @Test
+    public void happyPath()
+    {
+        qt().withExamples(100).check(rs -> test(rs, 100, intTbl, ignore -> Action.SUCCESS));
+    }
+    /**
+     * Runs 100 examples of 100 operations each, drawing every operation's {@link Action} from a
+     * weighted generator with weights 10, 1, 1.
+     * NOTE(review): presumably the weights follow declaration order (SUCCESS, FAILURE,
+     * LOAD_FAILURE) - confirm against Gens.enums().allWithWeights.
+     */
+    @Test
+    public void fuzz()
+    {
+        Gen<Action> actionGen = Gens.enums().allWithWeights(Action.class, 10, 1, 1);
+        qt().withExamples(100).check(rs -> test(rs, 100, intTbl, actionGen));
+    }
+    /**
+     * Body shared by both tests.
+     *
+     * Calls {@code AccordKeyspace.unsafeClear()}, picks a token space of {@code numKeys} tokens,
+     * and builds generators for either 1-10 unique keys or 1-10 ranges inside that token space.
+     * It then opens a {@link SimulatedAccordCommandStore} configured to ignore uncaught
+     * {@link SimulatedFault}s and starts {@code numSamples} operations. Each operation gets a
+     * generated context, an action from {@code actionGen} and a random delay flag. Its callback
+     * bumps a counter and throws an {@link AssertionError} for any failure that is not a
+     * {@link SimulatedFault}. After {@code processAll()} the counter must equal {@code numSamples}.
+     *
+     * @param rs         source of randomness for this example
+     * @param numSamples number of operations to start
+     * @param tbl        table whose id and partitioner are used to build keys
+     * @param actionGen  generator deciding how each operation behaves
+     */
+    private static void test(RandomSource rs, int numSamples, TableMetadata tbl, Gen<Action> actionGen) throws Exception
+    {
+        AccordKeyspace.unsafeClear();
+
+        int numKeys = rs.nextInt(20, 1000);
+        long minToken = 0;
+        long maxToken = numKeys;
+
+        Gen<Key> keyGen = Gens.longs().between(minToken + 1, maxToken).map(t -> new PartitionKey(tbl.id, tbl.partitioner.decorateKey(LongToken.keyForToken(t))));
+
+
+        Gen<Keys> keysGen = Gens.lists(keyGen).unique().ofSizeBetween(1, 10).map(l -> Keys.of(l));
+        Gen<Ranges> rangesGen = Gens.lists(rangeInsideRange(tbl.id, minToken, maxToken)).uniqueBestEffort().ofSizeBetween(1, 10).map(l -> Ranges.of(l.toArray(Range[]::new)));
+        Gen<Seekables<?, ?>> seekablesGen = Gens.oneOf(keysGen, rangesGen);
+
+        try (var instance = new SimulatedAccordCommandStore(rs))
+        {
+            instance.ignoreExceptions = t -> t instanceof SimulatedFault;
+            Counter counter = new Counter();
+            for (int i = 0; i < numSamples; i++)
+            {
+                PreLoadContext ctx = PreLoadContext.contextFor(seekablesGen.next(rs));
+                operation(instance, ctx, actionGen.next(rs), rs::nextBoolean).begin((ignore, failure) -> {
+                    counter.counter++;
+                    if (failure != null && !(failure instanceof SimulatedFault)) throw new AssertionError("Unexpected error", failure);
+                });
+            }
+            instance.processAll();
+            Assertions.assertThat(counter.counter).isEqualTo(numSamples);
+        }
+    }
+    /**
+     * Returns a generator of ranges for {@code tableId} whose two end points are distinct tokens
+     * drawn via {@code rs.nextLong(minToken, maxToken + 1)} and ordered so that start &lt; end.
+     * When {@code minToken + 1 == maxToken} the generator always yields the single range
+     * {@code (minToken, maxToken)}.
+     * NOTE(review): presumably nextLong's upper bound is exclusive, keeping both end points
+     * within [minToken, maxToken] - confirm against RandomSource.
+     */
+    private static Gen<Range> rangeInsideRange(TableId tableId, long minToken, long maxToken)
+    {
+        if (minToken + 1 == maxToken)
+        {
+            // only one range is possible...
+            return Gens.constant(range(tableId, minToken, maxToken));
+        }
+        return rs -> {
+            long a = rs.nextLong(minToken, maxToken + 1);
+            long b = rs.nextLong(minToken, maxToken + 1);
+            while (a == b)
+                b = rs.nextLong(minToken, maxToken + 1);
+            if (a > b)
+            {
+                long tmp = a;
+                a = b;
+                b = tmp;
+            }
+            return range(tableId, a, b);
+        };
+    }
+    /**
+     * Builds a {@link TokenRange} for {@code tableId} from two {@link TokenKey}s wrapping
+     * {@code LongToken(start)} and {@code LongToken(end)}.
+     */
+    private static TokenRange range(TableId tableId, long start, long end)
+    {
+        return new TokenRange(new TokenKey(tableId, new LongToken(start)), new TokenKey(tableId, new LongToken(end)));
+    }
+    /**
+     * How a generated operation should behave: SUCCESS requests no fault, FAILURE makes the
+     * operation body throw, LOAD_FAILURE makes the loader fail (see {@link #operation}).
+     */
+    private enum Action {SUCCESS, FAILURE, LOAD_FAILURE}
+    /**
+     * Creates a {@link SimulatedOperation} on {@code instance.store} for {@code ctx}.
+     * {@code Action.FAILURE} is mapped to a failing operation body. {@code Action.LOAD_FAILURE}
+     * is mapped to a failing {@link SimulatedLoader}. {@code delay} is sampled when the loader
+     * is created, and the loader schedules delayed callbacks on {@code instance.unorderedScheduled}.
+     */
+    private static AsyncOperation<Void> operation(SimulatedAccordCommandStore instance, PreLoadContext ctx, Action action, BooleanSupplier delay)
+    {
+        return new SimulatedOperation(instance.store, ctx, action == Action.FAILURE ? SimulatedOperation.Action.FAILURE : SimulatedOperation.Action.SUCCESS)
+        {
+            @Override
+            AsyncLoader createAsyncLoader(AccordCommandStore commandStore, PreLoadContext preLoadContext)
+            {
+                return new SimulatedLoader(action == SimulatedAsyncOperationTest.Action.LOAD_FAILURE ? SimulatedLoader.Action.FAILURE : SimulatedLoader.Action.SUCCESS, delay.getAsBoolean(), instance.unorderedScheduled);
+            }
+        };
+    }
+    /** Mutable int holder, so the completion callback lambda in {@code test} can increment it. */
+    private static class Counter
+    {
+        int counter = 0;
+    }
+    /**
+     * {@link AsyncOperation} whose {@code apply} either returns {@code null} or throws a
+     * {@link SimulatedFault}, depending on the configured action.
+     */
+    private static class SimulatedOperation extends AsyncOperation<Void>
+    {
+        enum Action { SUCCESS, FAILURE}
+        private final Action action;
+
+        public SimulatedOperation(AccordCommandStore commandStore, PreLoadContext preLoadContext, Action action)
+        {
+            super(commandStore, preLoadContext);
+            this.action = action;
+        }
+        /** Throws a {@link SimulatedFault} naming {@code keys()} when the action is FAILURE; otherwise returns null. */
+        @Override
+        public Void apply(SafeCommandStore safe)
+        {
+            if (action == Action.FAILURE)
+                throw new SimulatedFault("Operation failed for keys " + keys());
+            return null;
+        }
+    }
+    /**
+     * {@link AsyncLoader} stand-in constructed with all-null super arguments. Its {@code load}
+     * can report completion immediately, defer it through the supplied executor, or fail.
+     */
+    private static class SimulatedLoader extends AsyncLoader
+    {
+
+        enum Action { SUCCESS, FAILURE}
+
+        private final Action action;
+        private boolean delay;
+        private final ScheduledExecutorService executor;
+        SimulatedLoader(Action action, boolean delay, ScheduledExecutorService executor)
+        {
+            super(null, null, null, null);
+            this.action = action;
+            this.delay = delay;
+            this.executor = executor;
+        }
+        /**
+         * If a delay is pending, schedules {@code callback} to run after 1 second on the
+         * executor, passing a {@link SimulatedFault} when the action is FAILURE and null
+         * otherwise. It then clears the delay flag and returns false. Without a pending delay
+         * it throws a {@link SimulatedFault} when the action is FAILURE and returns true otherwise.
+         * NOTE(review): presumably false means "still loading, wait for the callback" and true
+         * means "loaded" - confirm against AsyncLoader.load.
+         */
+        @Override
+        public boolean load(AsyncOperation.Context context, BiConsumer<Object, Throwable> callback)
+        {
+            if (delay)
+            {
+                executor.schedule(() -> {
+                    callback.accept(null, action == Action.FAILURE ? new SimulatedFault("Failure loading " + context) : null);
+                }, 1, TimeUnit.SECONDS);
+                delay = false; // only the first load() call is deferred; later calls take the path below
+                return false;
+            }
+            if (action == Action.FAILURE)
+                throw new SimulatedFault("Failure loading " + context);
+
+            return true;
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org