This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3b99044d6d5491304d4a25d8dcea54510cfd3215
Author: Ariel Weisberg <aweisb...@apple.com>
AuthorDate: Fri May 17 15:27:44 2024 -0400

    Accord barrier/inclusive sync point fixes
    
    Patch by Ariel Weisberg, Benedict Elliott Smith; reviewed by Benedict Elliott Smith for CASSANDRA-19641
---
 modules/accord                                     |   2 +-
 .../cassandra/service/accord/AccordService.java    |   2 +-
 .../cassandra/service/accord/api/AccordAgent.java  |  10 +-
 .../migration/ConsensusKeyMigrationState.java      |  20 ++-
 .../test/accord/AccordIncrementalRepairTest.java   |   6 +-
 .../test/accord/AccordMigrationTest.java           | 169 ++++++++++++++-------
 6 files changed, 132 insertions(+), 77 deletions(-)

diff --git a/modules/accord b/modules/accord
index 778c45cd97..4e8bcae81f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 778c45cd977576a901abf24a9759872d36fde056
+Subproject commit 4e8bcae81f9751b9d732fd5056bce31c97ad58f3
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index f853c329c6..8e2778f394 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -366,7 +366,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         {
             logger.debug("Starting barrier key: {} epoch: {} barrierType: {} 
isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite);
             txnId = node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
-            AsyncResult<Timestamp> asyncResult = syncPoint == null
+            AsyncResult<TxnId> asyncResult = syncPoint == null
                                                  ? Barrier.barrier(node, 
keysOrRanges, epoch, barrierType)
                                                  : Barrier.barrier(node, 
keysOrRanges, epoch, barrierType, syncPoint);
             long deadlineNanos = queryStartNanos + timeoutNanos;
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 9c4b678996..c0fea38a37 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -19,12 +19,11 @@
 package org.apache.cassandra.service.accord.api;
 
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import accord.api.Agent;
 import accord.api.EventsListener;
 import accord.api.Result;
@@ -35,8 +34,9 @@ import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.Txn.Kind;
-import org.apache.cassandra.service.accord.AccordService;
+import accord.primitives.TxnId;
 import org.apache.cassandra.metrics.AccordMetrics;
+import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.txn.TxnQuery;
 import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.tcm.Epoch;
@@ -84,12 +84,12 @@ public class AccordAgent implements Agent
     }
 
     @Override
