This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch rebootstrap in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d5db52d44264d11419512d99c72718c03235f4c6 Author: Alex Petrov <[email protected]> AuthorDate: Thu Jun 26 17:55:11 2025 +0200 Accord: Add Rebootstrap and unsafe Bootstrap To support recovering a node that has lost some of its local transaction log, introduce rebootstrap and unsafe bootstrap modes, where Accord ensures no responses are produced for transactions the node cannot be certain it had not previously answered. patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-20908 --- .gitmodules | 4 +- modules/accord | 2 +- .../cassandra/db/virtual/AccordDebugKeyspace.java | 26 +-- .../cassandra/metrics/AccordSystemMetrics.java | 2 +- .../cassandra/service/ActiveRepairService.java | 3 +- src/java/org/apache/cassandra/service/Rebuild.java | 3 +- .../apache/cassandra/service/StorageService.java | 32 +++- .../service/accord/AccordCommandStores.java | 25 +-- .../service/accord/AccordConfigurationService.java | 4 +- .../cassandra/service/accord/AccordDataStore.java | 28 ++- .../cassandra/service/accord/AccordJournal.java | 3 +- .../cassandra/service/accord/AccordService.java | 34 ++-- .../cassandra/service/accord/AccordTopology.java | 3 +- .../cassandra/service/accord/IAccordService.java | 19 +- .../cassandra/service/accord/api/AccordAgent.java | 9 +- .../service/accord/api/AccordViolationHandler.java | 10 +- .../interop/AccordInteropStableThenRead.java | 4 +- .../accord/serializers/AcceptSerializers.java | 4 +- .../accord/serializers/AwaitSerializers.java | 4 +- .../serializers/BeginInvalidationSerializers.java | 4 +- .../accord/serializers/CheckStatusSerializers.java | 8 +- .../serializers/CommandStoreSerializers.java | 13 +- .../accord/serializers/CommitSerializers.java | 8 +- .../accord/serializers/RecoverySerializers.java | 2 +- .../accord/serializers/TxnRequestSerializer.java | 6 +- .../cassandra/tcm/sequences/BootstrapAndJoin.java | 3 +- .../cassandra/tcm/sequences/DropAccordTable.java | 3 +- .../org/apache/cassandra/tcm/sequences/Move.java | 3 +- 
.../cassandra/distributed/shared/ClusterUtils.java | 3 +- .../cassandra/distributed/test/TestBaseImpl.java | 9 +- .../test/accord/AccordBootstrapTest.java | 195 ++++++++------------- .../test/accord/AccordIncrementalRepairTest.java | 12 +- .../test/accord/AccordIntegrationTest.java | 2 +- .../distributed/test/accord/AccordTestBase.java | 2 +- .../cassandra/fuzz/topology/AccordBounceTest.java | 8 +- .../service/accord/AccordJournalBurnTest.java | 6 +- .../db/virtual/AccordDebugKeyspaceTest.java | 9 +- .../cassandra/index/accord/RouteIndexTest.java | 4 +- .../service/accord/AccordCommandTest.java | 2 +- .../service/accord/AccordSyncPropagatorTest.java | 6 +- .../accord/SimulatedAccordCommandStore.java | 10 +- .../service/accord/SimulatedAccordTaskTest.java | 4 +- .../serializers/CommandsForKeySerializerTest.java | 6 +- .../apache/cassandra/utils/AccordGenerators.java | 8 +- 44 files changed, 269 insertions(+), 286 deletions(-) diff --git a/.gitmodules b/.gitmodules index 616dacf610..417bf7102c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "modules/accord"] path = modules/accord - url = https://github.com/apache/cassandra-accord.git - branch = trunk + url = https://github.com/belliottsmith/cassandra-accord.git + branch = rebootstrap diff --git a/modules/accord b/modules/accord index 78b84b08e1..036c4ff97b 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 78b84b08e13530722cb785a3b748fd2075c1c449 +Subproject commit 036c4ff97bbcc538a3e652b5fae2bf53b17341bd diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index 87029aa94f..87c69688eb 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -47,6 +47,7 @@ import accord.coordinate.Coordination; import accord.coordinate.Coordinations; import accord.coordinate.PrepareRecovery; 
import accord.coordinate.tracking.AbstractTracker; +import accord.primitives.RoutingKeys; import accord.utils.SortedListMap; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.EmptyIterators; @@ -74,8 +75,6 @@ import accord.local.CommandStores; import accord.local.CommandStores.LatentStoreSelector; import accord.local.Commands; import accord.local.DurableBefore; -import accord.local.LoadKeys; -import accord.local.LoadKeysFor; import accord.local.MaxConflicts; import accord.local.Node; import accord.local.PreLoadContext; @@ -143,8 +142,9 @@ import static accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STOR import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT; import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED; import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED; +import static accord.local.RedundantStatus.Property.LOG_UNAVAILABLE; import static accord.local.RedundantStatus.Property.QUORUM_APPLIED; -import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP; +import static accord.local.RedundantStatus.Property.UNREADY; import static accord.local.RedundantStatus.Property.SHARD_APPLIED; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.String.format; @@ -365,9 +365,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner()); List<Entry> cfks = new CopyOnWriteArrayList<>(); - PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query"); CommandStores commandStores = AccordService.instance().node().commandStores(); - AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { + AccordService.getBlocking(commandStores.forEach("commands_for_key table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { SafeCommandsForKey 
safeCfk = safeStore.get(key); CommandsForKey cfk = safeCfk.current(); if (cfk == null) @@ -475,9 +474,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace TokenKey key = TokenKey.parse(keyStr, DatabaseDescriptor.getPartitioner()); List<Entry> cfks = new CopyOnWriteArrayList<>(); - PreLoadContext context = PreLoadContext.contextFor(key, LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table query"); CommandStores commandStores = AccordService.instance().node().commandStores(); - AccordService.getBlocking(commandStores.forEach(context, key, Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { + AccordService.getBlocking(commandStores.forEach("commands_for_key_unmanaged table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { SafeCommandsForKey safeCfk = safeStore.get(key); CommandsForKey cfk = safeCfk.current(); if (cfk == null) @@ -888,8 +886,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace " locally_redundant 'TxnIdUtf8Type',\n" + " locally_synced 'TxnIdUtf8Type',\n" + " locally_witnessed 'TxnIdUtf8Type',\n" + - " pre_bootstrap 'TxnIdUtf8Type',\n" + - " stale_until_at_least 'TxnIdUtf8Type',\n" + + " log_unavailable 'TxnIdUtf8Type',\n" + + " unready 'TxnIdUtf8Type',\n" + + " stale_until 'TxnIdUtf8Type',\n" + " PRIMARY KEY (keyspace_name, table_name, table_id, command_store_id, token_start)" + ')', UTF8Type.instance)); } @@ -923,8 +922,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace .column("locally_redundant", entry.maxBound(LOCALLY_REDUNDANT).toString()) .column("locally_synced", entry.maxBound(LOCALLY_SYNCED).toString()) .column("locally_witnessed", entry.maxBound(LOCALLY_WITNESSED).toString()) - .column("pre_bootstrap", entry.maxBound(PRE_BOOTSTRAP).toString()) - .column("stale_until_at_least", entry.staleUntilAtLeast != null ? 
entry.staleUntilAtLeast.toString() : null); + .column("log_unavailable", entry.maxBound(LOG_UNAVAILABLE).toString()) + .column("unready", entry.maxBound(UNREADY).toString()) + .column("stale_until", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null); return ds; }, dataSet, @@ -1188,7 +1188,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace TxnId txnId = TxnId.parse(txnIdStr); List<Entry> commands = new CopyOnWriteArrayList<>(); - AccordService.instance().node().commandStores().forEachCommandStore(store -> { + AccordService.instance().node().commandStores().forAllUnsafe(store -> { Command command = ((AccordCommandStore)store).loadCommand(txnId); if (command != null) commands.add(new Entry(store.id(), command)); @@ -1293,7 +1293,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace TxnId txnId = TxnId.parse(txnIdStr); List<Entry> entries = new ArrayList<>(); - AccordService.instance().node().commandStores().forEachCommandStore(store -> { + AccordService.instance().node().commandStores().forAllUnsafe(store -> { for (AccordJournal.DebugEntry e : ((AccordCommandStore)store).debugCommand(txnId)) entries.add(new Entry(store.id(), e.segment, e.position, e.builder)); }); diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java index a47fff2302..c9f8130225 100644 --- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java +++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java @@ -175,7 +175,7 @@ public class AccordSystemMetrics int nowSeconds = (int) (Clock.Global.currentTimeMillis() / 1000); SnapshotBuilder builder = new SnapshotBuilder(); - service.node().commandStores().forEachCommandStore(commandStore -> { + service.node().commandStores().forAllUnsafe(commandStore -> { DefaultProgressLog.ImmutableView view = ((DefaultProgressLog)commandStore.unsafeProgressLog()).immutableView(); builder.progressLogActive += 
view.activeCount(); builder.progressLogSize.increment(view.size()); diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index c297ab1912..27e50157a3 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -582,8 +582,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { throw new IllegalArgumentException(String.format("Requested range %s intersects a local range (%s) " + "but is not fully contained in one; this would lead to " + - "imprecise repair. keyspace: %s", toRepair.toString(), - range.toString(), keyspaceName)); + "imprecise repair. keyspace: %s", toRepair, range, keyspaceName)); } } if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet)) diff --git a/src/java/org/apache/cassandra/service/Rebuild.java b/src/java/org/apache/cassandra/service/Rebuild.java index c7d40f08bf..56ad1f47ba 100644 --- a/src/java/org/apache/cassandra/service/Rebuild.java +++ b/src/java/org/apache/cassandra/service/Rebuild.java @@ -33,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; @@ -160,7 +161,7 @@ public class Rebuild StreamResultFuture streamResult = streamer.fetchAsync(); - Future<?> accordReady = AccordService.instance().epochReadyFor(metadata); + Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads); Future<?> ready = FutureCombiner.allOf(streamResult, accordReady); // wait for result diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8d861618a5..84a61f6b26 100644 --- 
a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -71,9 +71,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.repair.autorepair.AutoRepair; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,6 +131,7 @@ import org.apache.cassandra.index.IndexStatusManager; import org.apache.cassandra.io.sstable.IScrubber; import org.apache.cassandra.io.sstable.IVerifier; import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.File; import org.apache.cassandra.io.util.FileUtils; @@ -158,7 +156,9 @@ import org.apache.cassandra.metrics.SamplingManager; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairCoordinator; +import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.SharedContext; +import org.apache.cassandra.repair.autorepair.AutoRepair; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -183,6 +183,7 @@ import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator; import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.service.snapshot.SnapshotManager; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamManager; import 
org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamState; @@ -206,9 +207,9 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; import org.apache.cassandra.tcm.sequences.BootstrapAndReplace; import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.SingleNodeSequences; +import org.apache.cassandra.tcm.transformations.AlterTopology; import org.apache.cassandra.tcm.transformations.Assassinate; import org.apache.cassandra.tcm.transformations.CancelInProgressSequence; -import org.apache.cassandra.tcm.transformations.AlterTopology; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tcm.transformations.Unregister; @@ -3145,6 +3146,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return new FutureTask<>(task); } + public RepairCoordinator repairAccordKeyspace(String keyspace, Collection<Range<Token>> ranges) + { + int cmd = nextRepairCommand.incrementAndGet(); + RepairOption options = new RepairOption(RepairParallelism.PARALLEL, // parallelism + false, // primaryRange + false, // incremental + false, // trace + 5, // jobThreads + ranges, // ranges + true, // pullRepair + true, // forceRepair + PreviewKind.NONE, // previewKind + false, // optimiseStreams + true, // ignoreUnreplicatedKeyspaces + true, // repairData + false, // repairPaxos + true, // dontPurgeTombstones + false // repairAccord + ); + + return new RepairCoordinator(this, cmd, options, keyspace); + } + private void tryRepairPaxosForTopologyChange(String reason) { try diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index cbf1fe49dc..3a43357a73 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -37,9 +37,9 @@ import accord.utils.RandomSource; import org.apache.cassandra.cache.CacheSize; import org.apache.cassandra.config.AccordSpec.QueueShardModel; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory; -import org.apache.cassandra.service.accord.api.TokenKey; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeState; import static org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD; import static org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount; @@ -113,12 +113,12 @@ public class AccordCommandStores extends CommandStores implements CacheSize } @Override - protected boolean shouldBootstrap(Node node, Topology previous, Topology updated, Range range) + protected BootstrapRangeAction shouldBootstrap(Node node, Topology prevGlobal, Topology newLocal, Range range) { - if (!super.shouldBootstrap(node, previous, updated, range)) - return false; - // we see new ranges when a new keyspace is added, so avoid bootstrap in these cases - return contains(previous, ((TokenKey) range.start()).table()); + if (NodeState.isBootstrap(ClusterMetadata.current().myNodeState())) + return BootstrapRangeAction.UNSAFE_BOOTSTRAP; + + return super.shouldBootstrap(node, prevGlobal, newLocal, range); } @Override @@ -128,17 +128,6 @@ public class AccordCommandStores extends CommandStores implements CacheSize return executors[idx].newSequentialExecutor(); } - private static boolean contains(Topology previous, TableId searchTable) - { - for (Range range : previous.ranges()) - { - TableId table = ((TokenKey) range.start()).table(); - if (table.equals(searchTable)) - return true; - } - return false; - } - public synchronized void setCapacity(long bytes) { cacheSize = bytes; diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 5dc0d19d09..fca06894ed 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -113,7 +113,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc @Nullable AsyncResult<Void> reads() { - return reads; + return ready.reads; } AsyncResult.Settable<Void> localSyncNotified() @@ -449,7 +449,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc } @Override - protected void localSyncComplete(Topology topology, boolean startSync) + protected void onReadyToCoordinate(Topology topology, boolean startSync) { long epoch = topology.epoch(); EpochState epochState = getOrCreateEpochState(epoch); diff --git a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java index 0234184a38..7934aebf76 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java @@ -30,6 +30,7 @@ import accord.local.cfk.CommandsForKey; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.SyncPoint; +import accord.utils.UnhandledEnum; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.memtable.Memtable; import org.apache.cassandra.schema.Schema; @@ -40,14 +41,6 @@ public class AccordDataStore implements DataStore private static final Logger logger = LoggerFactory.getLogger(AccordDataStore.class); enum FlushListenerKey { KEY } - @Override - public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback) - { - AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, 
ranges, syncPoint, callback, safeStore.commandStore()); - coordinator.start(); - return coordinator.result(); - } - /** * Ensures data for the intersecting ranges is flushed to sstable before calling back with reportOnSuccess. * This is used to gate journal cleanup, since we skip the CommitLog for applying to the data table. @@ -95,4 +88,23 @@ public class AccordDataStore implements DataStore prev = cfs; } } + + @Override + public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind) + { + switch (kind) + { + default: throw new UnhandledEnum(kind); + case Image: + { + AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore()); + coordinator.start(); + return coordinator.result(); + } + case Sync: + { + throw new UnsupportedOperationException(); + } + } + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 045acea8df..cfa224ab49 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -585,7 +585,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier @SuppressWarnings("unchecked") @Override - public void replay(CommandStores commandStores) + public boolean replay(CommandStores commandStores) { // TODO (expected): make the parallelisms configurable // Replay is performed in parallel, where at most X commands can be in flight, accross at most Y commands stores. 
@@ -716,6 +716,7 @@ public class AccordJournal implements accord.api.Journal, RangeSearcher.Supplier ++cur; } + return true; } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 058b9171ac..34b7dd28c4 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -39,6 +39,7 @@ import javax.annotation.concurrent.GuardedBy; import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.metrics.AccordReplicaMetrics; import org.apache.cassandra.service.accord.api.AccordViolationHandler; import org.apache.cassandra.utils.Clock; @@ -327,7 +328,7 @@ public class AccordService implements IAccordService, Shutdownable } @VisibleForTesting - public static void replayJournal(AccordService as) + public static boolean replayJournal(AccordService as) { logger.info("Starting journal replay."); long before = Clock.Global.nanoTime(); @@ -337,12 +338,12 @@ public class AccordService implements IAccordService, Shutdownable if (as.journalConfiguration().replayMode() == RESET) AccordKeyspace.truncateCommandsForKey(); - as.node.commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop()); + as.node.commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop()); as.journal().replay(as.node().commandStores()); logger.info("Waiting for command stores to quiesce."); ((AccordCommandStores)as.node.commandStores()).waitForQuiescense(); as.journal.unsafeSetStarted(); - as.node.commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start()); + as.node.commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start()); } finally { @@ -351,14 +352,7 @@ public class AccordService implements IAccordService, Shutdownable long after = Clock.Global.nanoTime(); 
logger.info("Finished journal replay. {}ms elapsed", NANOSECONDS.toMillis(after - before)); - } - - public static void shutdownServiceAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException - { - IAccordService i = instance; - if (i == null) - return; - i.shutdownAndWait(timeout, unit); + return true; } @Override @@ -565,7 +559,7 @@ public class AccordService implements IAccordService, Shutdownable if (keys.size() != 1) return syncInternal(minBound, keys, syncLocal, syncRemote); - return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote) + return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), syncLocal, syncRemote).chain() .flatMap(found -> KeyBarriers.await(node, node.someSequentialExecutor(), found, syncLocal, syncRemote)) .flatMap(success -> { if (success) @@ -799,8 +793,8 @@ public class AccordService implements IAccordService, Shutdownable } Ready ready = new Ready(); AccordCommandStores commandStores = (AccordCommandStores) node.commandStores(); - getBlocking(commandStores.forEach((PreLoadContext.Empty)() -> "Flush Caches", safeStore -> { - AccordCommandStore commandStore = (AccordCommandStore)safeStore.commandStore(); + commandStores.forAllUnsafe(unsafeStore -> { + AccordCommandStore commandStore = (AccordCommandStore)unsafeStore; try (AccordCommandStore.ExclusiveCaches caches = commandStore.lockCaches()) { caches.commandsForKeys().forEach(entry -> { @@ -811,7 +805,7 @@ public class AccordService implements IAccordService, Shutdownable } }); } - })); + }); ready.decrement(); AsyncPromise<Void> result = new AsyncPromise<>(); ready.invoke((success, fail) -> { @@ -1037,18 +1031,18 @@ public class AccordService implements IAccordService, Shutdownable } @Override - public Future<Void> epochReady(Epoch epoch) + public Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> get) { - return toFuture(configService.epochReady(epoch.getEpoch())); + return 
toFuture(configService.epochReady(epoch.getEpoch(), get)); } @Override - public Future<Void> epochReadyFor(ClusterMetadata metadata) + public Future<Void> epochReadyFor(ClusterMetadata metadata, Function<EpochReady, AsyncResult<Void>> get) { if (!metadata.schema.hasAccordKeyspaces()) return EPOCH_READY; - return epochReady(metadata.epoch); + return epochReady(metadata.epoch, get); } @Override @@ -1116,7 +1110,7 @@ public class AccordService implements IAccordService, Shutdownable public AccordCompactionInfos getCompactionInfo() { AccordCompactionInfos compactionInfos = new AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch()); - node.commandStores().forEachCommandStore(commandStore -> { + node.commandStores().forAllUnsafe(commandStore -> { compactionInfos.put(commandStore.id(), ((AccordCommandStore)commandStore).getCompactionInfo()); }); return compactionInfos; diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java index ca817e647d..8c60ecae5e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import accord.api.ConfigurationService.EpochReady; import accord.local.Node; import accord.local.Node.Id; import accord.primitives.Ranges; @@ -399,7 +400,7 @@ public class AccordTopology { ClusterMetadataService.instance().fetchLogFromCMS(epoch); IAccordService service = AccordService.instance(); - service.epochReady(epoch).get(service.agent().expireEpochWait(MILLISECONDS), MILLISECONDS); + service.epochReady(epoch, EpochReady::reads).get(service.agent().expireEpochWait(MILLISECONDS), MILLISECONDS); } catch (InterruptedException e) { diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java 
b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 34453d22c3..422155449b 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -25,9 +25,11 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import java.util.function.Function; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.api.ConfigurationService.EpochReady; import accord.utils.async.AsyncResult; import org.apache.cassandra.tcm.ClusterMetadata; import org.slf4j.Logger; @@ -127,9 +129,8 @@ public interface IAccordService * Return a future that will complete once the accord has completed it's local bootstrap process * for any ranges gained in the given epoch */ - Future<Void> epochReady(Epoch epoch); - - Future<Void> epochReadyFor(ClusterMetadata epoch); + Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> f); + Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, AsyncResult<Void>> f); void receive(Message<AccordSyncPropagator.Notification> message); @@ -308,13 +309,13 @@ public interface IAccordService } @Override - public Future<Void> epochReady(Epoch epoch) + public Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> get) { return BOOTSTRAP_SUCCESS; } @Override - public Future<Void> epochReadyFor(ClusterMetadata epoch) + public Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, AsyncResult<Void>> get) { return BOOTSTRAP_SUCCESS; } @@ -515,15 +516,15 @@ public interface IAccordService } @Override - public Future<Void> epochReady(Epoch epoch) + public Future<Void> epochReady(Epoch epoch, Function<EpochReady, AsyncResult<Void>> get) { - return delegate.epochReady(epoch); + return delegate.epochReady(epoch, get); } @Override - public Future<Void> epochReadyFor(ClusterMetadata 
epoch) + public Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, AsyncResult<Void>> get) { - return delegate.epochReadyFor(epoch); + return delegate.epochReadyFor(epoch, get); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index be7153fc2c..e09a565c41 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.CoordinatorEventListener; +import accord.api.OwnershipEventListener; import accord.api.ReplicaEventListener; import accord.api.ProgressLog.BlockedUntil; import accord.api.RoutingKey; @@ -93,7 +94,7 @@ import static org.apache.cassandra.service.accord.api.AccordWaitStrategies.slowR import static org.apache.cassandra.utils.Clock.Global.nanoTime; // TODO (expected): merge with AccordService -public class AccordAgent implements Agent +public class AccordAgent implements Agent, OwnershipEventListener { private static final Logger logger = LoggerFactory.getLogger(AccordAgent.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, MINUTES); @@ -125,6 +126,12 @@ public class AccordAgent implements Agent return tracing.trace(txnId, eventType); } + @Override + public OwnershipEventListener ownershipEvents() + { + return this; + } + public void setNodeId(Node.Id id) { self = id; diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java index 67ec0111a8..c402789329 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java @@ -18,6 +18,8 @@ package org.apache.cassandra.service.accord.api; +import 
javax.annotation.Nullable; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +31,6 @@ import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.TxnId; -import static accord.utils.Invariants.illegalState; - public class AccordViolationHandler implements ViolationHandler { private static final Logger logger = LoggerFactory.getLogger(AccordViolationHandler.class); @@ -40,13 +40,11 @@ public class AccordViolationHandler implements ViolationHandler ViolationHandlerHolder.set(AccordViolationHandler::new); } - @Override - public void onTimestampViolation(SafeCommandStore safeStore, Command command, Participants<?> otherParticipants, Route<?> otherRoute, Timestamp otherExecuteAt) + public void onTimestampViolation(@Nullable SafeCommandStore safeStore, Command command, Participants<?> otherParticipants, @Nullable Route<?> otherRoute, Timestamp otherExecuteAt) { - throw illegalState(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt)); + logger.error(ViolationHandler.timestampViolationMessage(safeStore, command, otherParticipants, otherRoute, otherExecuteAt)); } - @Override public void onDependencyViolation(Participants<?> participants, TxnId notWitnessed, Timestamp notWitnessedExecuteAt, TxnId by, Timestamp byExecuteAt) { logger.error(ViolationHandler.dependencyViolationMessage(participants, notWitnessed, notWitnessedExecuteAt, by, byExecuteAt)); diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java index 5dcbb4552f..468f5ff413 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java @@ -144,13 +144,13 @@ public class AccordInteropStableThenRead extends AccordInteropRead } @Override - public 
CommitOrReadNack apply(SafeCommandStore safeStore) + public CommitOrReadNack applyInternal(SafeCommandStore safeStore) { Route<?> route = this.route == null ? (Route)scope : this.route; StoreParticipants participants = StoreParticipants.execute(safeStore, route, txnId, minEpoch(), executeAtEpoch); SafeCommand safeCommand = safeStore.get(txnId, participants); Commands.commit(safeStore, safeCommand, participants, kind.saveStatus, Ballot.ZERO, txnId, route, partialTxn, executeAt, partialDeps, kind); - return super.apply(safeStore, safeCommand, participants); + return super.applyInternal(safeStore, safeCommand, participants); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java index cd96796142..53391cd322 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java @@ -88,7 +88,7 @@ public class AcceptSerializers CommandSerializers.status.serialize(invalidate.status, out); CommandSerializers.ballot.serialize(invalidate.ballot, out); CommandSerializers.txnId.serialize(invalidate.txnId, out); - KeySerializers.participants.serialize(invalidate.participants, out); + KeySerializers.participants.serialize(invalidate.scope, out); } @Override @@ -106,7 +106,7 @@ public class AcceptSerializers return CommandSerializers.status.serializedSize(invalidate.status) + CommandSerializers.ballot.serializedSize(invalidate.ballot) + CommandSerializers.txnId.serializedSize(invalidate.txnId) - + KeySerializers.participants.serializedSize(invalidate.participants); + + KeySerializers.participants.serializedSize(invalidate.scope); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java index af67c05b04..0e017184fb 100644 --- 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java @@ -122,7 +122,7 @@ public class AwaitSerializers public void serialize(AsyncAwaitComplete ok, DataOutputPlus out) throws IOException { CommandSerializers.txnId.serialize(ok.txnId, out); - KeySerializers.route.serialize(ok.route, out); + KeySerializers.route.serialize(ok.scope, out); out.writeByte(ok.newStatus.ordinal()); out.writeUnsignedVInt32(ok.callbackId); } @@ -141,7 +141,7 @@ public class AwaitSerializers public long serializedSize(AsyncAwaitComplete ok) { return CommandSerializers.txnId.serializedSize(ok.txnId) - + KeySerializers.route.serializedSize(ok.route) + + KeySerializers.route.serializedSize(ok.scope) + TypeSizes.BYTE_SIZE + VIntCoding.computeVIntSize(ok.callbackId); } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java index 25a40a6a41..231c932878 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java @@ -39,7 +39,7 @@ public class BeginInvalidationSerializers public void serialize(BeginInvalidation begin, DataOutputPlus out) throws IOException { CommandSerializers.txnId.serialize(begin.txnId, out); - KeySerializers.participants.serialize(begin.participants, out); + KeySerializers.participants.serialize(begin.scope, out); CommandSerializers.ballot.serialize(begin.ballot, out); } @@ -55,7 +55,7 @@ public class BeginInvalidationSerializers public long serializedSize(BeginInvalidation begin) { return CommandSerializers.txnId.serializedSize(begin.txnId) - + KeySerializers.participants.serializedSize(begin.participants) + + KeySerializers.participants.serializedSize(begin.scope) + 
CommandSerializers.ballot.serializedSize(begin.ballot); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java index d1f81512b9..9747d9f72f 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java @@ -128,8 +128,8 @@ public class CheckStatusSerializers public void serialize(CheckStatus check, DataOutputPlus out) throws IOException { CommandSerializers.txnId.serialize(check.txnId, out); - KeySerializers.participants.serialize(check.query, out); - out.writeUnsignedVInt(check.sourceEpoch); + KeySerializers.participants.serialize(check.scope, out); + out.writeUnsignedVInt(check.waitForEpoch); out.writeByte(check.includeInfo.ordinal()); CommandSerializers.ballot.serialize(check.bumpBallot, out); } @@ -149,8 +149,8 @@ public class CheckStatusSerializers public long serializedSize(CheckStatus check) { return CommandSerializers.txnId.serializedSize(check.txnId) - + KeySerializers.participants.serializedSize(check.query) - + TypeSizes.sizeofUnsignedVInt(check.sourceEpoch) + + KeySerializers.participants.serializedSize(check.scope) + + TypeSizes.sizeofUnsignedVInt(check.waitForEpoch) + TypeSizes.BYTE_SIZE + CommandSerializers.ballot.serializedSize(check.bumpBallot); } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java index a1cb244b13..a30ea59270 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java @@ -155,11 +155,18 @@ public class CommandStoreSerializers } for (int i = 0 ; i < b.bounds.length ; ++i) { - out.writeShort(b.status(i * 2)); - 
out.writeShort(b.status(i * 2 + 1)); + out.writeShort(cast(b.status(i * 2))); + out.writeShort(cast(b.status(i * 2 + 1))); } } + private short cast(long v) + { + if ((v & ~0xFFFF) != 0) + throw new IllegalStateException("Cannot serialize RedundantStatus larger than 0xFFFF. Requires serialization version bump."); + return (short)v; + } + @Override public RedundantBefore.Bounds deserialize(DataInputPlus in) throws IOException { @@ -174,7 +181,7 @@ public class CommandStoreSerializers TxnId[] bounds = new TxnId[count]; for (int i = 0 ; i < bounds.length ; ++i) bounds[i] = CommandSerializers.txnId.deserialize(in); - short[] statuses = new short[count * 2]; + int[] statuses = new int[count * 2]; for (int i = 0 ; i < statuses.length ; ++i) statuses[i] = in.readShort(); diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java index 31d879ce7c..2d2592eee8 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java @@ -52,9 +52,9 @@ public class CommitSerializers kind.serialize(msg.kind, out); CommandSerializers.ballot.serialize(msg.ballot, out); ExecuteAtSerializer.serialize(msg.txnId, msg.executeAt, out); - CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn, out, version); + CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn(), out, version); if (msg.kind.withDeps == Commit.WithDeps.HasDeps) - DepsSerializers.partialDeps.serialize(msg.partialDeps, out); + DepsSerializers.partialDeps.serialize(msg.partialDeps(), out); serializeNullable(msg.route, out, KeySerializers.fullRoute); } @@ -78,10 +78,10 @@ public class CommitSerializers long size = kind.serializedSize(msg.kind) + CommandSerializers.ballot.serializedSize(msg.ballot) + ExecuteAtSerializer.serializedSize(msg.txnId, msg.executeAt) - + 
CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn, version); + + CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn(), version); if (msg.kind.withDeps == Commit.WithDeps.HasDeps) - size += DepsSerializers.partialDeps.serializedSize(msg.partialDeps); + size += DepsSerializers.partialDeps.serializedSize(msg.partialDeps()); size += serializedNullableSize(msg.route, KeySerializers.fullRoute); return size; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java index 3eb2fa4984..b3f81bec28 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java @@ -56,7 +56,7 @@ public class RecoverySerializers static final int HAS_EXECUTE_AT_EPOCH = 0x2; static final int IS_FAST_PATH_DECIDED = 0x4; static final int SIZE_OF_FLAGS = VIntCoding.computeUnsignedVIntSize(HAS_ROUTE | HAS_EXECUTE_AT_EPOCH | IS_FAST_PATH_DECIDED); - public static final IVersionedSerializer<BeginRecovery> request = new WithUnsyncedSerializer<BeginRecovery>() + public static final IVersionedSerializer<BeginRecovery> request = new WithUnsyncedSerializer<>() { @Override public void serializeBody(BeginRecovery recover, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java index fe2cbe2613..37e5efdb8f 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java @@ -20,14 +20,14 @@ package org.apache.cassandra.service.accord.serializers; import java.io.IOException; -import accord.messages.TxnRequest; +import accord.messages.RouteRequest; 
import accord.primitives.Route; import accord.primitives.TxnId; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements IVersionedSerializer<T> +public abstract class TxnRequestSerializer<T extends RouteRequest<?>> implements IVersionedSerializer<T> { void serializeHeader(T msg, DataOutputPlus out, Version version) throws IOException { @@ -72,7 +72,7 @@ public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements I return serializedHeaderSize(msg, version) + serializedBodySize(msg, version); } - public static abstract class WithUnsyncedSerializer<T extends TxnRequest.WithUnsynced<?>> extends TxnRequestSerializer<T> + public static abstract class WithUnsyncedSerializer<T extends RouteRequest.WithUnsynced<?>> extends TxnRequestSerializer<T> { @Override void serializeHeader(T msg, DataOutputPlus out, Version version) throws IOException diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java index 341eff845b..a40a42dded 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java +++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import com.googlecode.concurrenttrees.common.Iterables; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.ColumnFamilyStore; @@ -362,7 +363,7 @@ public class BootstrapAndJoin extends MultiStepOperation<Epoch> StorageService.instance.repairPaxosForTopologyChange("bootstrap"); Future<StreamState> bootstrapStream = StorageService.instance.startBootstrap(metadata, beingReplaced, movements, strictMovements); - 
Future<?> accordReady = AccordService.instance().epochReadyFor(metadata); + Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads); Future<?> ready = FutureCombiner.allOf(bootstrapStream, accordReady); try diff --git a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java index 68fc2471ac..8a121fd2c2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java +++ b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.schema.Keyspaces; @@ -168,7 +169,7 @@ public class DropAccordTable extends MultiStepOperation<Epoch> return error(new IllegalStateException(String.format("Table %s is in an invalid state to be dropped", table))); long startNanos = nanoTime(); - AccordService.instance().epochReady(metadata.epoch).get(); + AccordService.instance().epochReady(metadata.epoch, EpochReady::reads).get(); long epochEndNanos = nanoTime(); // As of this writing this logic is based off ExclusiveSyncPoints which is a bit heavy weight for what is needed, this could cause timeouts for clusters that have a lot of data. 
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index b54b796749..25058e20f2 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService.EpochReady; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.TypeSizes; @@ -258,7 +259,7 @@ public class Move extends MultiStepOperation<Epoch> StreamResultFuture streamResult = streamPlan.execute(); - Future<?> accordReady = AccordService.instance().epochReadyFor(metadata); + Future<?> accordReady = AccordService.instance().epochReadyFor(metadata, EpochReady::reads); Future<?> ready = FutureCombiner.allOf(streamResult, accordReady); ready.get(); StorageService.instance.repairPaxosForTopologyChange("move"); diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index 51a551a19a..a2d0429c3f 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -49,6 +49,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; +import accord.api.ConfigurationService.EpochReady; import org.agrona.collections.IntArrayList; import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.utils.FBUtilities; @@ -1698,7 +1699,7 @@ public class ClusterUtils i.runOnInstance(() -> { try { - AccordService.instance().epochReady(Epoch.create(epoch)).get(); + AccordService.instance().epochReady(Epoch.create(epoch), EpochReady::reads).get(); 
} catch (InterruptedException | ExecutionException e) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java index 7d4320aa01..6d91a59237 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java +++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java @@ -44,7 +44,7 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.messages.AbstractRequest; +import accord.messages.NoWaitRequest; import net.openhft.chronicle.core.util.SerializablePredicate; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.CQLTester; @@ -109,8 +109,8 @@ public class TestBaseImpl extends DistributedTestBase // This isn't perfect at excluding messages so make sure it excludes the ones you care about in your test public static final SerializablePredicate<Message<?>> EXCLUDE_SYNC_POINT_MESSAGES = message -> { - if (message.payload instanceof AbstractRequest) - return !((AbstractRequest<?>)message.payload).txnId.isSyncPoint(); + if (message.payload instanceof NoWaitRequest<?,?>) + return !((NoWaitRequest<?,?>)message.payload).txnId.isSyncPoint(); return true; }; @@ -251,7 +251,7 @@ public class TestBaseImpl extends DistributedTestBase return sb.toString(); } - protected void bootstrapAndJoinNode(Cluster cluster) + protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster) { IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); @@ -261,6 +261,7 @@ public class TestBaseImpl extends DistributedTestBase () -> newInstance.startup(cluster)); newInstance.nodetoolResult("join").asserts().success(); newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later + return newInstance; } @SuppressWarnings("unchecked") diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java index d30c3d887a..49632ed153 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java @@ -24,13 +24,16 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Function; import org.junit.Assert; import org.junit.Test; -import accord.local.PreLoadContext; +import accord.api.ConfigurationService.EpochReady; +import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.topology.TopologyManager; +import accord.utils.async.AsyncResult; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -41,14 +44,15 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableFunction; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordConfigurationService; -import org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot; import org.apache.cassandra.service.accord.AccordSafeCommandStore; import 
org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.api.PartitionKey; @@ -66,6 +70,8 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.service.accord.AccordService.getBlocking; +import static org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot.ResultStatus.SUCCESS; +import static org.apache.cassandra.service.accord.AccordConfigurationService.SyncStatus.COMPLETED; public class AccordBootstrapTest extends TestBaseImpl { @@ -81,7 +87,7 @@ public class AccordBootstrapTest extends TestBaseImpl return new PartitionKey(tid, dk(key)); } - protected void bootstrapAndJoinNode(Cluster cluster) + protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster) { IInstanceConfig config = cluster.newInstanceConfig(); config.set("auto_bootstrap", true); @@ -94,6 +100,7 @@ public class AccordBootstrapTest extends TestBaseImpl // () -> withProperty("cassandra.join_ring", false, () -> newInstance.startup(cluster))); // newInstance.nodetoolResult("join").asserts().success(); newInstance.nodetoolResult("cms", "describe").asserts().success(); // just make sure we're joined, remove later + return newInstance; } private static AccordService service() @@ -101,11 +108,11 @@ public class AccordBootstrapTest extends TestBaseImpl return (AccordService) AccordService.instance(); } - private static void awaitEpoch(long epoch) + private static void awaitEpoch(long epoch, Function<EpochReady, AsyncResult<Void>> await) { try { - boolean completed = service().epochReady(Epoch.create(epoch)).await(60, TimeUnit.SECONDS); + boolean completed = service().epochReady(Epoch.create(epoch), await).await(60, TimeUnit.SECONDS); Assertions.assertThat(completed) .describedAs("Epoch %s did not become ready within timeout on %s -> %s", epoch, 
FBUtilities.getBroadcastAddressAndPort(), @@ -168,6 +175,14 @@ public class AccordBootstrapTest extends TestBaseImpl @Test public void bootstrapTest() throws Throwable + { + bootstrapTest(Function.identity(), cluster -> { + bootstrapAndJoinNode(cluster); + awaitMaxEpochReadyToRead(cluster); + }); + } + + public void bootstrapTest(Function<Cluster.Builder, Cluster.Builder> setup, Consumer<Cluster> bootstrapAndJoinNode) throws Throwable { int originalNodeCount = 2; int expandedNodeCount = originalNodeCount + 1; @@ -188,49 +203,10 @@ public class AccordBootstrapTest extends TestBaseImpl cluster.schemaChange("CREATE KEYSPACE ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2}"); cluster.schemaChange("CREATE TABLE ks.tbl (k int, c int, v int, primary key(k, c)) WITH transactional_mode='full'"); - long initialMax = maxEpoch(cluster); - + awaitMaxEpochReadyToRead(cluster); for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - Assert.assertEquals(initialMax, ClusterMetadata.current().epoch.getEpoch()); - awaitEpoch(initialMax); - AccordConfigurationService configService = service().configService(); - long minEpoch = configService.minEpoch(); - - Assert.assertEquals(initialMax, configService.maxEpoch()); - - for (long epoch = minEpoch; epoch < initialMax; epoch++) - { - awaitEpoch(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - - awaitLocalSyncNotification(initialMax); - Assert.assertEquals(EpochSnapshot.completed(initialMax), configService.getEpochSnapshot(initialMax)); - }); - } - - for (IInvokableInstance node : cluster) - { node.runOnInstance(StreamListener::register); - } - - long schemaChangeMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(schemaChangeMax)); - awaitEpoch(schemaChangeMax); - AccordConfigurationService configService = 
service().configService(); - - for (long epoch = initialMax + 1; epoch <= schemaChangeMax; epoch++) - { - awaitLocalSyncNotification(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - }); - } + awaitMaxEpochReadyToRead(cluster); for (int key = 0; key < 100; key++) { @@ -251,21 +227,7 @@ public class AccordBootstrapTest extends TestBaseImpl }); } - bootstrapAndJoinNode(cluster); - long bootstrapMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(bootstrapMax)); - Assert.assertEquals(bootstrapMax, ClusterMetadata.current().epoch.getEpoch()); - AccordService service = (AccordService) AccordService.instance(); - awaitEpoch(bootstrapMax); - AccordConfigurationService configService = service.configService(); - - awaitLocalSyncNotification(bootstrapMax); - Assert.assertEquals(EpochSnapshot.completed(bootstrapMax), configService.getEpochSnapshot(bootstrapMax)); - }); - } + bootstrapAndJoinNode.accept(cluster); InetAddress node3Addr = cluster.get(3).broadcastAddress().getAddress(); for (IInvokableInstance node : cluster.get(1, 2)) @@ -278,15 +240,11 @@ public class AccordBootstrapTest extends TestBaseImpl Assert.assertTrue(session.getNumKeyspaceTransfers() > 0); }); - getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> { - AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; - Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.bootstrapBeganAt().keySet())); - Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.safeToReadAt().keySet())); -// -// Assert.assertTrue(commandStore.maxBootstrapEpoch() > 0); -// Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty()); -// Assert.assertTrue(commandStore.safeToRead().isEmpty()); - })); + service().node().commandStores().forAllUnsafe(unsafeStore -> { + AccordCommandStore ss = (AccordCommandStore) 
unsafeStore; + Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.unsafeGetBootstrapBeganAt().keySet())); + Assert.assertEquals(Timestamp.NONE, getOnlyElement(ss.unsafeGetSafeToRead().keySet())); + }); }); } @@ -321,7 +279,7 @@ public class AccordBootstrapTest extends TestBaseImpl Assert.assertEquals(key, row.getInt("c")); Assert.assertEquals(key, row.getInt("v")); - getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", safeStore -> { + getBlocking(service().node().commandStores().forEach("Test", RoutingKeys.of(partitionKey.toUnseekable()), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { if (safeStore.ranges().currentRanges().contains(partitionKey)) { AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; @@ -375,44 +333,7 @@ public class AccordBootstrapTest extends TestBaseImpl tokens[i] = cluster.get(i+1).callOnInstance(() -> Long.valueOf(getOnlyElement(StorageService.instance.getTokens()))); } - for (IInvokableInstance node : cluster) - { - - node.runOnInstance(() -> { - Assert.assertEquals(initialMax, ClusterMetadata.current().epoch.getEpoch()); - awaitEpoch(initialMax); - AccordConfigurationService configService = service().configService(); - long minEpoch = configService.minEpoch(); - - Assert.assertEquals(initialMax, configService.maxEpoch()); - - for (long epoch = minEpoch; epoch < initialMax; epoch++) - { - awaitEpoch(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - - awaitLocalSyncNotification(initialMax); - Assert.assertEquals(EpochSnapshot.completed(initialMax), configService.getEpochSnapshot(initialMax)); - }); - } - - long schemaChangeMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - Assert.assertEquals(schemaChangeMax, ClusterMetadata.current().epoch.getEpoch()); - AccordService service = (AccordService) AccordService.instance(); - awaitEpoch(schemaChangeMax); - AccordConfigurationService 
configService = service.configService(); - - for (long epoch = initialMax + 1; epoch <= schemaChangeMax; epoch++) - { - awaitLocalSyncNotification(epoch); - Assert.assertEquals(EpochSnapshot.completed(epoch), configService.getEpochSnapshot(epoch)); - } - }); - } + awaitMaxEpochReadyToRead(cluster); for (int key = 0; key < 100; key++) { @@ -431,20 +352,7 @@ public class AccordBootstrapTest extends TestBaseImpl cluster.get(1).runOnInstance(() -> StorageService.instance.move(Long.toString(token))); - long moveMax = maxEpoch(cluster); - for (IInvokableInstance node : cluster) - { - node.runOnInstance(() -> { - ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(moveMax)); - Assert.assertEquals(moveMax, ClusterMetadata.current().epoch.getEpoch()); - AccordService service = (AccordService) AccordService.instance(); - awaitEpoch(moveMax); - AccordConfigurationService configService = service.configService(); - - awaitLocalSyncNotification(moveMax); - Assert.assertEquals(EpochSnapshot.completed(moveMax), configService.getEpochSnapshot(moveMax)); - }); - } + long moveMax = awaitMaxEpochReadyToRead(cluster); for (IInvokableInstance node : cluster) { @@ -464,9 +372,7 @@ public class AccordBootstrapTest extends TestBaseImpl PartitionKey partitionKey = new PartitionKey(tableId, dk); - getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test", - partitionKey.toUnseekable(), moveMax, moveMax, - safeStore -> { + getBlocking(service().node().commandStores().forEach("Test", RoutingKeys.of(partitionKey.toUnseekable()), moveMax, moveMax, safeStore -> { if (!safeStore.ranges().allAt(preMove).contains(partitionKey)) { AccordSafeCommandStore ss = (AccordSafeCommandStore) safeStore; @@ -493,4 +399,41 @@ public class AccordBootstrapTest extends TestBaseImpl } } } + + private static long awaitMaxEpochReadyToRead(Cluster cluster) + { + return awaitMaxEpoch(cluster, EpochReady::reads, true); + } + + private static long awaitMaxEpochMetadataReady(Cluster 
cluster) + { + return awaitMaxEpoch(cluster, EpochReady::metadata, false); + } + + private static long awaitMaxEpoch(Cluster cluster, SerializableFunction<EpochReady, AsyncResult<Void>> await, boolean expectReadyToRead) + { + long maxEpoch = maxEpoch(cluster); + for (IInvokableInstance node : cluster) + { + node.acceptOnInstance(aw -> { + ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(maxEpoch)); + Assert.assertEquals(maxEpoch, ClusterMetadata.current().epoch.getEpoch()); + AccordService service = (AccordService) AccordService.instance(); + awaitEpoch(maxEpoch, aw); + AccordConfigurationService configService = service.configService(); + + awaitLocalSyncNotification(maxEpoch); + for (long epoch = configService.minEpoch(); epoch <= maxEpoch; epoch++) + { + Assert.assertEquals(COMPLETED, configService.getEpochSnapshot(epoch).syncStatus); + Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(epoch).acknowledged); + Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(epoch).received); + if (expectReadyToRead) + Assert.assertEquals(SUCCESS, configService.getEpochSnapshot(epoch).reads); + } + }, node.transfer(await)); + } + return maxEpoch; + } + } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java index a5a38edadf..bbb8a7f4cb 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import accord.local.Node; -import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.StoreParticipants; import accord.local.cfk.CommandsForKey; @@ -32,6 +31,7 @@
import accord.local.cfk.SafeCommandsForKey; import accord.local.durability.DurabilityService; import accord.primitives.Keys; import accord.primitives.Ranges; +import accord.primitives.RoutingKeys; import accord.primitives.Status; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -68,8 +68,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static accord.local.LoadKeys.SYNC; -import static accord.local.LoadKeysFor.READ_WRITE; import static java.lang.String.format; import static org.apache.cassandra.distributed.test.accord.AccordTestBase.executeWithRetry; import static org.apache.cassandra.service.accord.AccordService.getBlocking; @@ -158,7 +156,7 @@ public class AccordIncrementalRepairTest extends TestBaseImpl { cluster.filters().reset(); for (IInvokableInstance instance : cluster) - instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start())); + instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start())); } } } @@ -207,7 +205,7 @@ public class AccordIncrementalRepairTest extends TestBaseImpl { Node node = accordService().node(); AtomicReference<TxnId> waitFor = new AtomicReference<>(null); - getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> { + getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore; SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key); if (safeCfk == null) @@ -229,7 +227,7 @@ public class AccordIncrementalRepairTest extends TestBaseImpl long now = Clock.Global.currentTimeMillis(); if (now - start > TimeUnit.MINUTES.toMillis(1)) throw new AssertionError("Timeout"); - 
AccordService.getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> { + getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> { SafeCommand command = safeStore.get(txnId, StoreParticipants.empty(txnId)); Assert.assertNotNull(command.current()); if (command.current().status().hasBeen(Status.Applied)) @@ -291,7 +289,7 @@ public class AccordIncrementalRepairTest extends TestBaseImpl // heal partition and wait for node 1 to see node 3 again for (IInvokableInstance instance : cluster) instance.runOnInstance(() -> { - AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop()); + AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop()); Assert.assertFalse(barrierRecordingService().executedBarriers); }); cluster.filters().reset(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java index c2b8f1e486..a7354e17b7 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java @@ -125,6 +125,6 @@ public class AccordIntegrationTest extends AccordTestBase private void pauseSimpleProgressLog() { for (IInvokableInstance instance : SHARED_CLUSTER) - instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().stop())); + instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().stop())); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java index b4c53b6159..cb91e9a160 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java @@ -166,7 +166,7 @@ public abstract class AccordTestBase extends TestBaseImpl { SHARED_CLUSTER.filters().reset(); for (IInvokableInstance instance : SHARED_CLUSTER) - instance.runOnInstance(() -> AccordService.instance().node().commandStores().forEachCommandStore(cs -> cs.unsafeProgressLog().start())); + instance.runOnInstance(() -> AccordService.instance().node().commandStores().forAllUnsafe(cs -> cs.unsafeProgressLog().start())); truncateSystemTables(); diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java index 1e311bf8a5..b6aeb6231d 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java @@ -150,9 +150,9 @@ public class AccordBounceTest extends FuzzTestBase // Command Stores should not be lost on bounce Map<Integer, Set<String>> before = cluster.get(1).callOnInstance(() -> { Map<Integer, Set<String>> m = new HashMap<>(); - AccordService.instance().node().commandStores().forEach((store, ranges) -> { + AccordService.instance().node().commandStores().forAllUnsafe((store) -> { Set<String> set = new HashSet<>(); - for (Range range : ranges.all()) + for (Range range : store.unsafeGetRangesForEpoch().all()) set.add(range.toString()); m.put(store.id(), set); }); @@ -169,9 +169,9 @@ public class AccordBounceTest extends FuzzTestBase Map<Integer, Set<String>> after = cluster.get(1).callOnInstance(() -> { Map<Integer, Set<String>> m = new HashMap<>(); - AccordService.instance().node().commandStores().forEach((store, ranges) -> { + 
AccordService.instance().node().commandStores().forAllUnsafe(store -> { Set<String> set = new HashSet<>(); - for (Range range : ranges.all()) + for (Range range : store.unsafeGetRangesForEpoch().all()) set.add(range.toString()); m.put(store.id(), set); }); diff --git a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java index 25b7b952c8..42db56fb94 100644 --- a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java +++ b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java @@ -354,11 +354,11 @@ public class AccordJournalBurnTest extends BurnTestBase } @Override - public void replay(CommandStores commandStores) + public boolean replay(CommandStores commandStores) { // Make sure to replay _only_ static segments this.closeCurrentSegmentForTestingIfNonEmpty(); - super.replay(commandStores); + return super.replay(commandStores); } @Override @@ -388,7 +388,7 @@ public class AccordJournalBurnTest extends BurnTestBase public static IAccordService.AccordCompactionInfos getCompactionInfo(Node node, TableId tableId) { IAccordService.AccordCompactionInfos compactionInfos = new IAccordService.AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch()); - node.commandStores().forEachCommandStore(commandStore -> { + node.commandStores().forAllUnsafe(commandStore -> { RedundantBefore redundantBefore = commandStore.unsafeGetRedundantBefore(); if (redundantBefore == null) redundantBefore = RedundantBefore.EMPTY; diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 7f7e44fd9f..024e343ed7 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -34,8 +34,7 @@ import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import accord.api.ProtocolModifiers; -import accord.local.PreLoadContext; -import accord.messages.TxnRequest; +import accord.messages.NoWaitRequest; import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.SaveStatus; @@ -212,7 +211,7 @@ public class AccordDebugKeyspaceTest extends CQLTester TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range, accord.nodeId()); Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(1)), new TokenKey(tableId, new LongToken(100)))); Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new LongToken(100)), new TokenKey(tableId, new LongToken(200)))); - getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> "Test", safeStore -> { + getBlocking(accord.node().commandStores().forAll("Test", safeStore -> { safeStore.commandStore().markShardDurable(safeStore, syncId1, ranges1, HasOutcome.Universal); safeStore.commandStore().markShardDurable(safeStore, syncId2, ranges2, HasOutcome.Quorum); })); @@ -436,9 +435,9 @@ public class AccordDebugKeyspaceTest extends CQLTester if (!msg.verb().name().startsWith("ACCORD_")) return true; TxnId txnId = null; - if (msg.payload instanceof TxnRequest) + if (msg.payload instanceof NoWaitRequest<?,?>) { - txnId = ((TxnRequest<?>) msg.payload).txnId; + txnId = ((NoWaitRequest<?,?>) msg.payload).txnId; if (applyTo != null && !applyTo.contains(txnId)) return true; } diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index d0c2d87f1e..8c6447538f 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -30,6 +30,8 @@ import java.util.TreeSet; import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; + +import 
accord.api.ConfigurationService.EpochReady; import accord.api.Journal; import accord.api.RoutingKey; import accord.local.CommandStores; @@ -503,7 +505,7 @@ public class RouteIndexTest extends CQLTester storeRangesForEpochs.put(i, new RangesForEpoch(1, Ranges.of(TokenRange.fullRange(tableId, getPartitioner())))); accordService = startAccord(); - accordService.epochReady(ClusterMetadata.current().epoch).awaitUninterruptibly(); + accordService.epochReady(ClusterMetadata.current().epoch, EpochReady::reads).awaitUninterruptibly(); minDecidedIdNull = rs.nextFloat(); txnWriteFrequency = rs.pickInt(1, // every txn is a Write diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index b714b833f1..ac322403d1 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -176,7 +176,7 @@ public class AccordCommandTest Command before = safeStore.ifInitialised(txnId).current(); Assert.assertEquals(commit.executeAt, before.executeAt()); Assert.assertTrue(before.hasBeen(Status.Committed)); - Assert.assertEquals(commit.partialDeps, before.partialDeps()); + Assert.assertEquals(commit.partialDeps(), before.partialDeps()); CommandsForKey cfk = safeStore.get(key(1).toUnseekable()).current(); Assert.assertTrue(cfk.indexOf(txnId) >= 0); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java index adaff55578..081aaeec47 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java @@ -41,7 +41,7 @@ import org.junit.BeforeClass; import org.junit.Test; import accord.api.Agent; -import accord.impl.AbstractConfigurationService; +import accord.impl.AbstractTestConfigurationService; 
import accord.impl.TestAgent; import accord.impl.basic.Pending; import accord.impl.basic.PendingQueue; @@ -410,7 +410,7 @@ public class AccordSyncPropagatorTest } } - private class ConfigService extends AbstractConfigurationService.Minimal implements AccordSyncPropagator.Listener + private class ConfigService extends AbstractTestConfigurationService implements AccordSyncPropagator.Listener { private final Map<Long, Set<Node.Id>> syncCompletes = new HashMap<>(); private final Map<Long, Set<Node.Id>> endpointAcks = new HashMap<>(); @@ -436,7 +436,7 @@ public class AccordSyncPropagatorTest } @Override - protected void localSyncComplete(Topology topology, boolean startSync) + protected void onReadyToCoordinate(Topology topology, boolean startSync) { Set<Node.Id> notify = topology.nodes().stream().filter(i -> !localId.equals(i)).collect(Collectors.toSet()); instances.get(localId).propagator.reportSyncComplete(topology.epoch(), notify, localId); diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java index b9602dec07..d73763ba5a 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java @@ -59,7 +59,7 @@ import accord.local.durability.DurabilityService; import accord.messages.BeginRecovery; import accord.messages.PreAccept; import accord.messages.Reply; -import accord.messages.TxnRequest; +import accord.messages.RouteRequest; import accord.primitives.AbstractUnseekableKeys; import accord.primitives.Ballot; import accord.primitives.EpochSupplier; @@ -421,9 +421,9 @@ public class SimulatedAccordCommandStore implements AutoCloseable throw error; } - public <T extends Reply> T process(TxnRequest<T> request) throws ExecutionException, InterruptedException + public <T extends Reply> T process(RouteRequest<T> request) throws ExecutionException, 
InterruptedException { - return process(request, request::apply); + return process(request, request); } public <T extends Reply> T process(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) throws ExecutionException, InterruptedException @@ -433,9 +433,9 @@ public class SimulatedAccordCommandStore implements AutoCloseable return getBlocking(result); } - public <T extends Reply> AsyncResult<T> processAsync(TxnRequest<T> request) + public <T extends Reply> AsyncResult<T> processAsync(RouteRequest<T> request) { - return processAsync(request, request::apply); + return processAsync(request, request); } public <T extends Reply> AsyncResult<T> processAsync(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) diff --git a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java index d54058b727..f18065adc9 100644 --- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java +++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java @@ -120,9 +120,9 @@ public class SimulatedAccordTaskTest extends SimulatedAccordCommandStoreTestBase FullRoute<?> route = txnWithRoute.right; PreAccept preAccept = new PreAccept(nodeId, instance.topologies, txnId, txn, null, false, route) { @Override - public PreAcceptReply apply(SafeCommandStore safeStore) + public PreAcceptReply applyInternal(SafeCommandStore safeStore) { - PreAcceptReply result = super.apply(safeStore); + PreAcceptReply result = super.applyInternal(safeStore); if (action == Action.FAILURE) throw new SimulatedFault("PreAccept failed for keys " + keys()); return result; diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 884638da98..edca7e1c6b 100644 --- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -49,6 +50,7 @@ import accord.api.AsyncExecutor; import accord.api.DataStore; import accord.api.Journal; import accord.api.Key; +import accord.api.OwnershipEventListener; import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.api.Timeouts; @@ -657,8 +659,8 @@ public class CommandsForKeySerializerTest @Override public Agent agent() { return this; } @Override public void execute(Runnable run) {} @Override public void shutdown() { } - @Override public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); } - @Override public void onStale(Timestamp staleSince, Ranges ranges) { throw new UnsupportedOperationException(); } + @Override public <T> AsyncChain<T> chain(Callable<T> call) { throw new UnsupportedOperationException(); } + @Override public OwnershipEventListener ownershipEvents() { return null; } @Override public void onUncaughtException(Throwable t) { throw new UnsupportedOperationException(); } @Override public void onCaughtException(Throwable t, String context) { throw new UnsupportedOperationException(); } @Override public boolean rejectPreAccept(TimeService time, TxnId txnId) { throw new UnsupportedOperationException(); } diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index bd994fc27a..83822db7e2 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -90,7 +90,7 @@ import org.quicktheories.impl.JavaRandom; import static accord.local.CommandStores.RangesForEpoch; import static accord.local.RedundantStatus.Property.GC_BEFORE; -import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP; +import static accord.local.RedundantStatus.Property.UNREADY; import static accord.local.RedundantStatus.SomeStatus.LOCALLY_APPLIED_ONLY; import static accord.local.RedundantStatus.SomeStatus.LOCALLY_WITNESSED_ONLY; import static accord.local.RedundantStatus.SomeStatus.SHARD_APPLIED_ONLY; @@ -601,9 +601,9 @@ public class AccordGenerators if (rs.nextBoolean()) bounds.add(Bounds.create(range, txnIdGen.next(rs).addFlag(SHARD_BOUND), oneSlow(GC_BEFORE), null )); if (rs.nextBoolean()) - bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(PRE_BOOTSTRAP), null )); + bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(UNREADY), null )); if (rs.nextBoolean()) - bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new short[0], txnIdGen.next(rs))); + bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new int[0], txnIdGen.next(rs))); Collections.shuffle(bounds); long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : rs.nextLong(0, Long.MAX_VALUE); @@ -618,7 +618,7 @@ public class AccordGenerators } long startEpoch = rs.nextLong(Math.min(minEpoch, endEpoch)); - Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new short[0], null); + Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new int[0], null); if (result == null) return epochBounds; return Bounds.reduce(result, epochBounds); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
