This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 1bba093c6ce366ca7bcd152cb40d356235e53c38 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Wed Feb 19 11:52:31 2025 +0000 Fix: - Only use persisted RedundantBefore for compaction - RouteIndex should index only touches, not Route - Flush RangesForEpoch updates to journal immediately, so we do not rely on the command we are processing succeeding - DurableBefore updates must wait for the epochs to be known locally - Shard.mustWitnessEpoch to support guaranteeing to witness relevant non-topology schema changes - We must propagate RedundantBefore RX shard bounds along with epoch syncs - Prevent a truncated transaction FetchData infinite loop - GC_BEFORE status being overwritten by bootstrappedAt, permitting old transaction state to be resurrected - Avoid CFK.maxUniqueHlc read race on bootstrap - TopologyManager.awaitEpoch could wait for wrong epoch - Journal fsync thread could miss notifications Also improve: - CommandStores uses SearchableRangeList for finding matching stores - Refactor RedundantBefore to use a sorted array of TxnId/RedundantStatus pairs (to better fix GC_BEFORE issue) - Accord debug keyspace operates on keyspace/table, and sorts correctly by token patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20361 --- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 2 +- .../cassandra/db/virtual/AccordDebugKeyspace.java | 271 +++++++++++++-------- .../cassandra/index/accord/RangeMemoryIndex.java | 4 +- .../cassandra/index/accord/RouteIndexFormat.java | 20 +- .../cassandra/index/accord/RouteJournalIndex.java | 3 +- src/java/org/apache/cassandra/journal/Flusher.java | 6 +- src/java/org/apache/cassandra/journal/Journal.java | 2 +- .../service/accord/AccordCommandStore.java | 48 +++- .../service/accord/AccordFetchCoordinator.java | 6 +- .../cassandra/service/accord/AccordKeyspace.java | 2 +- .../service/accord/AccordObjectSizes.java | 2 +- .../service/accord/AccordSafeCommandStore.java | 30 +++ .../cassandra/service/accord/AccordService.java | 25 +- .../cassandra/service/accord/AccordTask.java | 6 +- .../cassandra/service/accord/AccordTopology.java | 20 +- .../accord/CommandStoreTxnBlockedGraph.java | 4 +- .../service/accord/interop/AccordInteropRead.java | 54 ++-- .../accord/interop/AccordInteropReadRepair.java | 2 +- .../accord/serializers/CommandSerializers.java | 17 +- .../serializers/CommandStoreSerializers.java | 91 ++++--- .../accord/serializers/TopologySerializers.java | 11 +- .../cassandra/service/accord/txn/TxnRead.java | 4 +- .../compaction/CompactionAccordIteratorsTest.java | 6 +- .../db/virtual/AccordDebugKeyspaceTest.java | 14 +- .../cassandra/dht/RandomPartitionerTest.java | 2 - .../cassandra/index/accord/RouteIndexTest.java | 31 ++- .../service/accord/AccordJournalOrderTest.java | 2 +- .../service/accord/AccordSyncPropagatorTest.java | 2 +- .../serializers/CommandStoreSerializersTest.java | 2 +- .../apache/cassandra/utils/AccordGenerators.java | 60 +++-- 31 files changed, 475 insertions(+), 276 deletions(-) diff --git a/modules/accord b/modules/accord index 58f107625a..a341979bd8 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 58f107625a183d77b154221efd2f6f0623214027 +Subproject commit a341979bd8fc1d26192cd6bc1edb145e7945e2e9 diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 50e57ef935..1b8b347ce0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -810,7 +810,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte return row; RedundantBefore redundantBefore = info.redundantBefore; - RedundantBefore.Entry redundantBeforeEntry = redundantBefore.get(tokenKey.toUnseekable()); + RedundantBefore.Bounds redundantBeforeEntry = redundantBefore.get(tokenKey.toUnseekable()); if (redundantBeforeEntry == null) return row; diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index ce71048a38..0c66a40d4c 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -36,10 +36,10 @@ import accord.api.RoutingKey; import accord.impl.DurabilityScheduling; import accord.impl.progresslog.DefaultProgressLog; import accord.impl.progresslog.TxnStateKind; +import accord.local.CommandStore; import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.MaxConflicts; -import accord.local.RedundantBefore; import accord.local.RejectBefore; import accord.primitives.Status; import accord.primitives.TxnId; @@ -49,11 +49,9 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.Int32Type; -import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.NormalizedRanges; import org.apache.cassandra.dht.Token; @@ -62,6 +60,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.accord.AccordCache; +import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordCommandStores; import org.apache.cassandra.service.accord.AccordExecutor; import org.apache.cassandra.service.accord.AccordKeyspace; @@ -71,14 +70,17 @@ import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.service.consensus.migration.TableMigrationState; import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.utils.Pair; +import static accord.local.RedundantStatus.Property.GC_BEFORE; +import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED; +import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT; +import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED; +import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED; +import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP; +import static accord.local.RedundantStatus.Property.SHARD_ONLY_APPLIED; import static java.lang.String.format; -import static java.util.Comparator.comparing; import static com.google.common.collect.ImmutableList.toImmutableList; -import static accord.utils.async.AsyncChains.getBlockingAndRethrow; import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; public class AccordDebugKeyspace extends VirtualKeyspace @@ -93,10 +95,6 @@ public class AccordDebugKeyspace extends VirtualKeyspace public static final String REJECT_BEFORE = "reject_before"; public static final String TXN_BLOCKED_BY = "txn_blocked_by"; - // {table_id, token} or {table_id, +Inf/-Inf} - private static final TupleType ROUTING_KEY_TYPE = new TupleType(List.of(UUIDType.instance, UTF8Type.instance)); - private static final String ROUTING_KEY_TYPE_STRING = ROUTING_KEY_TYPE.asCQL3Type().toString(); - public static final AccordDebugKeyspace instance = new AccordDebugKeyspace(); private AccordDebugKeyspace() @@ -122,8 +120,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace super(parse(VIRTUAL_ACCORD_DEBUG, DURABILITY_SCHEDULING, "Accord per-Range Durability Scheduling State", "CREATE TABLE %s (\n" + - format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + - format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " keyspace_name text,\n" + + " table_name text,\n" + + " token_sort blob,\n" + + " token_start text,\n" + + " token_end text,\n" + " node_offset int,\n" + " \"index\" int,\n" + " number_of_splits int,\n" + @@ -131,8 +132,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace " cycle_started_at bigint,\n" + " retry_delay_micros bigint,\n" + " is_defunct boolean,\n" + - " PRIMARY KEY ((range_start, range_end))" + - ')', CompositeType.getInstance(ROUTING_KEY_TYPE, ROUTING_KEY_TYPE))); + " PRIMARY KEY (keyspace_name, table_name, token_start)" + + ')', UTF8Type.instance)); } @Override @@ -143,7 +144,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace SimpleDataSet ds = new SimpleDataSet(metadata()); while (view.advance()) { - ds.row(decompose(view.range().start()), decompose(view.range().end())) + TableId tableId = (TableId) view.range().start().prefix(); + TableMetadata tableMetadata = tableMetadata(tableId); + ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(view.range().start())) + .column("start_token", printToken(view.range().start())) + .column("end_token", printToken(view.range().end())) .column("node_offset", view.nodeOffset()) .column("index", view.index()) .column("number_of_splits", view.numberOfSplits()) @@ -163,12 +168,15 @@ public class AccordDebugKeyspace extends VirtualKeyspace super(parse(VIRTUAL_ACCORD_DEBUG, DURABLE_BEFORE, "Accord Node's DurableBefore State", "CREATE TABLE %s (\n" + - format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + - format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " keyspace_name text,\n" + + " table_name text,\n" + + " token_sort blob,\n" + + " token_start text,\n" + + " token_end text,\n" + " majority_before text,\n" + " universal_before text,\n" + - " PRIMARY KEY ((range_start, range_end))" + - ')', CompositeType.getInstance(ROUTING_KEY_TYPE, ROUTING_KEY_TYPE))); + " PRIMARY KEY (keyspace_name, table_name, token_sort)" + + ')', UTF8Type.instance)); } @Override @@ -177,7 +185,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace DurableBefore durableBefore = AccordService.instance().node().durableBefore(); return durableBefore.foldlWithBounds( (entry, ds, start, end) -> { - ds.row(decompose(start), decompose(end)) + TableId tableId = (TableId) start.prefix(); + TableMetadata tableMetadata = tableMetadata(tableId); + ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(start)) + .column("start_token", printToken(start)) + .column("end_token", printToken(end)) .column("majority_before", entry.majorityBefore.toString()) .column("universal_before", entry.universalBefore.toString()); return ds; @@ -237,30 +249,38 @@ public class AccordDebugKeyspace extends VirtualKeyspace super(parse(VIRTUAL_ACCORD_DEBUG, MAX_CONFLICTS, "Accord per-CommandStore MaxConflicts State", "CREATE TABLE %s (\n" + - " command_store_id int,\n" + - format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + - format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " keyspace_name text,\n" + + " table_name text,\n" + + " token_sort blob,\n" + + " token_start text,\n" + + " token_end text,\n" + + " command_store_id bigint,\n" + " timestamp text,\n" + - " PRIMARY KEY (command_store_id, range_start, range_end)" + - ')', Int32Type.instance)); + " PRIMARY KEY (keyspace_name, table_name, token_sort, command_store_id)" + + ')', UTF8Type.instance)); } @Override public DataSet data() { - CommandStores stores = AccordService.instance().node().commandStores(); - List<Pair<Integer, MaxConflicts>> rangeMaps = - getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetMaxConflicts()))); - rangeMaps.sort(comparing(p -> p.left)); + CommandStores commandStores = AccordService.instance().node().commandStores(); SimpleDataSet dataSet = new SimpleDataSet(metadata()); - for (Pair<Integer, MaxConflicts> pair : rangeMaps) + for (CommandStore commandStore : commandStores.all()) { - int storeId = pair.left; - MaxConflicts maxConflicts = pair.right; + int commandStoreId = commandStore.id(); + MaxConflicts maxConflicts = commandStore.unsafeGetMaxConflicts(); + TableId tableId = ((AccordCommandStore) commandStore).tableId(); + TableMetadata tableMetadata = tableMetadata(tableId); maxConflicts.foldlWithBounds( - (timestamp, ds, start, end) -> ds.row(storeId, decompose(start), decompose(end)).column("timestamp", timestamp.toString()), + (timestamp, ds, start, end) -> { + return ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), sortToken(start), commandStoreId) + .column("start_token", printToken(start)) + .column("end_token", printToken(end)) + .column("timestamp", timestamp.toString()) + ; + }, dataSet, ignore -> false ); @@ -365,6 +385,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace super(parse(VIRTUAL_ACCORD_DEBUG, PROGRESS_LOG, "Accord per-CommandStore ProgressLog State", "CREATE TABLE %s (\n" + + " keyspace_name text,\n" + + " table_name text,\n" + " command_store_id int,\n" + " txn_id text,\n" + // Timer + BaseTxnState @@ -382,25 +404,23 @@ public class AccordDebugKeyspace extends VirtualKeyspace " home_progress text,\n" + " home_retry_counter int,\n" + " home_scheduled_at timestamp,\n" + - " PRIMARY KEY (command_store_id, txn_id)" + - ')', Int32Type.instance)); + " PRIMARY KEY (keyspace_name, table_name, command_store_id, txn_id)" + + ')', UTF8Type.instance)); } @Override public DataSet data() { - CommandStores stores = AccordService.instance().node().commandStores(); - List<DefaultProgressLog.ImmutableView> views = - getBlockingAndRethrow(stores.map(store -> ((DefaultProgressLog) store.progressLog()).immutableView())); - views.sort(comparing(DefaultProgressLog.ImmutableView::storeId)); - + CommandStores commandStores = AccordService.instance().node().commandStores(); SimpleDataSet ds = new SimpleDataSet(metadata()); - for (int i = 0, size = views.size(); i < size; ++i) + for (CommandStore commandStore : commandStores.all()) { - DefaultProgressLog.ImmutableView view = views.get(i); + DefaultProgressLog.ImmutableView view = (DefaultProgressLog.ImmutableView) commandStore.unsafeProgressLog(); + TableId tableId = ((AccordCommandStore)commandStore).tableId(); + TableMetadata tableMetadata = tableMetadata(tableId); while (view.advance()) { - ds.row(view.storeId(), view.txnId().toString()) + ds.row(keyspace(tableMetadata), table(tableId, tableMetadata), view.commandStoreId(), view.txnId().toString()) .column("contact_everyone", view.contactEveryone()) .column("waiting_is_uninitialised", view.isWaitingUninitialised()) .column("waiting_blocked_until", view.waitingIsBlockedUntil().name()) @@ -436,47 +456,53 @@ public class AccordDebugKeyspace extends VirtualKeyspace super(parse(VIRTUAL_ACCORD_DEBUG, REDUNDANT_BEFORE, "Accord per-CommandStore RedundantBefore State", "CREATE TABLE %s (\n" + - " command_store_id int,\n" + - format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + - format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + - " start_ownership_epoch bigint,\n" + - " end_ownership_epoch bigint,\n" + - " locally_applied_or_invalidated_before text,\n" + - " locally_decided_and_applied_or_invalidated_before text,\n" + - " shard_applied_or_invalidated_before text,\n" + + " keyspace_name text,\n" + + " table_name text,\n" + + " token_sort blob,\n" + + " token_start text,\n" + + " token_end text,\n" + + " command_store_id bigint,\n" + + " start_epoch bigint,\n" + + " end_epoch bigint,\n" + " gc_before text,\n" + - " shard_only_applied_or_invalidated_before text,\n" + - " bootstrapped_at text,\n" + + " shard_only_applied text,\n" + + " locally_applied text,\n" + + " locally_synced text,\n" + + " locally_redundant text,\n" + + " locally_witnessed text,\n" + + " pre_bootstrap text,\n" + " stale_until_at_least text,\n" + - " PRIMARY KEY (command_store_id, range_start, range_end)" + - ')', Int32Type.instance)); + " PRIMARY KEY (keyspace_name, table_name, token_sort, command_store_id)" + + ')', UTF8Type.instance)); } @Override public DataSet data() { - CommandStores stores = AccordService.instance().node().commandStores(); - List<Pair<Integer, RedundantBefore>> rangeMaps = - getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetRedundantBefore()))); - rangeMaps.sort(comparing(p -> p.left)); + CommandStores commandStores = AccordService.instance().node().commandStores(); SimpleDataSet dataSet = new SimpleDataSet(metadata()); - for (Pair<Integer, RedundantBefore> pair : rangeMaps) + for (CommandStore commandStore : commandStores.all()) { - int storeId = pair.left; - RedundantBefore redundantBefore = pair.right; - - redundantBefore.foldlWithBounds( - (entry, ds, start, end) -> { - ds.row(storeId, decompose(start), decompose(end)) - .column("start_ownership_epoch", entry.startOwnershipEpoch) - .column("end_ownership_epoch", entry.endOwnershipEpoch) - .column("locally_applied_before", entry.locallyAppliedBefore.toString()) - .column("locally_decided_and_applied_before", entry.locallyDecidedAndAppliedBefore.toString()) - .column("shard_applied_before", entry.shardAppliedBefore.toString()) - .column("gc_before", entry.gcBefore.toString()) - .column("shard_only_applied_before", entry.shardOnlyAppliedBefore.toString()) - .column("bootstrapped_at", entry.bootstrappedAt.toString()) + int commandStoreId = commandStore.id(); + TableId tableId = ((AccordCommandStore)commandStore).tableId(); + TableMetadata tableMetadata = tableMetadata(tableId); + String keyspace = keyspace(tableMetadata); + String table = table(tableId, tableMetadata); + commandStore.unsafeGetRedundantBefore().foldl( + (entry, ds) -> { + ds.row(keyspace, table, sortToken(entry.range.start()), commandStoreId) + .column("start_token", printToken(entry.range.start())) + .column("end_token", printToken(entry.range.end())) + .column("start_epoch", entry.startEpoch) + .column("end_epoch", entry.endEpoch) + .column("gc_before", entry.maxBound(GC_BEFORE).toString()) + .column("shard_only_applied", entry.maxBound(SHARD_ONLY_APPLIED).toString()) + .column("locally_applied", entry.maxBound(LOCALLY_APPLIED).toString()) + .column("locally_synced", entry.maxBound(LOCALLY_SYNCED).toString()) + .column("locally_redundant", entry.maxBound(LOCALLY_REDUNDANT).toString()) + .column("locally_witnessed", entry.maxBound(LOCALLY_WITNESSED).toString()) + .column("pre_bootstrap", entry.maxBound(PRE_BOOTSTRAP).toString()) .column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null); return ds; }, @@ -495,33 +521,38 @@ public class AccordDebugKeyspace extends VirtualKeyspace super(parse(VIRTUAL_ACCORD_DEBUG, REJECT_BEFORE, "Accord per-CommandStore RejectBefore State", "CREATE TABLE %s (\n" + + " keyspace_name text,\n" + + " table_name text,\n" + + " token_sort blob,\n" + + " token_start text,\n" + + " token_end text,\n" + " command_store_id int,\n" + - format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + - format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + " txn_id text,\n" + - " PRIMARY KEY (command_store_id, range_start, range_end)" + - ')', Int32Type.instance)); + " PRIMARY KEY (keyspace_name, table_name, token_sort, command_store_id)" + + ')', UTF8Type.instance)); } @Override public DataSet data() { - CommandStores stores = AccordService.instance().node().commandStores(); - List<Pair<Integer, RejectBefore>> rangeMaps = - getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetRejectBefore()))); - rangeMaps.sort(comparing(p -> p.left)); - + CommandStores commandStores = AccordService.instance().node().commandStores(); SimpleDataSet dataSet = new SimpleDataSet(metadata()); - for (Pair<Integer, RejectBefore> pair : rangeMaps) + for (CommandStore commandStore : commandStores.all()) { - int storeId = pair.left; - RejectBefore rejectBefore = pair.right; - + RejectBefore rejectBefore = commandStore.unsafeGetRejectBefore(); if (rejectBefore == null) continue; + TableId tableId = ((AccordCommandStore)commandStore).tableId(); + TableMetadata tableMetadata = tableMetadata(tableId); + String keyspace = keyspace(tableMetadata); + String table = table(tableId, tableMetadata); rejectBefore.foldlWithBounds( - (txnId, ds, start, end) -> ds.row(storeId, decompose(start), decompose(end)).column("txn_id", txnId.toString()), + (txnId, ds, start, end) -> ds.row(keyspace, table, sortToken(start), commandStore.id()) + .column("token_start", printToken(start)) + .column("token_end", printToken(end)) + .column("txn_id", txnId.toString()) + , dataSet, ignore -> false ); @@ -540,14 +571,16 @@ public class AccordDebugKeyspace extends VirtualKeyspace "Accord Transactions Blocked By Table" , "CREATE TABLE %s (\n" + " txn_id text,\n" + + " keyspace_name text,\n" + + " table_name text,\n" + " command_store_id int,\n" + " depth int,\n" + " blocked_by text,\n" + " reason text,\n" + " save_status text,\n" + " execute_at text,\n" + - format("key %s,\n", ROUTING_KEY_TYPE_STRING) + - " PRIMARY KEY (txn_id, command_store_id, depth, blocked_by, reason)" + + " key text,\n" + + " PRIMARY KEY (txn_id, keyspace_name, table_name, command_store_id, depth, blocked_by, reason)" + ')', UTF8Type.instance)); } @@ -558,10 +591,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id); SimpleDataSet ds = new SimpleDataSet(metadata()); + CommandStores commandStores = AccordService.instance().node().commandStores(); for (CommandStoreTxnBlockedGraph shard : shards) { Set<TxnId> processed = new HashSet<>(); - process(ds, shard, processed, id, 0, id, Reason.Self, null); + process(ds, commandStores, shard, processed, id, 0, id, Reason.Self, null); // everything was processed right? if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed)) throw new IllegalStateException("Skipped txns: " + Sets.difference(shard.txns.keySet(), processed)); @@ -570,7 +604,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace return ds; } - private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone) + private void process(SimpleDataSet ds, CommandStores commandStores, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone) { if (!processed.add(txnId)) throw new IllegalStateException("Double processed " + txnId); @@ -583,7 +617,10 @@ public class AccordDebugKeyspace extends VirtualKeyspace // was it applied? If so ignore it if (reason != Reason.Self && txn.saveStatus.hasBeen(Status.Applied)) return; - ds.row(userTxn.toString(), shard.storeId, depth, reason == Reason.Self ? "" : txn.txnId.toString(), reason.name()); + TableId tableId = tableId(shard.commandStoreId, commandStores); + TableMetadata tableMetadata = tableMetadata(tableId); + ds.row(userTxn.toString(), keyspace(tableMetadata), table(tableId, tableMetadata), + shard.commandStoreId, depth, reason == Reason.Self ? "" : txn.txnId.toString(), reason.name()); ds.column("save_status", txn.saveStatus.name()); if (txn.executeAt != null) ds.column("execute_at", txn.executeAt.toString()); @@ -594,14 +631,14 @@ public class AccordDebugKeyspace extends VirtualKeyspace for (TxnId blockedBy : txn.blockedBy) { if (!processed.contains(blockedBy)) - process(ds, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null); + process(ds, commandStores, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null); } for (TokenKey blockedBy : txn.blockedByKey) { TxnId blocking = shard.keys.get(blockedBy); if (!processed.contains(blocking)) - process(ds, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", decompose(blockedBy))); + process(ds, commandStores, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy))); } } } @@ -613,10 +650,46 @@ public class AccordDebugKeyspace extends VirtualKeyspace } } - private static ByteBuffer decompose(RoutingKey routingKey) + private static TableId tableId(int commandStoreId, CommandStores commandStores) + { + AccordCommandStore commandStore = (AccordCommandStore) commandStores.forId(commandStoreId); + if (commandStore == null) + return null; + return commandStore.tableId(); + } + + private static TableMetadata tableMetadata(TableId tableId) + { + if (tableId == null) + return null; + return Schema.instance.getTableMetadata(tableId); + } + + private static String keyspace(TableMetadata metadata) + { + return metadata == null ? "Unknown" : metadata.keyspace; + } + + private static String table(TableId tableId, TableMetadata metadata) + { + return metadata == null ? tableId.toString() : metadata.name; + } + + private static String printToken(RoutingKey routingKey) + { + TokenKey key = (TokenKey) routingKey; + return key.token().getPartitioner().getTokenFactory().toString(key.token()); + } + + private static ByteBuffer sortToken(RoutingKey routingKey) { TokenKey key = (TokenKey) routingKey; - return ROUTING_KEY_TYPE.pack(UUIDType.instance.decompose(key.table().asUUID()), bytes(key.suffix().toString())); + Token token = key.token(); + IPartitioner partitioner = token.getPartitioner(); + ByteBuffer out = ByteBuffer.allocate(partitioner.accordSerializedSize(token)); + partitioner.accordSerialize(token, out); + out.flip(); + return out; } private static TableMetadata parse(String keyspace, String table, String comment, String schema, AbstractType<?> partitionKeyType) diff --git a/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java b/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java index 604d9539d6..43eba0f806 100644 --- a/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java +++ b/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java @@ -52,7 +52,7 @@ import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.RTree; import org.apache.cassandra.utils.RangeTree; -import static org.apache.cassandra.index.accord.RouteIndexFormat.deserializeRoute; +import static org.apache.cassandra.index.accord.RouteIndexFormat.deserializeParticipants; public class RangeMemoryIndex { @@ -110,7 +110,7 @@ public class RangeMemoryIndex Route<?> route; try { - route = deserializeRoute(value); + route = deserializeParticipants(value); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java b/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java index ddec40e17f..1821e2c162 100644 --- a/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java +++ b/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java @@ -34,6 +34,7 @@ import java.util.zip.Checksum; import com.google.common.collect.Maps; import accord.local.StoreParticipants; +import accord.primitives.Participants; import accord.primitives.Route; import accord.primitives.TxnId; import org.apache.cassandra.db.DecoratedKey; @@ -71,30 +72,30 @@ public class RouteIndexFormat { public static final Supplier<Checksum> CHECKSUM_SUPPLIER = CRC32C::new; - static final LocalVersionedSerializer<Route<?>> route = localSerializer(KeySerializers.route); + static final LocalVersionedSerializer<Participants<?>> participants = localSerializer(KeySerializers.participants); private static <T> LocalVersionedSerializer<T> localSerializer(IVersionedSerializer<T> serializer) { return new LocalVersionedSerializer<>(AccordSerializerVersion.CURRENT, AccordSerializerVersion.serializer, serializer); } - public static ByteBuffer serialize(Route<?> value) throws IOException + public static ByteBuffer serialize(Participants<?> value) throws IOException { - int size = Math.toIntExact(route.serializedSize(value)); + int size = Math.toIntExact(participants.serializedSize(value)); try (DataOutputBuffer buffer = new DataOutputBuffer(size)) { - route.serialize(value, buffer); + participants.serialize(value, buffer); return buffer.buffer(true); } } - static Route<?> deserializeRoute(ByteBuffer bytes) throws IOException + static Route<?> deserializeParticipants(ByteBuffer bytes) throws IOException { if (bytes == null || ByteBufferAccessor.instance.isEmpty(bytes)) return null; try (DataInputBuffer in = new DataInputBuffer(bytes, true)) { - MessageVersionProvider versionProvider = route.deserializeVersion(in); + MessageVersionProvider versionProvider = participants.deserializeVersion(in); return KeySerializers.route.deserialize(in, versionProvider.messageVersion()); } } @@ -163,13 +164,14 @@ public class RouteIndexFormat StoreParticipants participants = builder.participants(); if (participants == null) return null; - Route<?> route = participants.route(); - if (route == null) + + Participants<?> touches = participants.touches(); + if (touches == null) return null; try { - return serialize(participants.route()); + return serialize(touches); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java index 144b1cddea..aa97dd981e 100644 --- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java +++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java @@ -94,8 +94,7 @@ import static accord.primitives.Routable.Domain.Range; public class RouteJournalIndex implements Index, INotificationConsumer { - public enum RegisterStatus - {PENDING, REGISTERED, UNREGISTERED} + public enum RegisterStatus { PENDING, REGISTERED, UNREGISTERED } private static final Logger logger = LoggerFactory.getLogger(RouteJournalIndex.class); diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java index 07c7516d48..6a2a387c32 100644 --- a/src/java/org/apache/cassandra/journal/Flusher.java +++ b/src/java/org/apache/cassandra/journal/Flusher.java @@ -149,7 +149,7 @@ final class Flusher<K, V> return; awaitingWork = Thread.currentThread(); - do + while (true) { if (Thread.interrupted()) { @@ -157,9 +157,11 @@ final class Flusher<K, V> throw new InterruptedException(); } + if (fsyncWaitingSince != lastStartedAt) + break; + LockSupport.park(); } - while (fsyncWaitingSince == lastStartedAt); awaitingWork = null; } diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index 08626b441f..98150f1cee 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -146,7 +146,7 @@ public class Journal<K, V> implements Shutdownable @Override public void onFlushFailed(Throwable cause) { - // TODO: panic + // TODO (required): panic } private void submit(RecordPointer pointer, Runnable runnable) diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 77f39f74bf..7355e7f2da 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -25,6 +25,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.function.Consumer; import java.util.function.Function; @@ -61,6 +63,7 @@ import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor; +import org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfo; import org.apache.cassandra.service.accord.api.TokenKey; import org.apache.cassandra.service.accord.txn.TxnRead; import org.apache.cassandra.utils.Clock; @@ -134,6 +137,10 @@ public class AccordCommandStore extends CommandStore } } + static final AtomicReferenceFieldUpdater<AccordCommandStore, SafeRedundantBefore> safeRedundantBeforeUpdater + = AtomicReferenceFieldUpdater.newUpdater(AccordCommandStore.class, SafeRedundantBefore.class, "safeRedundantBefore"); + static final AtomicLong nextSafeRedundantBeforeTicket = new AtomicLong(); + public final String loggingId; private final Journal journal; private final RangeSearcher rangeSearcher; @@ -143,6 +150,7 @@ public class AccordCommandStore extends CommandStore private long lastSystemTimestampMicros = Long.MIN_VALUE; private final CommandsForRanges.Manager commandsForRanges; private final TableId tableId; + volatile SafeRedundantBefore safeRedundantBefore; private AccordSafeCommandStore current; private Thread currentThread; @@ -384,7 +392,6 @@ public class AccordCommandStore extends CommandStore Invariants.require(thread == null ? currentThread == self : currentThread == null); currentThread = thread; if (thread != null) CommandStore.register(this); - } public boolean hasSafeStore() @@ -484,6 +491,15 @@ public class AccordCommandStore extends CommandStore return journal.loadMinimal(id, txnId, MINIMAL, unsafeGetRedundantBefore(), durableBefore()); } + public AccordCompactionInfo getCompactionInfo() + { + SafeRedundantBefore safeRedundantBefore = this.safeRedundantBefore; + RedundantBefore redundantBefore; + if (safeRedundantBefore == null) redundantBefore = RedundantBefore.EMPTY; + else redundantBefore = safeRedundantBefore.redundantBefore; + return new AccordCompactionInfo(id, redundantBefore, rangesForEpoch, tableId); + } + public RangeSearcher rangeSearcher() { return rangeSearcher; @@ -494,6 +510,12 @@ public class AccordCommandStore extends CommandStore return loader; } + @VisibleForTesting + public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore) + { + super.unsafeUpsertRedundantBefore(addRedundantBefore); + } + private static class CommandStoreLoader extends AbstractLoader { private final AccordCommandStore store; @@ -522,7 +544,11 @@ public class AccordCommandStore extends CommandStore void maybeLoadRedundantBefore(RedundantBefore redundantBefore) { if (redundantBefore != null) + { loadRedundantBefore(redundantBefore); + Invariants.require(safeRedundantBefore == null); + safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore); + } } void maybeLoadBootstrapBeganAt(NavigableMap<TxnId, Ranges> bootstrapBeganAt) @@ -542,4 +568,24 @@ public class AccordCommandStore extends CommandStore if (rangesForEpoch != null) loadRangesForEpoch(rangesForEpoch); } + + // TODO (expected): handle journal failures, and consider how we handle partial failures. + // Very likely we will not be able to safely or cleanly handle partial failures of this logic, but decide and document. + // TODO (desired): consider merging with PersistentField? This version is cheaper to manage which may be preferable at the CommandStore level. + static class SafeRedundantBefore + { + final long ticket; + final RedundantBefore redundantBefore; + + SafeRedundantBefore(long ticket, RedundantBefore redundantBefore) + { + this.ticket = ticket; + this.redundantBefore = redundantBefore; + } + + static SafeRedundantBefore max(SafeRedundantBefore a, SafeRedundantBefore b) + { + return a.ticket >= b.ticket ? a : b; + } + } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java index f56c388012..b9ec97b68a 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java +++ b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java @@ -421,11 +421,11 @@ public class AccordFetchCoordinator extends AbstractFetchCoordinator implements } @Override - protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Ranges unavailable) + protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute) { - AsyncChain<Data> result = super.beginRead(safeStore, executeAt, txn, unavailable); + AsyncChain<Data> result = super.beginRead(safeStore, executeAt, txn, execute); // TODO (required): verify that streaming snapshots have all been created by now, so we won't stream any data that arrives after this - readStarted(safeStore, unavailable); + readStarted(safeStore); return result; } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 43ea8add3b..b676b110e3 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -313,7 +313,7 @@ public class AccordKeyspace } // TODO (expected): garbage-free filtering, reusing encoding - public Row withoutRedundantCommands(TokenKey key, Row row, RedundantBefore.Entry redundantBefore) + public Row withoutRedundantCommands(TokenKey key, Row row, RedundantBefore.Bounds redundantBefore) { Invariants.require(row.columnCount() == 1); Cell<?> cell = row.getCell(data); diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index da27eca0f3..9036116067 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -293,7 +293,7 @@ public class AccordObjectSizes FullKeyRoute route = new FullKeyRoute(EMPTY_KEY, new RoutingKey[]{ EMPTY_KEY }); Participants<?> empty = route.slice(0, 0); ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID) - .setParticipants(StoreParticipants.create(route, empty, executes ? empty : null, empty, route)) + .setParticipants(StoreParticipants.create(route, empty, executes ? empty : null, executes ? empty : null, empty, route)) .durability(Status.Durability.NotDurable) .executeAt(EMPTY_TXNID) .promised(Ballot.ZERO); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java index 8e4948a49a..89f3214dc7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; import accord.api.Agent; import accord.api.DataStore; +import accord.api.Journal.FieldUpdates; import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.impl.AbstractSafeCommandStore; @@ -40,6 +41,7 @@ import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Unseekables; import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches; +import org.apache.cassandra.service.accord.AccordCommandStore.SafeRedundantBefore; import static accord.utils.Invariants.illegalState; @@ -115,6 +117,34 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC } } + @Override + protected void persistFieldUpdates() + { + super.persistFieldUpdates(); + } + + protected void persistFieldUpdatesInternal(Runnable onDone) + { + FieldUpdates updates = fieldUpdates(); + if (updates == null) + return; + + if (updates.newRedundantBefore != null) + { + long ticket = AccordCommandStore.nextSafeRedundantBeforeTicket.incrementAndGet(); + SafeRedundantBefore update = new SafeRedundantBefore(ticket, updates.newRedundantBefore); + Runnable reportRedundantBefore = () -> { + AccordCommandStore.safeRedundantBeforeUpdater.accumulateAndGet((AccordCommandStore)commandStore, update, SafeRedundantBefore::max); + }; + Runnable prevOnDone = onDone; + onDone = prevOnDone == null ? reportRedundantBefore : () -> { + try { reportRedundantBefore.run(); } + finally { prevOnDone.run(); } + }; + } + commandStore.persistFieldUpdates(updates, onDone); + } + protected AccordSafeCommandsForKey add(AccordSafeCommandsForKey safeCfk, ExclusiveCaches caches) { Object check = task.ensureCommandsForKey().putIfAbsent(safeCfk.key(), safeCfk); diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 26dfa7dc66..bd42e666b7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -360,6 +360,11 @@ public class AccordService implements IAccordService, Shutdownable @Override public synchronized void startup() + { + unsafeStartupWithOverrides(null); + } + + public synchronized void unsafeStartupWithOverrides(@Nullable Journal.TopologyUpdate overrideNullTopologyUpdate) { if (state != State.INIT) return; @@ -380,6 +385,8 @@ public class AccordService implements IAccordService, Shutdownable Invariants.require(lastSeen == null || update.global.epoch() > lastSeen.global.epoch()); lastSeen = update; } + if (lastSeen == null) + lastSeen = overrideNullTopologyUpdate; if (lastSeen != null) { @@ -1328,21 +1335,9 @@ public class AccordService implements IAccordService, Shutdownable public AccordCompactionInfos getCompactionInfo() { AccordCompactionInfos compactionInfos = new AccordCompactionInfos(node.durableBefore()); - if (node.commandStores().all().length > 0) - { - AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> { - synchronized (compactionInfos) - { - int id = safeStore.commandStore().id(); - compactionInfos.put(id, new AccordCompactionInfo( - id, - safeStore.redundantBefore(), - safeStore.ranges(), - ((AccordCommandStore)safeStore.commandStore()).tableId() - )); - } - })); - } + node.commandStores().forEachCommandStore(commandStore -> { + compactionInfos.put(commandStore.id(), ((AccordCommandStore)commandStore).getCompactionInfo()); + }); return compactionInfos; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java b/src/java/org/apache/cassandra/service/accord/AccordTask.java index 209bff7cdd..38bf137f10 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTask.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java @@ -685,10 +685,8 @@ public abstract class AccordTask<R> extends Task implements Runnable, Function<S { state(PERSISTING); Runnable onFlush = () -> finish(result, null); - if (safeStore.fieldUpdates() != null) - commandStore.persistFieldUpdates(safeStore.fieldUpdates(), changes == null ? onFlush : null); - if (changes != null) - save(changes, onFlush); + safeStore.persistFieldUpdatesInternal(changes == null ? onFlush : null); + if (changes != null) save(changes, onFlush); } commandStore.complete(safeStore); diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java index 140eb7395c..c78a787d99 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java @@ -41,6 +41,7 @@ import accord.topology.Shard; import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.SortedArrays.SortedArrayList; +import accord.utils.TinyEnumSet; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.IPartitioner; @@ -80,18 +81,18 @@ public class AccordTopology private static class ShardLookup extends HashMap<accord.primitives.Range, Shard> { - private Shard createOrReuse(boolean pendingRemoval, accord.primitives.Range range, SortedArrayList<Id> nodes, SortedArrayList<Id> fastPath, Set<Id> joining) + private Shard createOrReuse(TinyEnumSet<Shard.Flag> flags, accord.primitives.Range range, SortedArrayList<Id> nodes, SortedArrayList<Id> fastPath, Set<Id> joining) { Shard prev = get(range); if (prev != null - && prev.pendingRemoval == pendingRemoval + && prev.flags().bitset() == flags.bitset() && prev.nodes.equals(nodes) && prev.fastPathElectorateSize == fastPath.size() && prev.nodes.without(prev.notInFastPath).equals(fastPath) && joining.size() == prev.joining.size() && prev.joining.containsAll(joining)) return prev; - return Shard.create(range, nodes, fastPath, joining, pendingRemoval); + return Shard.create(range, nodes, fastPath, joining, flags); } } @@ -120,7 +121,7 @@ public class AccordTopology return strategy; } - List<Shard> createForTable(TableMetadata metadata, Set<Id> unavailable, Map<Id, String> dcMap, ShardLookup lookup) + List<Shard> createForTable(Epoch epoch, TableMetadata metadata, Set<Id> unavailable, Map<Id, String> dcMap, ShardLookup lookup) { Ranges ranges = this.ranges.stream() .map(range -> Ranges.single(AccordTopology.range(metadata.id, range))) @@ -131,7 +132,14 @@ public class AccordTopology List<Shard> shards = new ArrayList<>(ranges.size()); for (accord.primitives.Range range : ranges) - shards.add(lookup.createOrReuse(metadata.params.pendingDrop, range, nodes, electorate, pending)); + { + TinyEnumSet<Shard.Flag> flags = Shard.NO_FLAGS; + if (metadata.params.pendingDrop) + flags = flags.with(Shard.Flag.PENDING_REMOVAL); + if (metadata.epoch.isEqualOrAfter(epoch)) + flags = flags.with(Shard.Flag.MUST_WITNESS); + shards.add(lookup.createOrReuse(flags, range, nodes, electorate, pending)); + } return shards; } @@ -296,7 +304,7 @@ public class AccordTopology if (tables.isEmpty()) continue; List<KeyspaceShard> ksShards = KeyspaceShard.forKeyspace(keyspace, placements, directory); - tables.forEach(table -> ksShards.forEach(shard -> res.addAll(shard.createForTable(table, unavailable, dcMap, lookup)))); + tables.forEach(table -> ksShards.forEach(shard -> res.addAll(shard.createForTable(epoch, table, unavailable, dcMap, lookup)))); } res.sort((a, b) -> a.range.compare(b.range)); diff --git a/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java index bae7d6b479..40f2c3703e 100644 --- a/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java +++ b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java @@ -36,13 +36,13 @@ import org.apache.cassandra.service.accord.api.TokenKey; public class CommandStoreTxnBlockedGraph { - public final int storeId; + public final int commandStoreId; public final Map<TxnId, TxnState> txns; public final Map<TokenKey, TxnId> keys; public CommandStoreTxnBlockedGraph(Builder builder) { - storeId = builder.storeId; + commandStoreId = builder.storeId; txns = ImmutableMap.copyOf(builder.txns); keys = ImmutableMap.copyOf(builder.keys); } diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java index 6ccf8c4c03..0998c72634 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java @@ -34,11 +34,11 @@ import accord.local.Node; import accord.local.SafeCommandStore; import accord.messages.MessageType; import accord.messages.ReadData; +import accord.primitives.AbstractRanges; import accord.primitives.PartialTxn; import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.Ranges; -import accord.primitives.Routables.Slice; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.topology.Topologies; @@ -241,38 +241,34 @@ public class AccordInteropRead extends ReadData } @Override - protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Ranges unavailable) + protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute) { TxnRead txnRead = (TxnRead)txn.read(); - Ranges ranges = safeStore.ranges().allAt(executeAt).without(unavailable).intersecting(scope, Slice.Minimal); long nowInSeconds = TxnNamedRead.nowInSeconds(executeAt); - List<AsyncChain<Data>> chains = new ArrayList<>(ranges.size()); - for (Range r : ranges) + if (!command.isRangeRequest()) { - ReadCommand readCommand = this.command; - TokenKey routingKey = null; - final ReadCommand readCommandFinal; - if (readCommand.isRangeRequest()) - { - // This path can have a subrange we have never seen before provided by short read protection or read repair so we need to - // calculate the intersection with this instance of the command store and the actual command if it is not empty we - // will need to execute it - TokenRange commandRange = TxnNamedRead.boundsAsAccordRange(readCommand.dataRange().keyRange(), readCommand.metadata().id); - Range intersection = commandRange.intersection(r); - if (intersection == null) - continue; - readCommandFinal = TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) readCommand, intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds); - routingKey = ((TokenRange)r).start(); - } - else - { - SinglePartitionReadCommand singlePartitionReadCommand = ((SinglePartitionReadCommand)readCommand); - if (!r.contains(new TokenKey(singlePartitionReadCommand.metadata().id, singlePartitionReadCommand.partitionKey().getToken()))) - continue; - readCommandFinal = ((SinglePartitionReadCommand)readCommand).withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()), nowInSeconds); - } - TokenKey routingKeyFinal = routingKey; - chains.add(AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(routingKeyFinal, ReadCommandVerbHandler.instance.doRead(readCommandFinal, false), readCommand))); + SinglePartitionReadCommand readCommand = ((SinglePartitionReadCommand)command); + TokenKey key = new TokenKey(readCommand.metadata().id, readCommand.partitionKey().getToken()); + if (!execute.contains(key)) + return AsyncChains.success(null); + + ReadCommand submit = readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()), nowInSeconds); + return AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false), command)); + } + + // This path can have a subrange we have never seen before provided by short read protection or read repair so we need to + // calculate the intersection with this instance of the command store and the actual command if it is not empty we + // will need to execute it + TokenRange commandRange = TxnNamedRead.boundsAsAccordRange(command.dataRange().keyRange(), command.metadata().id); + List<AsyncChain<Data>> chains = new ArrayList<>(execute.size()); + for (Range r : (AbstractRanges)execute) + { + Range intersection = commandRange.intersection(r); + if (intersection == null) + continue; + ReadCommand submit = TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command, intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds); + TokenKey routingKey = ((TokenRange)r).start(); + chains.add(AsyncChains.ofCallable(Stage.READ.executor(), () -> new LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit, false), command))); } if (chains.isEmpty()) diff --git a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java index 81f652351f..be71dc0953 100644 --- a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java +++ b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java @@ -146,7 +146,7 @@ public class AccordInteropReadRepair extends ReadData } @Override - protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Ranges unavailable) + protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp executeAt, PartialTxn txn, Participants<?> execute) { // TODO (required): subtract unavailable ranges, either from read or from response (or on coordinator) return AsyncChains.ofCallable(Verb.READ_REPAIR_REQ.stage.executor(), () -> { diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index bf349b988b..23d7dd33f1 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -328,6 +328,7 @@ public class CommandSerializers static final int OWNS_EQUALS_TOUCHES = 0x8; static final int EXECUTES_IS_NULL = 0x10; static final int EXECUTES_IS_OWNS = 0x20; + static final int WAITSON_IS_OWNS = 0x40; @Override public void serialize(StoreParticipants t, DataOutputPlus out, int version) throws IOException @@ -338,18 +339,21 @@ public class CommandSerializers boolean ownsEqualsTouches = t.owns() == t.touches(); boolean executesIsNull = t.executes() == null; boolean executesIsOwns = !executesIsNull && t.executes() == t.owns(); + boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == t.owns(); out.writeByte((hasRoute ? HAS_ROUTE : 0) | (hasTouchedEqualsRoute ? HAS_TOUCHED_EQUALS_ROUTE : 0) | (touchesEqualsHasTouched ? TOUCHES_EQUALS_HAS_TOUCHED : 0) | (ownsEqualsTouches ? OWNS_EQUALS_TOUCHES : 0) | (executesIsNull ? EXECUTES_IS_NULL : 0) | (executesIsOwns ? EXECUTES_IS_OWNS : 0) + | (waitsOnIsOwns ? WAITSON_IS_OWNS : 0) ); if (hasRoute) KeySerializers.route.serialize(t.route(), out, version); if (!hasTouchedEqualsRoute) KeySerializers.participants.serialize(t.hasTouched(), out, version); if (!touchesEqualsHasTouched) KeySerializers.participants.serialize(t.touches(), out, version); if (!ownsEqualsTouches) KeySerializers.participants.serialize(t.owns(), out, version); if (!executesIsNull && !executesIsOwns) KeySerializers.participants.serialize(t.executes(), out, version); + if (!executesIsNull && !waitsOnIsOwns) KeySerializers.participants.serialize(t.waitsOn(), out, version); } public void skip(DataInputPlus in, int version) throws IOException @@ -360,6 +364,7 @@ public class CommandSerializers if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED)) KeySerializers.participants.skip(in, version); if (0 == (flags & OWNS_EQUALS_TOUCHES)) KeySerializers.participants.skip(in, version); if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL))) KeySerializers.participants.skip(in, version); + if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL))) KeySerializers.participants.skip(in, version); } @Override @@ -371,16 +376,8 @@ public class CommandSerializers Participants<?> touches = 0 != (flags & TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched : KeySerializers.participants.deserialize(in, version); Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ? touches : KeySerializers.participants.deserialize(in, version); Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ? null : 0 != (flags & EXECUTES_IS_OWNS) ? owns : KeySerializers.participants.deserialize(in, version); - return StoreParticipants.create(route, owns, executes, touches, hasTouched); - } - - public Route<?> deserializeRouteOnly(DataInputPlus in, int version) throws IOException - { - int flags = in.readByte(); - if (0 == (flags & HAS_ROUTE)) - return null; - - return KeySerializers.route.deserialize(in, version); + Participants<?> waitsOn = 0 != (flags & EXECUTES_IS_NULL) ? null : 0 != (flags & WAITSON_IS_OWNS) ? owns : KeySerializers.participants.deserialize(in, version); + return StoreParticipants.create(route, owns, executes, waitsOn, touches, hasTouched); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java index 9fa9333def..e2a9f0dfa6 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java @@ -41,6 +41,10 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.CollectionSerializers; import org.apache.cassandra.utils.NullableSerializer; +import static org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer.deserializeNullable; +import static org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer.serializeNullable; +import static org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer.serializedNullableSize; + public class CommandStoreSerializers { private CommandStoreSerializers() {} @@ -132,63 +136,74 @@ public class CommandStoreSerializers } }), DurableBefore.Entry[]::new, DurableBefore.SerializerSupport::create); - public static final IVersionedSerializer<RedundantBefore.Entry> redundantBeforeEntry = new IVersionedSerializer<>() + public static final IVersionedSerializer<RedundantBefore.Bounds> redundantBeforeEntry = new IVersionedSerializer<>() { @Override - public void serialize(RedundantBefore.Entry t, DataOutputPlus out, int version) throws IOException + public void serialize(RedundantBefore.Bounds b, DataOutputPlus out, int version) throws IOException { - KeySerializers.range.serialize(t.range, out, version); - Invariants.require(t.startOwnershipEpoch <= t.endOwnershipEpoch); - out.writeUnsignedVInt(t.startOwnershipEpoch); - if (t.endOwnershipEpoch == Long.MAX_VALUE) out.writeUnsignedVInt(0L); - else out.writeUnsignedVInt(1 + t.endOwnershipEpoch - t.startOwnershipEpoch); - CommandSerializers.txnId.serialize(t.locallyWitnessedBefore, out, version); - CommandSerializers.txnId.serialize(t.locallyAppliedBefore, out, version); - CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedBefore, out, version); - CommandSerializers.txnId.serialize(t.shardOnlyAppliedBefore, out, version); - CommandSerializers.txnId.serialize(t.shardAppliedBefore, out, version); - CommandSerializers.txnId.serialize(t.gcBefore, out, version); - CommandSerializers.txnId.serialize(t.bootstrappedAt, out, version); - CommandSerializers.nullableTimestamp.serialize(t.staleUntilAtLeast, out, version); + KeySerializers.range.serialize(b.range, out, version); + Invariants.require(b.startEpoch <= b.endEpoch); + out.writeUnsignedVInt(b.startEpoch); + if (b.endEpoch == Long.MAX_VALUE) out.writeUnsignedVInt(0L); + else out.writeUnsignedVInt(1 + b.endEpoch - b.startEpoch); + serializeNullable(b.staleUntilAtLeast, out); + out.writeUnsignedVInt32(b.bounds.length); + for (TxnId bound : b.bounds) + { + CommandSerializers.txnId.serialize(bound, out, version); + } + int prev = 0; + for (int status : b.statuses) + { + out.writeUnsignedVInt32(status ^ prev); + prev = status; + } } @Override - public RedundantBefore.Entry deserialize(DataInputPlus in, int version) throws IOException + public RedundantBefore.Bounds deserialize(DataInputPlus in, int version) throws IOException { Range range = KeySerializers.range.deserialize(in, version); long startEpoch = in.readUnsignedVInt(); long endEpoch = in.readUnsignedVInt(); if (endEpoch == 0) endEpoch = Long.MAX_VALUE; else endEpoch = endEpoch - 1 + startEpoch; - TxnId locallyWitnessedOrInvalidatedBefore = CommandSerializers.txnId.deserialize(in, version); - TxnId locallyAppliedOrInvalidatedBefore = CommandSerializers.txnId.deserialize(in, version); - TxnId locallyDecidedAndAppliedOrInvalidatedBefore = CommandSerializers.txnId.deserialize(in, version); - TxnId shardOnlyAppliedOrInvalidatedBefore = CommandSerializers.txnId.deserialize(in, version); - TxnId shardAppliedOrInvalidatedBefore = CommandSerializers.txnId.deserialize(in, version); - TxnId gcBefore = CommandSerializers.txnId.deserialize(in, version); - TxnId bootstrappedAt = CommandSerializers.txnId.deserialize(in, version); - Timestamp staleUntilAtLeast = CommandSerializers.nullableTimestamp.deserialize(in, version); - return new RedundantBefore.Entry(range, startEpoch, endEpoch, locallyWitnessedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + Timestamp staleUntilAtLeast = deserializeNullable(in); + int count = in.readUnsignedVInt32(); + + TxnId[] bounds = new TxnId[count]; + for (int i = 0 ; i < bounds.length ; ++i) + bounds[i] = CommandSerializers.txnId.deserialize(in); + int[] statuses = new int[count * 2]; + int prev = 0; + for (int i = 0 ; i < statuses.length ; ++i) + statuses[i] = prev = in.readUnsignedVInt32() ^ prev; + + return new RedundantBefore.Bounds(range, startEpoch, endEpoch, bounds, statuses, staleUntilAtLeast); } @Override - public long serializedSize(RedundantBefore.Entry t, int version) + public long serializedSize(RedundantBefore.Bounds b, int version) { - long size = KeySerializers.range.serializedSize(t.range, version); - size += TypeSizes.sizeofUnsignedVInt(t.startOwnershipEpoch); - size += TypeSizes.sizeofUnsignedVInt(t.endOwnershipEpoch == Long.MAX_VALUE ? 0 : 1 + t.endOwnershipEpoch - t.startOwnershipEpoch); - size += CommandSerializers.txnId.serializedSize(t.locallyWitnessedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.locallyAppliedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.shardAppliedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.gcBefore, version); - size += CommandSerializers.txnId.serializedSize(t.bootstrappedAt, version); - size += CommandSerializers.nullableTimestamp.serializedSize(t.staleUntilAtLeast, version); + long size = KeySerializers.range.serializedSize(b.range, version); + size += TypeSizes.sizeofUnsignedVInt(b.startEpoch); + size += TypeSizes.sizeofUnsignedVInt(b.endEpoch == Long.MAX_VALUE ? 0 : 1 + b.endEpoch - b.startEpoch); + size += serializedNullableSize(b.staleUntilAtLeast); + size += TypeSizes.sizeofUnsignedVInt(b.bounds.length); + for (TxnId bound : b.bounds) + { + size += CommandSerializers.txnId.serializedSize(bound, version); + } + int prev = 0; + for (int status : b.statuses) + { + size += TypeSizes.sizeofUnsignedVInt(status ^ prev); + prev = status; + } return size; } }; - public static IVersionedSerializer<RedundantBefore> redundantBefore = new ReducingRangeMapSerializer<>(NullableSerializer.wrap(redundantBeforeEntry), RedundantBefore.Entry[]::new, RedundantBefore.SerializerSupport::create); + public static IVersionedSerializer<RedundantBefore> redundantBefore = new ReducingRangeMapSerializer<>(NullableSerializer.wrap(redundantBeforeEntry), RedundantBefore.Bounds[]::new, RedundantBefore.SerializerSupport::create); private static class TimestampToRangesSerializer<T extends Timestamp> implements IVersionedSerializer<NavigableMap<T, Ranges>> { diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java index 2e91542f1a..fca97c5d45 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java @@ -26,6 +26,7 @@ import accord.primitives.Range; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.SortedArrays.SortedArrayList; +import accord.utils.TinyEnumSet; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.ValueAccessor; import org.apache.cassandra.io.IVersionedSerializer; @@ -124,6 +125,8 @@ public class TopologySerializers public static class ShardSerializer implements IVersionedSerializer<Shard> { + private static final int PENDING_REMOVAL = 1; + private static final int MUST_WITNESS = 2; protected IVersionedSerializer<Range> range; public ShardSerializer(IVersionedSerializer<Range> range) @@ -138,7 +141,7 @@ public class TopologySerializers CollectionSerializers.serializeList(shard.nodes, out, version, nodeId); CollectionSerializers.serializeList(shard.notInFastPath, out, version, nodeId); CollectionSerializers.serializeList(shard.joining, out, version, nodeId); - out.writeBoolean(shard.pendingRemoval); + out.writeUnsignedVInt32(shard.flags().bitset()); } @Override @@ -148,8 +151,8 @@ public class TopologySerializers SortedArrayList<Node.Id> nodes = CollectionSerializers.deserializeSortedArrayList(in, version, nodeId, Node.Id[]::new); SortedArrayList<Node.Id> notInFastPath = CollectionSerializers.deserializeSortedArrayList(in, version, nodeId, Node.Id[]::new); SortedArrayList<Node.Id> joining = CollectionSerializers.deserializeSortedArrayList(in, version, nodeId, Node.Id[]::new); - boolean pendingRemoval = in.readBoolean(); - return Shard.SerializerSupport.create(range, nodes, notInFastPath, joining, pendingRemoval); + int flags = in.readUnsignedVInt32(); + return Shard.SerializerSupport.create(range, nodes, notInFastPath, joining, new TinyEnumSet<>(flags)); } @Override @@ -159,7 +162,7 @@ public class TopologySerializers size += CollectionSerializers.serializedListSize(shard.nodes, version, nodeId); size += CollectionSerializers.serializedListSize(shard.notInFastPath, version, nodeId); size += CollectionSerializers.serializedListSize(shard.joining, version, nodeId); - size += TypeSizes.BOOL_SIZE; + size += TypeSizes.sizeofUnsignedVInt(shard.flags().bitset()); return size; } }; diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java index aa58aa7875..ee106e216e 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java @@ -296,7 +296,7 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read int i = 0, j = 0; while (i < items.length && j < that.items.length) { - TxnNamedRead r1 = this.items[i], r2 = that.items[i]; + TxnNamedRead r1 = this.items[i], r2 = that.items[j]; int c = compareKey(r1, r2); if (c <= 0) { @@ -319,7 +319,7 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read TxnNamedRead pending = null; while (i < items.length && j < that.items.length) { - TxnNamedRead r1 = this.items[i], r2 = that.items[i]; + TxnNamedRead r1 = this.items[i], r2 = that.items[j]; int c = compareRange(r1, r2); TxnNamedRead add; if (c == 0) diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index d4e9343fc9..f3137c8c51 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -43,6 +43,7 @@ import accord.local.Command; import accord.local.CommandStore; import accord.local.DurableBefore; import accord.local.RedundantBefore; +import accord.local.RedundantStatus; import accord.local.StoreParticipants; import accord.local.cfk.CommandsForKey; import accord.local.cfk.Serialize; @@ -92,6 +93,7 @@ import static accord.local.KeyHistory.SYNC; import static accord.local.PreLoadContext.contextFor; import static accord.primitives.Routable.Domain.Range; import static accord.primitives.Timestamp.Flag.HLC_BOUND; +import static accord.primitives.Timestamp.Flag.SHARD_BOUND; import static accord.utils.async.AsyncChains.getUninterruptibly; import static org.apache.cassandra.Util.spinAssertEquals; import static org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse; @@ -227,8 +229,8 @@ public class CompactionAccordIteratorsTest private static RedundantBefore redundantBefore(TxnId txnId) { Ranges ranges = AccordTestUtils.fullRange(AccordTestUtils.keys(table, 42)); - txnId = txnId.as(Kind.ExclusiveSyncPoint, Range); - return RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, txnId, txnId, txnId, txnId, LT_TXN_ID.as(Range)); + txnId = txnId.as(Kind.ExclusiveSyncPoint, Range).addFlag(SHARD_BOUND); + return RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, txnId, RedundantStatus.GC_BEFORE_AND_LOCALLY_APPLIED, LT_TXN_ID.as(Range)); } enum DurableBeforeType diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 877ba587c4..ea03f381c0 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -95,7 +95,7 @@ public class AccordDebugKeyspaceTest extends CQLTester AsyncChains.getBlocking(accord.node().coordinate(id, txn)); spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name())))); + row(id.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name())))); } @Test @@ -120,11 +120,11 @@ public class AccordDebugKeyspaceTest extends CQLTester filter.preAccept.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); + row(id.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); filter.apply.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, SaveStatus.ReadyToExecute.name())); + row(id.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, SaveStatus.ReadyToExecute.name())); } finally { @@ -154,11 +154,11 @@ public class AccordDebugKeyspaceTest extends CQLTester filter.preAccept.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()), - row(first.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); + row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); filter.apply.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()), - row(first.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name())); + row(first.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name())); filter.reset(); @@ -173,8 +173,8 @@ public class AccordDebugKeyspaceTest extends CQLTester return rs.size() == 2; }); assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()), - row(second.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()), - row(second.toString(), anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name())); + row(second.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()), + row(second.toString(), KEYSPACE, tableName, anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name())); } finally { diff --git a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java index fa3ca0fa61..1fefa1d382 100644 --- a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java +++ b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java @@ -25,8 +25,6 @@ import org.junit.Test; import org.apache.cassandra.cql3.functions.types.utils.Bytes; import org.apache.cassandra.harry.checker.TestHelper; -import org.apache.cassandra.harry.gen.EntropySource; -import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource; public class RandomPartitionerTest extends PartitionerTestCase { diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index 118ecbb6cd..2af230b466 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -38,7 +38,9 @@ import org.junit.Test; import accord.api.Journal; import accord.api.RoutingKey; +import accord.local.CommandStore; import accord.local.CommandStores; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.Node; import accord.local.RedundantBefore; @@ -48,7 +50,6 @@ import accord.primitives.Ballot; import accord.primitives.Deps; import accord.primitives.FullKeyRoute; import accord.primitives.PartialDeps; -import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.Routable.Domain; @@ -57,6 +58,8 @@ import accord.primitives.SaveStatus; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; +import accord.topology.Shard; +import accord.topology.Topology; import accord.utils.Gen; import accord.utils.Gens; import accord.utils.Property.Command; @@ -80,6 +83,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.accord.AccordCommandStore; import org.apache.cassandra.service.accord.AccordJournal; import org.apache.cassandra.service.accord.AccordKeyspace; import org.apache.cassandra.service.accord.AccordService; @@ -102,6 +106,8 @@ import org.mockito.Mockito; import static accord.utils.Property.commands; import static accord.utils.Property.stateful; +import static accord.utils.SortedArrays.SortedArrayList.ofSorted; +import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner; import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME; public class RouteIndexTest extends CQLTester.InMemory @@ -449,6 +455,8 @@ public class RouteIndexTest extends CQLTester.InMemory { private final Int2ObjectHashMap<Map<TableId, Long2ObjectHashMap<List<TxnId>>>> storeToTableToRoutingKeysToTxns = new Int2ObjectHashMap<>(); private final Int2ObjectHashMap<Map<TableId, RangeTree<TokenKey, TokenRange, TxnId>>> storeToTableToRangesToTxns = new Int2ObjectHashMap<>(); + private final Int2ObjectHashMap<RangesForEpoch> storeRangesForEpochs = new Int2ObjectHashMap<>(); + private final RedundantBefore emptyRedundantBefore = RedundantBefore.create(Ranges.of(TokenRange.fullRange(tableId, getPartitioner())), TxnId.NONE, RedundantStatus.NONE); private final int numStores; private final List<TableId> tables; @@ -468,8 +476,10 @@ public class RouteIndexTest extends CQLTester.InMemory tokenGen = TOKEN_DISTRIBUTION.next(rs); rangeGen = rangeGen(rs, tables); domainGen = DOMAIN_DISTRIBUTION.next(rs); + journalTable = Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL); - this.journalTable = Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL); + for (int i = 0 ; i < numStores ; ++i) + storeRangesForEpochs.put(i, new RangesForEpoch(1, Ranges.of(TokenRange.fullRange(tableId, getPartitioner())))); accordService = startAccord(); accordService.configurationService().listener.notifyPostCommit(null, ClusterMetadata.current(), false); @@ -480,11 +490,13 @@ public class RouteIndexTest extends CQLTester.InMemory { NodeId tcmNodeId = ClusterMetadata.current().myNodeId(); AccordService as = new AccordService(AccordTopology.tcmIdToAccord(tcmNodeId)); - as.startup(); - + Topology topology = new Topology(1, Shard.create(TokenRange.fullRange(tableId, getPartitioner()), ofSorted(new Node.Id(1)), ofSorted(new Node.Id(1)))); + as.unsafeStartupWithOverrides(new Journal.TopologyUpdate(storeRangesForEpochs, topology, topology)); + for (CommandStore commandStore : as.node().commandStores().all()) + ((AccordCommandStore)commandStore).unsafeUpsertRedundantBefore(emptyRedundantBefore); // the reason for the mocking is to speed up compaction. Collecting the info from the stores has been slow and its always empty in this test... so stub it out to speed up the test AccordService mock = Mockito.spy(as); - Mockito.doReturn(emptyCompactionInfo(tableId)).when(mock).getCompactionInfo(); + Mockito.doReturn(emptyCompactionInfo(tableId, emptyRedundantBefore, storeRangesForEpochs)).when(mock).getCompactionInfo(); AccordService.unsafeSetNewAccordService(mock); AccordService.replayJournal(as); @@ -628,14 +640,11 @@ public class RouteIndexTest extends CQLTester.InMemory } }; - private static IAccordService.AccordCompactionInfos emptyCompactionInfo(TableId tableId) + private static IAccordService.AccordCompactionInfos emptyCompactionInfo(TableId tableId, RedundantBefore redundantBefore, Int2ObjectHashMap<RangesForEpoch> storeRangesForEpoch) { IAccordService.AccordCompactionInfos compactionInfos = new IAccordService.AccordCompactionInfos(DurableBefore.EMPTY); - RedundantBefore redundantBefore = Mockito.spy(RedundantBefore.EMPTY); - Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(), Mockito.any(), (Participants<?>) Mockito.any()); - Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(), Mockito.any(), (RoutingKey) Mockito.any()); - for (int i = 0; i < MAX_STORES; i++) - compactionInfos.put(i, new AccordCompactionInfo(i, redundantBefore, new CommandStores.RangesForEpoch(1, Ranges.EMPTY), tableId)); + for (int i = 0; i < storeRangesForEpoch.size(); i++) + compactionInfos.put(i, new AccordCompactionInfo(i, redundantBefore, storeRangesForEpoch.get(i), tableId)); return compactionInfos; } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java index f1fa4ac57c..14e162955d 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java @@ -88,7 +88,7 @@ public class AccordJournalOrderTest JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, randomSource.nextInt(5)); res.compute(key, (k, prev) -> prev == null ? 1 : prev + 1); Participants<?> participants = RoutingKeys.of(new TokenKey(TableId.generate(), new ByteOrderedPartitioner.BytesToken(new byte[1]))); - Command command = Command.NotDefined.notDefined(txnId, SaveStatus.NotDefined, Status.Durability.NotDurable, StoreParticipants.create(null, participants, null, participants, participants), Ballot.ZERO); + Command command = Command.NotDefined.notDefined(txnId, SaveStatus.NotDefined, Status.Durability.NotDurable, StoreParticipants.create(null, participants, null, null, participants, participants), Ballot.ZERO); accordJournal.saveCommand(key.commandStoreId, new Journal.CommandUpdate(null, command), () -> {}); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java index bbdaf617e8..1385edc1e7 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java @@ -106,7 +106,7 @@ public class AccordSyncPropagatorTest RandomDelayQueue delayQueue = new RandomDelayQueue.Factory(rs).get(); PendingQueue queue = new MonitoredPendingQueue(failures, delayQueue); Agent agent = new TestAgent.RethrowAgent(); - SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent); + SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, agent, null); ScheduledExecutorPlus scheduler = new AdaptingScheduledExecutorPlus(globalExecutor); Cluster cluster = new Cluster(nodes, rs, scheduler); diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java index 738de4809e..35b0cee6a5 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java @@ -50,7 +50,7 @@ public class CommandStoreSerializersTest DataOutputBuffer buffer = new DataOutputBuffer(); qt().forAll(Gens.random(), AccordGenerators.partitioner()).check((rs, partitioner) -> { DatabaseDescriptor.setPartitionerUnsafe(partitioner); - RedundantBefore.Entry entry = AccordGenerators.redundantBeforeEntry(partitioner).next(rs); + RedundantBefore.Bounds entry = AccordGenerators.redundantBeforeEntry(partitioner).next(rs); for (Version version : SUPPORTED_VERSIONS) IVersionedSerializers.testSerde(buffer, CommandStoreSerializers.redundantBeforeEntry, entry, version.value); }); diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index 6c5638e42b..19a922450a 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.NavigableMap; import java.util.Set; import java.util.function.BiFunction; -import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; @@ -37,6 +36,7 @@ import accord.local.Command.Truncated; import accord.local.ICommand; import accord.local.DurableBefore; import accord.local.RedundantBefore; +import accord.local.RedundantBefore.Bounds; import accord.local.StoreParticipants; import accord.primitives.Ballot; import accord.primitives.Deps; @@ -75,7 +75,14 @@ import org.apache.cassandra.service.accord.txn.TxnWrite; import org.quicktheories.impl.JavaRandom; import static accord.local.CommandStores.RangesForEpoch; +import static accord.local.RedundantStatus.LOCALLY_APPLIED_ONLY; +import static accord.local.RedundantStatus.LOCALLY_WITNESSED_ONLY; +import static accord.local.RedundantStatus.Property.GC_BEFORE; +import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP; +import static accord.local.RedundantStatus.SHARD_ONLY_APPLIED_ONLY; +import static accord.local.RedundantStatus.oneSlow; import static accord.primitives.Status.Durability.NotDurable; +import static accord.primitives.Timestamp.Flag.SHARD_BOUND; import static accord.primitives.Txn.Kind.Write; import static org.apache.cassandra.service.accord.AccordTestUtils.TABLE_ID1; import static org.apache.cassandra.service.accord.AccordTestUtils.createPartialTxn; @@ -442,28 +449,47 @@ public class AccordGenerators return AccordGens.deps(keyDepsGen(partitioner), rangeDepsGen(partitioner), directKeyDepsGen(partitioner)); } - public static Gen<RedundantBefore.Entry> redundantBeforeEntry(IPartitioner partitioner) + public static Gen<Bounds> redundantBeforeEntry(IPartitioner partitioner) { return redundantBeforeEntry(Gens.bools().all(), range(partitioner), AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore -> Routable.Domain.Range)); } - public static Gen<RedundantBefore.Entry> redundantBeforeEntry(Gen<Boolean> emptyGen, Gen<Range> rangeGen, Gen<TxnId> txnIdGen) + public static Gen<Bounds> redundantBeforeEntry(Gen<Boolean> emptyGen, Gen<Range> rangeGen, Gen<TxnId> txnIdGen) { return rs -> { Range range = rangeGen.next(rs); - TxnId locallyWitnessedOrInvalidatedBefore = emptyGen.next(rs) ? TxnId.NONE : txnIdGen.next(rs); // emptyable or range - TxnId locallyAppliedOrInvalidatedBefore = TxnId.nonNullOrMin(locallyWitnessedOrInvalidatedBefore, emptyGen.next(rs) ? TxnId.NONE : txnIdGen.next(rs)); // emptyable or range - TxnId locallyDecidedAndAppliedOrInvalidatedBefore = TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore, emptyGen.next(rs) ? TxnId.NONE : txnIdGen.next(rs)); // emptyable or range - TxnId shardOnlyAppliedOrInvalidatedBefore = emptyGen.next(rs) ? TxnId.NONE : txnIdGen.next(rs); // emptyable or range - TxnId shardAppliedOrInvalidatedBefore = TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore, TxnId.nonNullOrMin(shardOnlyAppliedOrInvalidatedBefore, emptyGen.next(rs) ? TxnId.NONE : txnIdGen.next(rs))); // emptyable or range - TxnId gcBefore = TxnId.nonNullOrMin(shardAppliedOrInvalidatedBefore, emptyGen.next(rs) ? TxnId.NONE : txnIdGen.next(rs)); // emptyable or range - TxnId bootstrappedAt = txnIdGen.next(rs); - Timestamp staleUntilAtLeast = emptyGen.next(rs) ? null : txnIdGen.next(rs); // nullable - - long maxEpoch = Stream.of(locallyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast).filter(t -> t != null).mapToLong(Timestamp::epoch).max().getAsLong(); - long startEpoch = rs.nextLong(maxEpoch); - long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : 1 + rs.nextLong(startEpoch, Long.MAX_VALUE); - return new RedundantBefore.Entry(range, startEpoch, endEpoch, locallyWitnessedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore, bootstrappedAt, staleUntilAtLeast); + + List<Bounds> bounds = new ArrayList<>(); + if (rs.nextBoolean()) + bounds.add(Bounds.create(range, txnIdGen.next(rs), LOCALLY_WITNESSED_ONLY, null )); + if (rs.nextBoolean()) + bounds.add(Bounds.create(range, txnIdGen.next(rs), LOCALLY_APPLIED_ONLY, null )); + if (rs.nextBoolean()) + bounds.add(Bounds.create(range, txnIdGen.next(rs), SHARD_ONLY_APPLIED_ONLY, null )); + if (rs.nextBoolean()) + bounds.add(Bounds.create(range, txnIdGen.next(rs).addFlag(SHARD_BOUND), oneSlow(GC_BEFORE), null )); + if (rs.nextBoolean()) + bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(PRE_BOOTSTRAP), null )); + if (rs.nextBoolean()) + bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new int[0], txnIdGen.next(rs))); + + Collections.shuffle(bounds); + long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : rs.nextLong(0, Long.MAX_VALUE); + long minEpoch = Long.MAX_VALUE; + Bounds result = null; + for (Bounds b : bounds) + { + if (b.bounds.length > 0) + minEpoch = Math.min(minEpoch, b.bounds[0].epoch()); + if (result == null) result = b; + else result = Bounds.reduce(result, b); + } + + long startEpoch = rs.nextLong(Math.min(minEpoch, endEpoch)); + Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new int[0], null); + if (result == null) + return epochBounds; + return Bounds.reduce(result, epochBounds); }; } @@ -471,7 +497,7 @@ public class AccordGenerators { Gen<Ranges> rangeGen = rangesArbitrary(partitioner); Gen<TxnId> txnIdGen = AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore -> Routable.Domain.Range); - BiFunction<RandomSource, Range, RedundantBefore.Entry> entryGen = (rs, range) -> redundantBeforeEntry(Gens.bools().all(), i -> range, txnIdGen).next(rs); + BiFunction<RandomSource, Range, Bounds> entryGen = (rs, range) -> redundantBeforeEntry(Gens.bools().all(), i -> range, txnIdGen).next(rs); return AccordGens.redundantBefore(rangeGen, entryGen); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