-    public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull 
Timestamp executeAt)
+    public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, @Nonnull 
TxnId txnId)
     {
         if (keysOrRanges.domain() == Key)
         {
             PartitionKey key = (PartitionKey)keysOrRanges.get(0);
-            maybeSaveAccordKeyMigrationLocally(key, 
Epoch.create(executeAt.epoch()));
+            maybeSaveAccordKeyMigrationLocally(key, 
Epoch.create(txnId.epoch()));
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java
index 068e46d4a4..86eb97ae06 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusKeyMigrationState.java
@@ -27,6 +27,8 @@ import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.api.BarrierType;
 import accord.primitives.Seekables;
@@ -68,9 +70,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 import static org.apache.cassandra.net.Verb.CONSENSUS_KEY_MIGRATION;
-
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget.paxos;
-
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 /**
@@ -83,6 +83,8 @@ import static 
org.apache.cassandra.utils.Clock.Global.nanoTime;
  */
 public abstract class ConsensusKeyMigrationState
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(ConsensusKeyMigrationState.class);
+
     /*
      * Used to notify other replicas when key migration has occurred so they 
can
      * also cache that the key migration was done
@@ -188,12 +190,14 @@ public abstract class ConsensusKeyMigrationState
 
     private static final CacheLoader<Pair<ByteBuffer, UUID>, 
ConsensusMigratedAt> LOADING_FUNCTION = k -> 
SystemKeyspace.loadConsensusKeyMigrationState(k.left, k.right);
     private static final Weigher<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> 
WEIGHER_FUNCTION = (k, v) -> EMPTY_KEY_SIZE + 
Ints.checkedCast(ByteBufferUtil.estimatedSizeOnHeap(k.left)) + VALUE_SIZE;
-    private static final LoadingCache<Pair<ByteBuffer, UUID>, 
ConsensusMigratedAt> MIGRATION_STATE_CACHE =
-        Caffeine.newBuilder()
-                
.maximumWeight(DatabaseDescriptor.getConsensusMigrationCacheSizeInMiB() << 20)
-                .weigher(WEIGHER_FUNCTION)
-                .executor(ImmediateExecutor.INSTANCE)
-                .build(LOADING_FUNCTION);
+
+    @VisibleForTesting
+    public static final LoadingCache<Pair<ByteBuffer, UUID>, 
ConsensusMigratedAt> MIGRATION_STATE_CACHE =
+            Caffeine.newBuilder()
+                    
.maximumWeight(DatabaseDescriptor.getConsensusMigrationCacheSizeInMiB() << 20)
+                    .weigher(WEIGHER_FUNCTION)
+                    .executor(ImmediateExecutor.INSTANCE)
+                    .build(LOADING_FUNCTION);
 
     public static final IVerbHandler<ConsensusKeyMigrationFinished> 
consensusKeyMigrationFinishedHandler = message -> {
         saveConsensusKeyMigrationLocally(message.payload.partitionKey, 
message.payload.tableId, message.payload.consensusMigratedAt);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index 66cf1c0e00..1e76473eeb 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -99,12 +99,12 @@ public class AccordIncrementalRepairTest extends 
AccordTestBase
         private final List<ExecutedBarrier> barriers = new ArrayList<>();
 
         @Override
-        public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, 
@Nonnull Timestamp executeAt)
+        public void onLocalBarrier(@Nonnull Seekables<?, ?> keysOrRanges, 
@Nonnull TxnId txnId)
         {
-            super.onLocalBarrier(keysOrRanges, executeAt);
+            super.onLocalBarrier(keysOrRanges, txnId);
             synchronized (barriers)
             {
-                barriers.add(new ExecutedBarrier(keysOrRanges, executeAt));
+                barriers.add(new ExecutedBarrier(keysOrRanges, txnId));
             }
         }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
index a63642f819..97d35d79af 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
@@ -26,14 +26,13 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableList;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -41,12 +40,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.Config.PaxosVariant;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.BufferDecoratedKey;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -70,9 +71,10 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState;
-import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigratedAt;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationTarget;
+import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
 import org.apache.cassandra.service.consensus.migration.TableMigrationState;
 import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
 import org.apache.cassandra.service.paxos.Ballot;
@@ -87,15 +89,13 @@ import org.apache.cassandra.utils.ByteArrayUtil;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JsonUtils;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.PojoToString;
+import org.yaml.snakeyaml.Yaml;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.lang.String.format;
 import static java.util.Collections.emptyList;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import static org.apache.cassandra.Util.spinUntilSuccess;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.db.SystemKeyspace.CONSENSUS_MIGRATION_STATE;
@@ -108,6 +108,9 @@ import static 
org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision.paxosV2;
 import static 
org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.PROMISE;
 import static org.assertj.core.api.Fail.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /*
  * This test suite is intended to serve as an integration test with some 
pretty good visibility into actual execution
@@ -296,7 +299,7 @@ public class AccordMigrationTest extends AccordTestBase
      * Helper to invoke a query and assert that the right metrics change 
indicating the correct
      * paths were taken to execute the query during migration
      */
-    private static void assertTargetAccordWrite(Consumer<Integer> query, int 
coordinatorIndex, int key, int expectedAccordWriteCount, int 
expectedCasWriteCount, int expectedKeyMigrationCount, int 
expectedCasBeginRejects, int expectedCasAcceptRejects)
+    private static void assertTargetAccordWrite(Consumer<Integer> query, int 
coordinatorIndex, int key, List<Pair<ByteBuffer, UUID>> expectedKeyMigrations, 
int expectedAccordWriteCount, int expectedCasWriteCount, int 
expectedKeyMigrationCount, int expectedCasBeginRejects, int 
expectedCasAcceptRejects)
     {
         int startingWriteCount = getAccordWriteCount(coordinatorIndex);
         int startingCasWriteCount = getCasWriteCount(coordinatorIndex);
@@ -304,6 +307,7 @@ public class AccordMigrationTest extends AccordTestBase
         int startingCasWriteBeginRejects = 
getCasWriteBeginRejects(coordinatorIndex);
         int startingCasWriteAcceptRejects = 
getCasWriteAcceptRejects(coordinatorIndex);
         query.accept(key);
+        validateKeyMigrations(expectedKeyMigrations);
         assertEquals("Accord writes", expectedAccordWriteCount, 
getAccordWriteCount(coordinatorIndex) - startingWriteCount);
         assertEquals("CAS writes", expectedCasWriteCount, 
getCasWriteCount(coordinatorIndex) - startingCasWriteCount);
         assertEquals("Key Migrations", expectedKeyMigrationCount, 
getKeyMigrationCount(coordinatorIndex) - startingKeyMigrationCount);
@@ -311,7 +315,7 @@ public class AccordMigrationTest extends AccordTestBase
         assertEquals("CAS Accept rejects", expectedCasAcceptRejects, 
getCasWriteAcceptRejects(coordinatorIndex) - startingCasWriteAcceptRejects);
     }
 
-    private static Object[][] assertTargetAccordRead(Function<Integer, 
Object[][]> query, int coordinatorIndex, int key, int expectedAccordReadCount, 
int expectedCasPrepareCount, int expectedKeyMigrationCount, int 
expectedCasReadBeginRejects, int expectedCasReadAcceptRejects)
+    private static Object[][] assertTargetAccordRead(Function<Integer, 
Object[][]> query, int coordinatorIndex, int key, List<Pair<ByteBuffer, UUID>> 
expectedKeyMigrations, int expectedAccordReadCount, int 
expectedCasPrepareCount, int expectedKeyMigrationCount, int 
expectedCasReadBeginRejects, int expectedCasReadAcceptRejects)
     {
         int startingReadCount = getAccordReadCount(coordinatorIndex);
         int startingCasPrepareCount = getCasPrepareCount(coordinatorIndex);
@@ -319,6 +323,7 @@ public class AccordMigrationTest extends AccordTestBase
         int startingCasReadBeginRejects = 
getCasReadBeginRejects(coordinatorIndex);
         int startingCasReadAcceptRejects = 
getCasReadAcceptRejects(coordinatorIndex);
         Object[][] result = query.apply(key);
+        validateKeyMigrations(expectedKeyMigrations);
         assertEquals("Accord reads", expectedAccordReadCount, 
getAccordReadCount(coordinatorIndex) - startingReadCount);
         assertEquals("CAS prepares", expectedCasPrepareCount, 
getCasPrepareCount(coordinatorIndex) - startingCasPrepareCount);
         assertEquals("Key Migrations", expectedKeyMigrationCount, 
getKeyMigrationCount(coordinatorIndex) - startingKeyMigrationCount);
@@ -327,7 +332,7 @@ public class AccordMigrationTest extends AccordTestBase
         return result;
     }
 
-    private static void assertTargetPaxosWrite(Consumer<Integer> query, int 
coordinatorIndex, int key, int expectedAccordWriteCount, int 
expectedCasWriteCount, int expectedKeyMigrationCount, int 
expectedMigrationRejects, int expectedSkippedReads)
+    private static void assertTargetPaxosWrite(Consumer<Integer> query, int 
coordinatorIndex, int key, List<Pair<ByteBuffer, UUID>> expectedKeyMigrations, 
int expectedAccordWriteCount, int expectedCasWriteCount, int 
expectedKeyMigrationCount, int expectedMigrationRejects, int 
expectedSkippedReads)
     {
         int startingWriteCount = getAccordWriteCount(coordinatorIndex);
         int startingCasWriteCount = getCasWriteCount(coordinatorIndex);
@@ -335,6 +340,7 @@ public class AccordMigrationTest extends AccordTestBase
         int startingMigrationRejectsCount = 
getAccordMigrationRejects(coordinatorIndex);
         int startingSkippedReadsCount = getAccordMigrationSkippedReads();
         query.accept(key);
+        validateKeyMigrations(expectedKeyMigrations);
         assertEquals("Accord writes", expectedAccordWriteCount, 
getAccordWriteCount(coordinatorIndex) - startingWriteCount);
         assertEquals("CAS writes", expectedCasWriteCount, 
getCasWriteCount(coordinatorIndex) - startingCasWriteCount);
         assertEquals("Key Migrations", expectedKeyMigrationCount, 
getKeyMigrationCount(coordinatorIndex) - startingKeyMigrationCount);
@@ -342,12 +348,60 @@ public class AccordMigrationTest extends AccordTestBase
         assertEquals("Accord skipped reads", expectedSkippedReads, 
getAccordMigrationSkippedReads() - startingSkippedReadsCount);
     }
 
+    private static void validateKeyMigrations(List<Pair<ByteBuffer, UUID>> 
expectedMigrations)
+    {
+        spinUntilSuccess(() -> {
+            try
+            {
+                List<byte[]> keys = expectedMigrations.stream().map(p -> 
p.left.array()).collect(Collectors.toList());
+                List<Integer> intKeys = expectedMigrations.stream().map(p -> 
ByteBufferUtil.toInt(p.left)).collect(Collectors.toList());
+                List<UUID> tables = expectedMigrations.stream().map(p -> 
p.right).collect(Collectors.toList());
+                for (int i = 1; i < SHARED_CLUSTER.size(); i++)
+                {
+                    int instanceIndex = i;
+                    IInvokableInstance instance = SHARED_CLUSTER.get(i);
+                    instance.runOnInstance(() -> {
+                        Map<Pair<ByteBuffer, UUID>, ConsensusMigratedAt> 
cacheMap = ConsensusKeyMigrationState.MIGRATION_STATE_CACHE.asMap();
+                        String cacheMessage = format("Instance %d Expected %s 
migrations but found in cache %s", instanceIndex, intKeys, cacheMap);
+                        assertEquals(cacheMessage, keys.size(), 
cacheMap.size());
+                        for (int j = 0; j < keys.size(); j++)
+                        {
+                            assertTrue(cacheMessage,
+                                       
cacheMap.containsKey(Pair.create(ByteBuffer.wrap(keys.get(j)), tables.get(j))));
+                        }
+
+                        UntypedResultSet result = 
QueryProcessor.executeInternal("SELECT * from " + SYSTEM_KEYSPACE_NAME + "." + 
CONSENSUS_MIGRATION_STATE);
+                        String tableMessage = format("Instance %d Expected %s 
migrations but found in system table %s", instanceIndex, intKeys, result);
+                        assertEquals(tableMessage, keys.size(), result.size());
+                        Iterator<UntypedResultSet.Row> resultIterator = 
result.iterator();
+                        for (int j = 0; j < result.size(); j++)
+                        {
+                            UntypedResultSet.Row row = resultIterator.next();
+                            boolean foundKey = false;
+                            for (byte[] expectedKey : keys)
+                                if 
(ByteBuffer.wrap(expectedKey).equals(row.getBytes("row_key")))
+                                    foundKey = true;
+                            assertTrue(tableMessage, foundKey);
+                        }
+                    });
+                }
+            }
+            catch (Throwable t)
+            {
+                // For some reason full stack trace wasn't displayed without 
rethrowing
+                throw new AssertionError(t);
+            }
+        });
+    }
+
     @Test
     public void testPaxosToAccordCAS() throws Exception
     {
         test(format(TABLE_FMT, qualifiedTableName),
           cluster -> {
+              List<Pair<ByteBuffer, UUID>> expectedKeyMigrations = new 
ArrayList<>();
               String table = tableName;
+              UUID tableUUID = cluster.get(1).callOnInstance(() -> 
ColumnFamilyStore.getIfExists(KEYSPACE, table).getTableId().asUUID());
               cluster.forEach(node -> node.runOnInstance(() -> {
                   TableMetadata tbl = 
Schema.instance.getTableMetadata(KEYSPACE, table);
                   Assert.assertEquals(TransactionalMode.off, 
tbl.params.transactionalMode);
@@ -373,24 +427,25 @@ public class AccordMigrationTest extends AccordTestBase
               List<Range<Token>> migratingRanges = 
ImmutableList.of(migratingRange);
 
               // Not actually migrating yet so should do nothing special
-              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 0, 1, 0, 
0, 0);
+              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 
expectedKeyMigrations, 0, 1, 0, 0, 0);
 
               // Mark ranges migrating and check migration state is correct
               nodetool(coordinator, "consensus_admin", "begin-migration", 
"-st", midToken.toString(), "-et", maxToken.toString(), "-tp", "accord", 
KEYSPACE, tableName);
               assertMigrationState(tableName, ConsensusMigrationTarget.accord, 
emptyList(), migratingRanges, 1);
 
               // Should be routed directly to Accord, and perform key 
migration, as well as key migration read in Accord
-              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 1, 0, 1, 
0, 0);
+              addExpectedMigratedKey(expectedKeyMigrations, migratingKey, 
tableUUID);
+              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 
expectedKeyMigrations, 1, 0, 1, 0, 0);
 
               // Should not repeat key migration, and should still do a 
migration read in Accord
-              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 1, 0, 0, 
0, 0);
+              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 
expectedKeyMigrations, 1, 0, 0, 0, 0);
 
               // Should run on Paxos since it is not in the migrating range
-              assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, 0, 1, 
0, 0, 0);
+              assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, 
expectedKeyMigrations, 0, 1, 0, 0, 0);
 
               // Check that the coordinator on the other node also has saved 
that the key migration was performed
               // and runs the query on Accord immediately without key migration
-              assertTargetAccordWrite(runCasOnSecondNode, 2, migratingKey, 1, 
0, 0, 0, 0);
+              assertTargetAccordWrite(runCasOnSecondNode, 2, migratingKey, 
expectedKeyMigrations, 1, 0, 0, 0, 0);
 
               // Forced repair while a node is down shouldn't work, use repair 
instead of finish-migration because repair exposes --force
               // and regular Cassandra repairs are eligible to drive migration 
so it's important they check --force and down nodes
@@ -411,10 +466,10 @@ public class AccordMigrationTest extends AccordTestBase
               assertMigrationState(tableName, ConsensusMigrationTarget.accord, 
migratingRanges, emptyList(), 0);
 
               // Should run on Accord, and not perform key migration nor 
should it need to perform a migration read in Accord now that it is repaired
-              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 1, 0, 0, 
0, 0);
+              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 
expectedKeyMigrations, 1, 0, 0, 0, 0);
 
               // Should run on Paxos, and not perform key migration
