This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 3b99044d6d5491304d4a25d8dcea54510cfd3215 Author: Ariel Weisberg <aweisb...@apple.com> AuthorDate: Fri May 17 15:27:44 2024 -0400 Accord barrier/inclusive sync point fixes Patch by Ariel Weisberg, Benedict Elliott Smith; reviewed by Benedict Elliott Smith for CASSANDRA-19641 --- modules/accord | 2 +- .../cassandra/service/accord/AccordService.java | 2 +- .../cassandra/service/accord/api/AccordAgent.java | 10 +- .../migration/ConsensusKeyMigrationState.java | 20 ++- .../test/accord/AccordIncrementalRepairTest.java | 6 +- .../test/accord/AccordMigrationTest.java | 169 ++++++++++++++------- 6 files changed, 132 insertions(+), 77 deletions(-) diff --git a/modules/accord b/modules/accord index 778c45cd97..4e8bcae81f 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 778c45cd977576a901abf24a9759872d36fde056 +Subproject commit 4e8bcae81f9751b9d732fd5056bce31c97ad58f3 diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index f853c329c6..8e2778f394 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -366,7 +366,7 @@ public class AccordService implements IAccordService, Shutdownable { logger.debug("Starting barrier key: {} epoch: {} barrierType: {} isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite); txnId = node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain()); - AsyncResult<Timestamp> asyncResult = syncPoint == null + AsyncResult<TxnId> asyncResult = syncPoint == null ? Barrier.barrier(node, keysOrRanges, epoch, barrierType) : Barrier.barrier(node, keysOrRanges, epoch, barrierType, syncPoint); long deadlineNanos = queryStartNanos + timeoutNanos; diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index 9c4b678996..c0fea38a37 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -19,12 +19,11 @@ package org.apache.cassandra.service.accord.api; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - import accord.api.Agent; import accord.api.EventsListener; import accord.api.Result; @@ -35,8 +34,9 @@ import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.Txn.Kind; -import org.apache.cassandra.service.accord.AccordService; +import accord.primitives.TxnId; import org.apache.cassandra.metrics.AccordMetrics; +import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.txn.TxnQuery; import org.apache.cassandra.service.accord.txn.TxnRead; import org.apache.cassandra.tcm.Epoch; @@ -84,12 +84,12 @@ public class AccordAgent implements Agent } @Override - public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) + public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull TxnId txnId) { if (keysOrRanges.domain() == Key) { PartitionKey key = (PartitionKey)keysOrRanges.get(0); - maybeSaveAccordKeyMigrationLocally(key, Epoch.create(executeAt.epoch())); + maybeSaveAccordKeyMigrationLocally(key, Epoch.create(txnId.epoch())); } } diff --git a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java index 068e46d4a4..86eb97ae06 100644 --- a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java +++ b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java @@ -27,6 +27,8 @@ import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import accord.api.BarrierType; import accord.primitives.Seekables; @@ -68,9 +70,7 @@ import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDSerializer; import static org.apache.cassandra.net.Verb.CONSENSUS_KEY_MIGRATION; - import static org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget.paxos; - import static org.apache.cassandra.utils.Clock.Global.nanoTime; /** @@ -83,6 +83,8 @@ import static org.apache.cassandra.utils.Clock.Global.nanoTime; */ public abstract class ConsensusKeyMigrationState { + private static final Logger logger = LoggerFactory.getLogger(ConsensusKeyMigrationState.class); + /* * Used to notify other replicas when key migration has occurred so they can * also cache that the key migration was done @@ -188,12 +190,14 @@ public abstract class ConsensusKeyMigrationState private static final CacheLoader<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> LOADING_FUNCTION = k -> SystemKeyspace.loadConsensusKeyMigrationState(k.left, k.right); private static final Weigher<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> WEIGHER_FUNCTION = (k, v) -> EMPTY_KEY_SIZE + Ints.checkedCast(ByteBufferUtil.estimatedSizeOnHeap(k.left)) + VALUE_SIZE; - private static final LoadingCache<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> MIGRATION_STATE_CACHE = - Caffeine.newBuilder() - .maximumWeight(DatabaseDescriptor.getConsensusMigrationCacheSizeInMiB() << 20) - .weigher(WEIGHER_FUNCTION) - .executor(ImmediateExecutor.INSTANCE) - .build(LOADING_FUNCTION); + + @VisibleForTesting + public static final LoadingCache<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> MIGRATION_STATE_CACHE = + Caffeine.newBuilder() + .maximumWeight(DatabaseDescriptor.getConsensusMigrationCacheSizeInMiB() << 20) + .weigher(WEIGHER_FUNCTION) + .executor(ImmediateExecutor.INSTANCE) + .build(LOADING_FUNCTION); public static final IVerbHandler<ConsensusKeyMigrationFinished> consensusKeyMigrationFinishedHandler = message -> { saveConsensusKeyMigrationLocally(message.payload.partitionKey, message.payload.tableId, message.payload.consensusMigratedAt); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java index 66cf1c0e00..1e76473eeb 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java @@ -99,12 +99,12 @@ public class AccordIncrementalRepairTest extends AccordTestBase private final List<ExecutedBarrier> barriers = new ArrayList<>(); @Override - public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull Timestamp executeAt) + public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull TxnId txnId) { - super.onLocalBarrier(keysOrRanges, executeAt); + super.onLocalBarrier(keysOrRanges, txnId); synchronized (barriers) { - barriers.add(new ExecutedBarrier(keysOrRanges, executeAt)); + barriers.add(new ExecutedBarrier(keysOrRanges, txnId)); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java index a63642f819..97d35d79af 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java @@ -26,14 +26,13 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import javax.annotation.Nonnull; -import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; - import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -41,12 +40,14 @@ import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.yaml.snakeyaml.Yaml; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.Config.PaxosVariant; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.BufferDecoratedKey; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; @@ -70,9 +71,10 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState; -import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter; +import org.apache.cassandra.service.consensus.migration.ConsensusMigratedAt; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget; +import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter; import org.apache.cassandra.service.consensus.migration.TableMigrationState; import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode; import org.apache.cassandra.service.paxos.Ballot; @@ -87,15 +89,13 @@ import org.apache.cassandra.utils.ByteArrayUtil; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JsonUtils; +import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.PojoToString; +import org.yaml.snakeyaml.Yaml; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.lang.String.format; import static java.util.Collections.emptyList; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import static org.apache.cassandra.Util.spinUntilSuccess; import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.db.SystemKeyspace.CONSENSUS_MIGRATION_STATE; @@ -108,6 +108,9 @@ import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME; import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision.paxosV2; import static org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.PROMISE; import static org.assertj.core.api.Fail.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /* * This test suite is intended to serve as an integration test with some pretty good visibility into actual execution @@ -296,7 +299,7 @@ public class AccordMigrationTest extends AccordTestBase * Helper to invoke a query and assert that the right metrics change indicating the correct * paths were taken to execute the query during migration */ - private static void assertTargetAccordWrite(Consumer<Integer> query, int coordinatorIndex, int key, int expectedAccordWriteCount, int expectedCasWriteCount, int expectedKeyMigrationCount, int expectedCasBeginRejects, int expectedCasAcceptRejects) + private static void assertTargetAccordWrite(Consumer<Integer> query, int coordinatorIndex, int key, List<Pair<ByteBuffer, UUID>> expectedKeyMigrations, int expectedAccordWriteCount, int expectedCasWriteCount, int expectedKeyMigrationCount, int expectedCasBeginRejects, int expectedCasAcceptRejects) { int startingWriteCount = getAccordWriteCount(coordinatorIndex); int startingCasWriteCount = getCasWriteCount(coordinatorIndex); @@ -304,6 +307,7 @@ public class AccordMigrationTest extends AccordTestBase int startingCasWriteBeginRejects = getCasWriteBeginRejects(coordinatorIndex); int startingCasWriteAcceptRejects = getCasWriteAcceptRejects(coordinatorIndex); query.accept(key); + validateKeyMigrations(expectedKeyMigrations); assertEquals("Accord writes", expectedAccordWriteCount, getAccordWriteCount(coordinatorIndex) - startingWriteCount); assertEquals("CAS writes", expectedCasWriteCount, getCasWriteCount(coordinatorIndex) - startingCasWriteCount); assertEquals("Key Migrations", expectedKeyMigrationCount, getKeyMigrationCount(coordinatorIndex) - startingKeyMigrationCount); @@ -311,7 +315,7 @@ public class AccordMigrationTest extends AccordTestBase assertEquals("CAS Accept rejects", expectedCasAcceptRejects, getCasWriteAcceptRejects(coordinatorIndex) - startingCasWriteAcceptRejects); } - private static Object[][] assertTargetAccordRead(Function<Integer, Object[][]> query, int coordinatorIndex, int key, int expectedAccordReadCount, int expectedCasPrepareCount, int expectedKeyMigrationCount, int expectedCasReadBeginRejects, int expectedCasReadAcceptRejects) + private static Object[][] assertTargetAccordRead(Function<Integer, Object[][]> query, int coordinatorIndex, int key, List<Pair<ByteBuffer, UUID>> expectedKeyMigrations, int expectedAccordReadCount, int expectedCasPrepareCount, int expectedKeyMigrationCount, int expectedCasReadBeginRejects, int expectedCasReadAcceptRejects) { int startingReadCount = getAccordReadCount(coordinatorIndex); int startingCasPrepareCount = getCasPrepareCount(coordinatorIndex); @@ -319,6 +323,7 @@ public class AccordMigrationTest extends AccordTestBase int startingCasReadBeginRejects = getCasReadBeginRejects(coordinatorIndex); int startingCasReadAcceptRejects = getCasReadAcceptRejects(coordinatorIndex); Object[][] result = query.apply(key); + validateKeyMigrations(expectedKeyMigrations); assertEquals("Accord reads", expectedAccordReadCount, getAccordReadCount(coordinatorIndex) - startingReadCount); assertEquals("CAS prepares", expectedCasPrepareCount, getCasPrepareCount(coordinatorIndex) - startingCasPrepareCount); assertEquals("Key Migrations", expectedKeyMigrationCount, getKeyMigrationCount(coordinatorIndex) - startingKeyMigrationCount); @@ -327,7 +332,7 @@ public class AccordMigrationTest extends AccordTestBase return result; } - private static void assertTargetPaxosWrite(Consumer<Integer> query, int coordinatorIndex, int key, int expectedAccordWriteCount, int expectedCasWriteCount, int expectedKeyMigrationCount, int expectedMigrationRejects, int expectedSkippedReads) + private static void assertTargetPaxosWrite(Consumer<Integer> query, int coordinatorIndex, int key, List<Pair<ByteBuffer, UUID>> expectedKeyMigrations, int expectedAccordWriteCount, int expectedCasWriteCount, int expectedKeyMigrationCount, int expectedMigrationRejects, int expectedSkippedReads) { int startingWriteCount = getAccordWriteCount(coordinatorIndex); int startingCasWriteCount = getCasWriteCount(coordinatorIndex); @@ -335,6 +340,7 @@ public class AccordMigrationTest extends AccordTestBase int startingMigrationRejectsCount = getAccordMigrationRejects(coordinatorIndex); int startingSkippedReadsCount = getAccordMigrationSkippedReads(); query.accept(key); + validateKeyMigrations(expectedKeyMigrations); assertEquals("Accord writes", expectedAccordWriteCount, getAccordWriteCount(coordinatorIndex) - startingWriteCount); assertEquals("CAS writes", expectedCasWriteCount, getCasWriteCount(coordinatorIndex) - startingCasWriteCount); assertEquals("Key Migrations", expectedKeyMigrationCount, getKeyMigrationCount(coordinatorIndex) - startingKeyMigrationCount); @@ -342,12 +348,60 @@ public class AccordMigrationTest extends AccordTestBase assertEquals("Accord skipped reads", expectedSkippedReads, getAccordMigrationSkippedReads() - startingSkippedReadsCount); } + private static void validateKeyMigrations(List<Pair<ByteBuffer, UUID>> expectedMigrations) + { + spinUntilSuccess(() -> { + try + { + List<byte[]> keys = expectedMigrations.stream().map(p -> p.left.array()).collect(Collectors.toList()); + List<Integer> intKeys = expectedMigrations.stream().map(p -> ByteBufferUtil.toInt(p.left)).collect(Collectors.toList()); + List<UUID> tables = expectedMigrations.stream().map(p -> p.right).collect(Collectors.toList()); + for (int i = 1; i < SHARED_CLUSTER.size(); i++) + { + int instanceIndex = i; + IInvokableInstance instance = SHARED_CLUSTER.get(i); + instance.runOnInstance(() -> { + Map<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> cacheMap = ConsensusKeyMigrationState.MIGRATION_STATE_CACHE.asMap(); + String cacheMessage = format("Instance %d Expected %s migrations but found in cache %s", instanceIndex, intKeys, cacheMap); + assertEquals(cacheMessage, keys.size(), cacheMap.size()); + for (int j = 0; j < keys.size(); j++) + { + assertTrue(cacheMessage, + cacheMap.containsKey(Pair.create(ByteBuffer.wrap(keys.get(j)), tables.get(j)))); + } + + UntypedResultSet result = QueryProcessor.executeInternal("SELECT * from " + SYSTEM_KEYSPACE_NAME + "." + CONSENSUS_MIGRATION_STATE); + String tableMessage = format("Instance %d Expected %s migrations but found in system table %s", instanceIndex, intKeys, result); + assertEquals(tableMessage, keys.size(), result.size()); + Iterator<UntypedResultSet.Row> resultIterator = result.iterator(); + for (int j = 0; j < result.size(); j++) + { + UntypedResultSet.Row row = resultIterator.next(); + boolean foundKey = false; + for (byte[] expectedKey : keys) + if (ByteBuffer.wrap(expectedKey).equals(row.getBytes("row_key"))) + foundKey = true; + assertTrue(tableMessage, foundKey); + } + }); + } + } + catch (Throwable t) + { + // For some reason full stack trace wasn't displayed without rethrowing + throw new AssertionError(t); + } + }); + } + @Test public void testPaxosToAccordCAS() throws Exception { test(format(TABLE_FMT, qualifiedTableName), cluster -> { + List<Pair<ByteBuffer, UUID>> expectedKeyMigrations = new ArrayList<>(); String table = tableName; + UUID tableUUID = cluster.get(1).callOnInstance(() -> ColumnFamilyStore.getIfExists(KEYSPACE, table).getTableId().asUUID()); cluster.forEach(node -> node.runOnInstance(() -> { TableMetadata tbl = Schema.instance.getTableMetadata(KEYSPACE, table); Assert.assertEquals(TransactionalMode.off, tbl.params.transactionalMode); @@ -373,24 +427,25 @@ public class AccordMigrationTest extends AccordTestBase List<Range<Token>> migratingRanges = ImmutableList.of(migratingRange); // Not actually migrating yet so should do nothing special - assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 0, 1, 0, 0, 0); + assertTargetAccordWrite(runCasNoApply, 1, migratingKey, expectedKeyMigrations, 0, 1, 0, 0, 0); // Mark ranges migrating and check migration state is correct nodetool(coordinator, "consensus_admin", "begin-migration", "-st", midToken.toString(), "-et", maxToken.toString(), "-tp", "accord", KEYSPACE, tableName); assertMigrationState(tableName, ConsensusMigrationTarget.accord, emptyList(), migratingRanges, 1); // Should be routed directly to Accord, and perform key migration, as well as key migration read in Accord - assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 1, 0, 1, 0, 0); + addExpectedMigratedKey(expectedKeyMigrations, migratingKey, tableUUID); + assertTargetAccordWrite(runCasNoApply, 1, migratingKey, expectedKeyMigrations, 1, 0, 1, 0, 0); // Should not repeat key migration, and should still do a migration read in Accord - assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 1, 0, 0, 0, 0); + assertTargetAccordWrite(runCasNoApply, 1, migratingKey, expectedKeyMigrations, 1, 0, 0, 0, 0); // Should run on Paxos since it is not in the migrating range - assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, 0, 1, 0, 0, 0); + assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, expectedKeyMigrations, 0, 1, 0, 0, 0); // Check that the coordinator on the other node also has saved that the key migration was performed // and runs the query on Accord immediately without key migration - assertTargetAccordWrite(runCasOnSecondNode, 2, migratingKey, 1, 0, 0, 0, 0); + assertTargetAccordWrite(runCasOnSecondNode, 2, migratingKey, expectedKeyMigrations, 1, 0, 0, 0, 0); // Forced repair while a node is down shouldn't work, use repair instead of finish-migration because repair exposes --force // and regular Cassandra repairs are eligible to drive migration so it's important they check --force and down nodes @@ -411,10 +466,10 @@ public class AccordMigrationTest extends AccordTestBase assertMigrationState(tableName, ConsensusMigrationTarget.accord, migratingRanges, emptyList(), 0); // Should run on Accord, and not perform key migration nor should it need to perform a migration read in Accord now that it is repaired - assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 1, 0, 0, 0, 0); + assertTargetAccordWrite(runCasNoApply, 1, migratingKey, expectedKeyMigrations, 1, 0, 0, 0, 0); // Should run on Paxos, and not perform key migration - assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, 0, 1, 0, 0, 0); + assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, expectedKeyMigrations, 0, 1, 0, 0, 0); // Pivot to testing repair with a subrange of the migrating range as well as key migration // Will use the unmigrated range between lowerMidToken and midToken @@ -429,7 +484,8 @@ public class AccordMigrationTest extends AccordTestBase saveAcceptedPaxosProposal(tableName, ballotString, migratingKey); // PaxosRepair will have inserted a condition matching row, so it can apply, demonstrating repair and // key migration occurred - assertTargetAccordWrite(runCasApplies, 1, migratingKey, 1, 0, 1, 0, 0); + addExpectedMigratedKey(expectedKeyMigrations, migratingKey, tableUUID); + assertTargetAccordWrite(runCasApplies, 1, migratingKey, expectedKeyMigrations, 1, 0, 1, 0, 0); // This will force the write to use the normal write patch cluster.get(1).runOnInstance(() -> ConsensusRequestRouter.setInstance(new PaxosToAccordMigrationNotHappeningUpToBegin())); @@ -443,7 +499,8 @@ public class AccordMigrationTest extends AccordTestBase // This will force the request to run on Paxos up to Accept // and the accept will be rejected at both nodes and we are certain we need to retry the transaction cluster.get(1).runOnInstance(() -> ConsensusRequestRouter.setInstance(new PaxosToAccordMigrationNotHappeningUpToBegin())); - assertTargetAccordWrite(runCasApplies, 1, migratingKey, 1, 1, 1, 0, 1); + addExpectedMigratedKey(expectedKeyMigrations, migratingKey, tableUUID); + assertTargetAccordWrite(runCasApplies, 1, migratingKey, expectedKeyMigrations, 1, 1, 1, 0, 1); // One node will now accept the other will reject and we are uncertain if we should retry the transaction // and should surface that as a timeout exception @@ -467,7 +524,9 @@ public class AccordMigrationTest extends AccordTestBase // retry it on Accord cluster.get(1).runOnInstance(() -> ConsensusRequestRouter.setInstance(new RoutesToPaxosOnce())); // Should exit Paxos from begin, key migration should occur because it's a new key, and Accord will need to do a migration read - assertTargetAccordWrite(runCasNoApply, 1, testingKeys.next(), 1, 1, 1, 1, 0); + migratingKey = testingKeys.next(); + addExpectedMigratedKey(expectedKeyMigrations, migratingKey, tableUUID); + assertTargetAccordWrite(runCasNoApply, 1, migratingKey, expectedKeyMigrations, 1, 1, 1, 1, 0); // Now do two repairs to complete the migration repair, and we are done with black box integration testing // First repair is a range smack dab in the middle @@ -494,6 +553,9 @@ public class AccordMigrationTest extends AccordTestBase { test(format(TABLE_FMT, qualifiedTableName), cluster -> { + String table = tableName; + UUID tableUUID = cluster.get(1).callOnInstance(() -> ColumnFamilyStore.getIfExists(KEYSPACE, table).getTableId().asUUID()); + List<Pair<ByteBuffer, UUID>> expectedKeyMigrations = new ArrayList<>(); cluster.schemaChange(format("ALTER TABLE %s.%s WITH transactional_mode='%s'", KEYSPACE, tableName, TransactionalMode.full)); String readCQL = format("SELECT * FROM %s WHERE id = ? and c = %s", qualifiedTableName, CLUSTERING_VALUE); Function<Integer, Object[][]> runRead = key -> cluster.coordinator(1).execute(readCQL, SERIAL, key); @@ -501,17 +563,21 @@ public class AccordMigrationTest extends AccordTestBase List<Range<Token>> migratingRanges = ImmutableList.of(migratingRange); int key = 0; - assertTargetAccordRead(runRead, 1, 0, 0, 1, 0, 0, 0); + assertTargetAccordRead(runRead, 1, key, expectedKeyMigrations, 0, 1, 0, 0, 0); // Mark wrap around range as migrating nodetool(coordinator, "consensus_admin", "begin-migration", "-st", String.valueOf(Long.MIN_VALUE + 1), "-et", String.valueOf(Long.MIN_VALUE), "-tp", "accord", KEYSPACE, tableName); assertMigrationState(tableName, ConsensusMigrationTarget.accord, emptyList(), migratingRanges, 1); // Should run directly on accord, migrate the key, and perform a quorum read from Accord, Paxos repair will run prepare once - assertTargetAccordRead(runRead, 1, key++, 1, 1, 1, 0, 0); + addExpectedMigratedKey(expectedKeyMigrations, key, tableUUID); + assertTargetAccordRead(runRead, 1, key, expectedKeyMigrations, 1, 1, 1, 0, 0); + key++; // Should run up to accept with both nodes refusing to accept savePromisedAndCommittedPaxosProposal(tableName, key); cluster.get(1).runOnInstance(() -> ConsensusRequestRouter.setInstance(new PaxosToAccordMigrationNotHappeningUpToBegin())); - assertTargetAccordRead(runRead, 1, key++, 1, 2, 1, 0, 1); + addExpectedMigratedKey(expectedKeyMigrations, key, tableUUID); + assertTargetAccordRead(runRead, 1, key, expectedKeyMigrations, 1, 2, 1, 0, 1); + key++; }); } @@ -542,6 +608,7 @@ public class AccordMigrationTest extends AccordTestBase String casCQL = format(CAS_FMT, qualifiedTableName, CLUSTERING_VALUE); Consumer<Integer> runCasNoApply = key -> assertRowEquals(cluster, new Object[]{false}, casCQL, key); String tableName = qualifiedTableName.split("\\.")[1]; + UUID tableUUID = cluster.get(1).callOnInstance(() -> ColumnFamilyStore.getIfExists(KEYSPACE, tableName).getTableId().asUUID()); alterTableTransactionalMode(TransactionalMode.mixed_reads); assertTransactionalModes(TransactionalMode.mixed_reads, TransactionalMigrationFromMode.off); @@ -562,25 +629,33 @@ public class AccordMigrationTest extends AccordTestBase Iterator<Integer> paxosMigratingKeys = getKeysBetweenTokens(upperMidToken, maxToken); Iterator<Integer> accordKeys = getKeysBetweenTokens(midToken, upperMidToken); + List<Pair<ByteBuffer, UUID>> expectedKeyMigrations = new ArrayList<>(); + // Paxos non-migrating keys should run on Paxos as per normal - assertTargetPaxosWrite(runCasNoApply, 1, paxosNonMigratingKeys.next(), 0, 1, 0, 0, 0); + assertTargetPaxosWrite(runCasNoApply, 1, paxosNonMigratingKeys.next(), expectedKeyMigrations, 0, 1, 0, 0, 0); + Integer nextMigratingKey = paxosMigratingKeys.next(); + addExpectedMigratedKey(expectedKeyMigrations, nextMigratingKey, tableUUID); // Paxos migrating keys should be key migrated which means a local barrier is run by Paxos during read at each replica, the key migration barrier is also counted as a write - assertTargetPaxosWrite(runCasNoApply, 1, paxosMigratingKeys.next(), 1, 1, 1, 0, 0); + assertTargetPaxosWrite(runCasNoApply, 1, nextMigratingKey, expectedKeyMigrations, 1, 1, 1, 0, 0); // A key from a range migrated to Accord is now not migrating/migrated and should be accessed through Accord - assertTargetPaxosWrite(runCasNoApply, 1, accordKeys.next(), 1, 0, 0, 0, 0); + assertTargetPaxosWrite(runCasNoApply, 1, accordKeys.next(), expectedKeyMigrations, 1, 0, 0, 0, 0); // If an Accord transaction races with cluster metadata updates it should be rejected if the epoch it runs in contains the migration cluster.get(1).runOnInstance(() -> ConsensusRequestRouter.setInstance(new RoutesToAccordOnce())); - assertTargetPaxosWrite(runCasNoApply, 1, paxosMigratingKeys.next(), 2, 1, 1, 1, 1); + nextMigratingKey = paxosMigratingKeys.next(); + addExpectedMigratedKey(expectedKeyMigrations, nextMigratingKey, tableUUID); + assertTargetPaxosWrite(runCasNoApply, 1, nextMigratingKey, expectedKeyMigrations, 2, 1, 1, 1, 1); // Repair the currently migrating range from when targets were switched, but it's not an Accord repair, this is to make sure the wrong repair type doesn't trigger progress nodetool(coordinator, "repair", "-st", upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString(), "--paxos-only"); assertMigrationState(tableName, ConsensusMigrationTarget.paxos, ImmutableList.of(new Range(minToken, midToken), new Range(maxToken, minToken)), ImmutableList.of(accordMigratingRange), 1); // Paxos migrating keys should still need key migration after non-Accord repair - assertTargetPaxosWrite(runCasNoApply, 1, paxosMigratingKeys.next(), 1, 1, 1, 0, 0); + nextMigratingKey = paxosMigratingKeys.next(); + addExpectedMigratedKey(expectedKeyMigrations, nextMigratingKey, tableUUID); + assertTargetPaxosWrite(runCasNoApply, 1, nextMigratingKey, expectedKeyMigrations, 1, 1, 1, 0, 0); // Now do it with an Accord repair so key migration shouldn't be necessary nodetool(coordinator, "consensus_admin", "finish-migration", "-st", upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString()); @@ -591,40 +666,16 @@ public class AccordMigrationTest extends AccordTestBase assertMigrationState(tableName, ConsensusMigrationTarget.paxos, ImmutableList.of(new Range(minToken, midToken), repairedRange, new Range(maxToken, minToken)), ImmutableList.of(remainingRange), 1); // Paxos migrating keys shouldn't need key migration after Accord repair - assertTargetPaxosWrite(runCasNoApply, 1, paxosMigratingKeys.next(), 0, 1, 0, 0, 0); + assertTargetPaxosWrite(runCasNoApply, 1, paxosMigratingKeys.next(), expectedKeyMigrations, 0, 1, 0, 0, 0); }); } - private static void assertCompletedMigrationState(String tableName) throws Throwable + private static void addExpectedMigratedKey(List<Pair<ByteBuffer, UUID>> expectedKeyMigrations, Integer nextMigratingKey, UUID tableUUID) { - // Validate nodetool consensus admin list output - String yamlResultString = nodetool(SHARED_CLUSTER.coordinator(1), "consensus_admin", "list"); - Map<String, Object> yamlStateMap = new Yaml().load(yamlResultString); - String minifiedYamlResultString = nodetool(SHARED_CLUSTER.coordinator(1), "consensus_admin", "list", "-f", "minified-yaml"); - Map<String, Object> minifiedYamlStateMap = new Yaml().load(minifiedYamlResultString); - String jsonResultString = nodetool(SHARED_CLUSTER.coordinator(1), "consensus_admin", "list", "-f", "json"); - Map<String, Object> jsonStateMap = JsonUtils.JSON_OBJECT_MAPPER.readValue(jsonResultString, new TypeReference<Map<String, Object>>(){}); - String minifiedJsonResultString = nodetool(SHARED_CLUSTER.coordinator(1), "consensus_admin", "list", "-f", "minified-json"); - Map<String, Object> minifiedJsonStateMap = JsonUtils.JSON_OBJECT_MAPPER.readValue(minifiedJsonResultString, new TypeReference<Map<String, Object>>(){}); - - for (Map<String, Object> migrationStateMap : ImmutableList.of(yamlStateMap, jsonStateMap, minifiedYamlStateMap, minifiedJsonStateMap)) { - assertEquals(PojoToString.CURRENT_VERSION, migrationStateMap.get("version")); - assertTrue(Epoch.EMPTY.getEpoch() < ((Number) migrationStateMap.get("lastModifiedEpoch")).longValue()); - List<Map<String, Object>> tableStates = (List<Map<String, Object>>) migrationStateMap.get("tableStates"); - assertEquals(0, tableStates.size()); - } - spinUntilSuccess(() -> { - for (IInvokableInstance instance : SHARED_CLUSTER) - { - ConsensusMigrationState snapshot = getMigrationStateSnapshot(instance); - assertEquals(0, snapshot.tableStates.size()); - instance.runOnInstance(() -> { - TableMetadata tbl = Schema.instance.getTableMetadata(KEYSPACE, tableName); - Assert.assertEquals(TransactionalMigrationFromMode.none, tbl.params.transactionalMigrationFrom); - }); - } - }); + ByteBuffer key = ByteBuffer.allocate(4); + key.putInt(0, nextMigratingKey); + expectedKeyMigrations.add(Pair.create(key, tableUUID)); } private static void assertMigrationState(String tableName, ConsensusMigrationTarget target, List<Range<Token>> migratedRanges, List<Range<Token>> migratingRanges, int numMigratingEpochs) throws Throwable --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org