-              assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, 0, 1, 
0, 0, 0);
+              assertTargetAccordWrite(runCasNoApply, 1, notMigratingKey, 
expectedKeyMigrations, 0, 1, 0, 0, 0);
 
               // Pivot to testing repair with a subrange of the migrating 
range as well as key migration
               // Will use the unmigrated range between lowerMidToken and 
midToken
@@ -429,7 +484,8 @@ public class AccordMigrationTest extends AccordTestBase
               saveAcceptedPaxosProposal(tableName, ballotString, migratingKey);
               // PaxosRepair will have inserted a condition matching row, so 
it can apply, demonstrating repair and
               // key migration occurred
-              assertTargetAccordWrite(runCasApplies, 1, migratingKey, 1, 0, 1, 
0, 0);
+              addExpectedMigratedKey(expectedKeyMigrations, migratingKey, 
tableUUID);
+              assertTargetAccordWrite(runCasApplies, 1, migratingKey, 
expectedKeyMigrations, 1, 0, 1, 0, 0);
 
               // This will force the write to use the normal write patch
               cluster.get(1).runOnInstance(() -> 
ConsensusRequestRouter.setInstance(new 
PaxosToAccordMigrationNotHappeningUpToBegin()));
@@ -443,7 +499,8 @@ public class AccordMigrationTest extends AccordTestBase
               // This will force the request to run on Paxos up to Accept
               // and the accept will be rejected at both nodes and we are 
certain we need to retry the transaction
               cluster.get(1).runOnInstance(() -> 
ConsensusRequestRouter.setInstance(new 
PaxosToAccordMigrationNotHappeningUpToBegin()));
-              assertTargetAccordWrite(runCasApplies, 1, migratingKey, 1, 1, 1, 
0, 1);
+              addExpectedMigratedKey(expectedKeyMigrations, migratingKey, 
tableUUID);
+              assertTargetAccordWrite(runCasApplies, 1, migratingKey, 
expectedKeyMigrations, 1, 1, 1, 0, 1);
 
               // One node will now accept the other will reject and we are 
uncertain if we should retry the transaction
               // and should surface that as a timeout exception
@@ -467,7 +524,9 @@ public class AccordMigrationTest extends AccordTestBase
               // retry it on Accord
               cluster.get(1).runOnInstance(() -> 
ConsensusRequestRouter.setInstance(new RoutesToPaxosOnce()));
               // Should exit Paxos from begin, key migration should occur 
because it's a new key, and Accord will need to do a migration read
-              assertTargetAccordWrite(runCasNoApply, 1, testingKeys.next(), 1, 
1, 1, 1, 0);
+              migratingKey = testingKeys.next();
+              addExpectedMigratedKey(expectedKeyMigrations, migratingKey, 
tableUUID);
+              assertTargetAccordWrite(runCasNoApply, 1, migratingKey, 
expectedKeyMigrations,  1, 1, 1, 1, 0);
 
               // Now do two repairs to complete the migration repair, and we 
are done with black box integration testing
               // First repair is a range smack dab in the middle
@@ -494,6 +553,9 @@ public class AccordMigrationTest extends AccordTestBase
     {
         test(format(TABLE_FMT, qualifiedTableName),
           cluster -> {
+              String table = tableName;
+              UUID tableUUID = cluster.get(1).callOnInstance(() -> 
ColumnFamilyStore.getIfExists(KEYSPACE, table).getTableId().asUUID());
+              List<Pair<ByteBuffer, UUID>> expectedKeyMigrations = new 
ArrayList<>();
               cluster.schemaChange(format("ALTER TABLE %s.%s WITH 
transactional_mode='%s'", KEYSPACE, tableName, TransactionalMode.full));
               String readCQL = format("SELECT * FROM %s WHERE id = ? and c = 
%s", qualifiedTableName, CLUSTERING_VALUE);
               Function<Integer, Object[][]> runRead = key -> 
cluster.coordinator(1).execute(readCQL, SERIAL, key);
@@ -501,17 +563,21 @@ public class AccordMigrationTest extends AccordTestBase
               List<Range<Token>> migratingRanges = 
ImmutableList.of(migratingRange);
               int key = 0;
 
-              assertTargetAccordRead(runRead, 1, 0, 0, 1, 0, 0, 0);
+              assertTargetAccordRead(runRead, 1, key, expectedKeyMigrations, 
0, 1, 0, 0, 0);
               // Mark wrap around range as migrating
               nodetool(coordinator, "consensus_admin", "begin-migration", 
"-st", String.valueOf(Long.MIN_VALUE + 1), "-et", 
String.valueOf(Long.MIN_VALUE), "-tp", "accord", KEYSPACE, tableName);
               assertMigrationState(tableName, ConsensusMigrationTarget.accord, 
emptyList(), migratingRanges, 1);
               // Should run directly on accord, migrate the key, and perform a 
quorum read from Accord, Paxos repair will run prepare once
-              assertTargetAccordRead(runRead, 1, key++, 1, 1, 1, 0, 0);
+              addExpectedMigratedKey(expectedKeyMigrations, key, tableUUID);
+              assertTargetAccordRead(runRead, 1, key, expectedKeyMigrations, 
1, 1, 1, 0, 0);
+              key++;
 
               // Should run up to accept with both nodes refusing to accept
               savePromisedAndCommittedPaxosProposal(tableName, key);
               cluster.get(1).runOnInstance(() -> 
ConsensusRequestRouter.setInstance(new 
PaxosToAccordMigrationNotHappeningUpToBegin()));
-              assertTargetAccordRead(runRead, 1, key++, 1, 2, 1, 0, 1);
+              addExpectedMigratedKey(expectedKeyMigrations, key, tableUUID);
+              assertTargetAccordRead(runRead, 1, key, expectedKeyMigrations, 
1, 2, 1, 0, 1);
+              key++;
           });
     }
 
@@ -542,6 +608,7 @@ public class AccordMigrationTest extends AccordTestBase
                  String casCQL = format(CAS_FMT, qualifiedTableName, 
CLUSTERING_VALUE);
                  Consumer<Integer> runCasNoApply = key -> 
assertRowEquals(cluster, new Object[]{false}, casCQL, key);
                  String tableName = qualifiedTableName.split("\\.")[1];
+                 UUID tableUUID = cluster.get(1).callOnInstance(() -> 
ColumnFamilyStore.getIfExists(KEYSPACE, tableName).getTableId().asUUID());
 
                  alterTableTransactionalMode(TransactionalMode.mixed_reads);
                  assertTransactionalModes(TransactionalMode.mixed_reads, 
TransactionalMigrationFromMode.off);
@@ -562,25 +629,33 @@ public class AccordMigrationTest extends AccordTestBase
                  Iterator<Integer> paxosMigratingKeys = 
getKeysBetweenTokens(upperMidToken, maxToken);
                  Iterator<Integer> accordKeys = getKeysBetweenTokens(midToken, 
upperMidToken);
 
+                 List<Pair<ByteBuffer, UUID>> expectedKeyMigrations = new 
ArrayList<>();
+
                  // Paxos non-migrating keys should run on Paxos as per normal
-                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosNonMigratingKeys.next(), 0, 1, 0, 0, 0);
+                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosNonMigratingKeys.next(), expectedKeyMigrations, 0, 1, 0, 0, 0);
 
+                 Integer nextMigratingKey = paxosMigratingKeys.next();
+                 addExpectedMigratedKey(expectedKeyMigrations, 
nextMigratingKey, tableUUID);
                  // Paxos migrating keys should be key migrated which means a 
local barrier is run by Paxos during read at each replica, the key migration 
barrier is also counted as a write
-                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosMigratingKeys.next(), 1, 1, 1, 0, 0);
+                 assertTargetPaxosWrite(runCasNoApply, 1, nextMigratingKey, 
expectedKeyMigrations, 1, 1, 1, 0, 0);
 
                  // A key from a range migrated to Accord is now not 
migrating/migrated and should be accessed through Accord
-                 assertTargetPaxosWrite(runCasNoApply, 1, accordKeys.next(), 
1, 0, 0, 0, 0);
+                 assertTargetPaxosWrite(runCasNoApply, 1, accordKeys.next(), 
expectedKeyMigrations, 1, 0, 0, 0, 0);
 
                  // If an Accord transaction races with cluster metadata 
updates it should be rejected if the epoch it runs in contains the migration
                  cluster.get(1).runOnInstance(() -> 
ConsensusRequestRouter.setInstance(new RoutesToAccordOnce()));
-                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosMigratingKeys.next(), 2, 1, 1, 1, 1);
+                 nextMigratingKey = paxosMigratingKeys.next();
+                 addExpectedMigratedKey(expectedKeyMigrations, 
nextMigratingKey, tableUUID);
+                 assertTargetPaxosWrite(runCasNoApply, 1, nextMigratingKey, 
expectedKeyMigrations, 2, 1, 1, 1, 1);
 
                  // Repair the currently migrating range from when targets 
were switched, but it's not an Accord repair, this is to make sure the wrong 
repair type doesn't trigger progress
                  nodetool(coordinator, "repair", "-st", 
upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString(), 
"--paxos-only");
                  assertMigrationState(tableName, 
ConsensusMigrationTarget.paxos, ImmutableList.of(new Range(minToken, midToken), 
new Range(maxToken, minToken)), ImmutableList.of(accordMigratingRange), 1);
 
                  // Paxos migrating keys should still need key migration after 
non-Accord repair
-                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosMigratingKeys.next(), 1, 1, 1, 0, 0);
+                 nextMigratingKey = paxosMigratingKeys.next();
+                 addExpectedMigratedKey(expectedKeyMigrations, 
nextMigratingKey, tableUUID);
+                 assertTargetPaxosWrite(runCasNoApply, 1, nextMigratingKey, 
expectedKeyMigrations, 1, 1, 1, 0, 0);
 
                  // Now do it with an Accord repair so key migration shouldn't 
be necessary
                  nodetool(coordinator, "consensus_admin", "finish-migration", 
"-st", upperMidToken.toString(), "-et", maxAlignedWithLocalRanges.toString());
@@ -591,40 +666,16 @@ public class AccordMigrationTest extends AccordTestBase
                  assertMigrationState(tableName, 
ConsensusMigrationTarget.paxos, ImmutableList.of(new Range(minToken, midToken), 
repairedRange, new Range(maxToken, minToken)), 
ImmutableList.of(remainingRange), 1);
 
                  // Paxos migrating keys shouldn't need key migration after Accord repair
-                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosMigratingKeys.next(), 0, 1, 0, 0, 0);
+                 assertTargetPaxosWrite(runCasNoApply, 1, 
paxosMigratingKeys.next(), expectedKeyMigrations, 0, 1, 0, 0, 0);
              });
     }
 
-    private static void assertCompletedMigrationState(String tableName) throws 
Throwable
+    private static void addExpectedMigratedKey(List<Pair<ByteBuffer, UUID>> 
expectedKeyMigrations, Integer nextMigratingKey, UUID tableUUID)
     {
-        // Validate nodetool consensus admin list output
-        String yamlResultString = nodetool(SHARED_CLUSTER.coordinator(1), 
"consensus_admin", "list");
-        Map<String, Object> yamlStateMap = new Yaml().load(yamlResultString);
-        String minifiedYamlResultString = 
nodetool(SHARED_CLUSTER.coordinator(1), "consensus_admin", "list", "-f", 
"minified-yaml");
-        Map<String, Object> minifiedYamlStateMap = new 
Yaml().load(minifiedYamlResultString);
-        String jsonResultString = nodetool(SHARED_CLUSTER.coordinator(1), 
"consensus_admin", "list", "-f", "json");
-        Map<String, Object> jsonStateMap = 
JsonUtils.JSON_OBJECT_MAPPER.readValue(jsonResultString, new 
TypeReference<Map<String, Object>>(){});
-        String minifiedJsonResultString = 
nodetool(SHARED_CLUSTER.coordinator(1), "consensus_admin", "list", "-f", 
"minified-json");
-        Map<String, Object> minifiedJsonStateMap = 
JsonUtils.JSON_OBJECT_MAPPER.readValue(minifiedJsonResultString, new 
TypeReference<Map<String, Object>>(){});
-
-        for (Map<String, Object> migrationStateMap : 
ImmutableList.of(yamlStateMap, jsonStateMap, minifiedYamlStateMap, 
minifiedJsonStateMap)) {
-            assertEquals(PojoToString.CURRENT_VERSION, 
migrationStateMap.get("version"));
-            assertTrue(Epoch.EMPTY.getEpoch() < ((Number) 
migrationStateMap.get("lastModifiedEpoch")).longValue());
-            List<Map<String, Object>> tableStates = (List<Map<String, 
Object>>) migrationStateMap.get("tableStates");
-            assertEquals(0, tableStates.size());
-        }
 
-        spinUntilSuccess(() -> {
-            for (IInvokableInstance instance : SHARED_CLUSTER)
-            {
-                ConsensusMigrationState snapshot = 
getMigrationStateSnapshot(instance);
-                assertEquals(0, snapshot.tableStates.size());
-                instance.runOnInstance(() -> {
-                    TableMetadata tbl = 
Schema.instance.getTableMetadata(KEYSPACE, tableName);
-                    Assert.assertEquals(TransactionalMigrationFromMode.none, 
tbl.params.transactionalMigrationFrom);
-                });
-            }
-        });
+        ByteBuffer key = ByteBuffer.allocate(4);
+        key.putInt(0, nextMigratingKey);
+        expectedKeyMigrations.add(Pair.create(key, tableUUID));
     }
 
     private static void assertMigrationState(String tableName, 
ConsensusMigrationTarget target, List<Range<Token>> migratedRanges, 
List<Range<Token>> migratingRanges, int numMigratingEpochs) throws Throwable


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to