This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 237208e850ae384c32558cf1da7cc1854ef25444 Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Dec 19 16:07:20 2022 -0800 Refactor AccordTestBase to block retries on non-idempotent transactions. Some tests may be flaky now due to Preempted being thrown. --- .../cql3/statements/ModificationStatement.java | 12 ++++ .../cql3/statements/TransactionStatement.java | 5 ++ .../cql3/transactions/ReferenceOperation.java | 10 +++ .../distributed/test/accord/AccordCQLTest.java | 10 +-- .../distributed/test/accord/AccordTestBase.java | 71 ++++++++++++++++++++-- 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index abf393f811..41ae242cf9 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -809,6 +809,18 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa return new TxnReferenceOperations(metadata, clustering, regularOps, staticOps); } + @VisibleForTesting + public void migrateReadRequiredOperations() + { + operations.migrateReadRequiredOperations(); + } + + @VisibleForTesting + public List<ReferenceOperation> getSubstitutions() + { + return operations.allSubstitutions(); + } + public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state, QueryOptions options) { // When an Operation requires a read, this cannot be done right away and must be done by the transaction itself, diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java index baead627dc..4d3c732da0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java @@ -131,6 +131,11 @@ public class TransactionStatement implements CQLStatement this.bindVariables = bindVariables; } + public List<ModificationStatement> getUpdates() + { + return updates; + } + @Override public List<ColumnSpecification> getBindVariables() { diff --git a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java index e89e616545..d4858eec8e 100644 --- a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java +++ b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java @@ -76,6 +76,16 @@ public class ReferenceOperation return new ReferenceOperation(receiver, kind, key, field, value); } + public TxnReferenceOperation.Kind getKind() + { + return kind; + } + + public ReferenceValue getValue() + { + return value; + } + public ColumnMetadata getReceiver() { return receiver; diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java index 50318a00d1..1315bed14f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java @@ -400,7 +400,7 @@ public class AccordCQLTest extends AccordTestBase " SELECT row1.v;\n" + " UPDATE " + currentTable + " SET v " + operation + " 1 WHERE k = 1;\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] { startingValue }, update); + assertRowEquals(cluster, new Object[] { startingValue }, update); String check = "BEGIN TRANSACTION\n" + " SELECT v FROM " + currentTable + " WHERE k = 1;\n" + @@ -1424,7 +1424,7 @@ public class AccordCQLTest extends AccordTestBase " SELECT row0.counter, row0.other_counter;\n" + " UPDATE " + currentTable + " SET other_counter += 1, counter += row0.counter WHERE k = 0 AND c = 1;\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 1, 1 }, update); + assertRowEquals(cluster, new Object[] { 1, 1 }, update); String check = "BEGIN TRANSACTION\n" + " SELECT counter, other_counter FROM " + currentTable + " WHERE k = 0 AND c = 1;\n" + @@ -1448,7 +1448,7 @@ public class AccordCQLTest extends AccordTestBase " UPDATE " + currentTable + " SET int_list[0] = 42 WHERE k = 0 AND c = 0;\n" + " UPDATE " + currentTable + " SET counter += 1 WHERE k = 0 AND c = 0;\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0, Arrays.asList(1, 2) }, update); + assertRowEquals(cluster, new Object[] { 0, Arrays.asList(1, 2) }, update); String check = "BEGIN TRANSACTION\n" + " SELECT counter, int_list FROM " + currentTable + " WHERE k = 0 AND c = 0;\n" + @@ -2255,7 +2255,7 @@ public class AccordCQLTest extends AccordTestBase " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='scott' AND doc_id=101;\n" + " END IF\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 }, addDoc); + assertRowEquals(SHARED_CLUSTER, new Object[] { 5 }, addDoc); String addUser = "BEGIN TRANSACTION\n" + " LET demo_doc = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1);\n" + @@ -2267,7 +2267,7 @@ public class AccordCQLTest extends AccordTestBase " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='benedict' AND doc_id=101;\n" + " END IF\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 }, addUser); + assertRowEquals(SHARED_CLUSTER, new Object[] { 6 }, addUser); } // TODO: Implement support for basic arithmetic on references in INSERT diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java index 2c9b2a46ae..38cca646fc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java @@ -25,17 +25,20 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; +import accord.primitives.Txn; import net.bytebuddy.ByteBuddy; import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; import net.bytebuddy.implementation.MethodDelegation; import net.bytebuddy.implementation.bind.annotation.SuperCall; import net.bytebuddy.implementation.bind.annotation.This; +import org.apache.cassandra.cql3.transactions.ReferenceValue; import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.distributed.util.QueryResultUtil; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import accord.coordinate.Preempted; import org.apache.cassandra.cql3.statements.ModificationStatement; @@ -46,8 +49,10 @@ import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.AccordTestUtils; import org.apache.cassandra.utils.AssertionUtils; import org.apache.cassandra.utils.FailingConsumer; +import org.assertj.core.util.Arrays; import static net.bytebuddy.matcher.ElementMatchers.named; import static org.junit.Assert.assertArrayEquals; @@ -59,6 +64,7 @@ public abstract class AccordTestBase extends TestBaseImpl protected static Cluster SHARED_CLUSTER; protected String currentTable; + private final Logger logger = LoggerFactory.getLogger(getClass()); @BeforeClass public static void setupClass() throws IOException @@ -125,39 +131,92 @@ public abstract class AccordTestBase extends TestBaseImpl .start()); } + private static SimpleQueryResult execute(Cluster cluster, String check, Object... boundValues) + { + return cluster.coordinator(1).executeWithResult(check, ConsistencyLevel.ANY, boundValues); + } + + protected static SimpleQueryResult assertRowEquals(Cluster cluster, SimpleQueryResult expected, String check, Object... boundValues) + { + SimpleQueryResult result = execute(cluster, check, boundValues); + QueryResultUtil.assertThat(result).isEqualTo(expected); + return result; + } + + protected static SimpleQueryResult assertRowEquals(Cluster cluster, Object[] row, String check, Object... boundValues) + { + return assertRowEquals(cluster, QueryResults.builder().row(row).build(), check, boundValues); + } + // TODO: Retry on preemption may become unnecessary after the Unified Log is integrated. - protected static SimpleQueryResult assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check, Object... boundValues) + protected SimpleQueryResult assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check, Object... boundValues) { return assertRowWithPreemptedRetry(cluster, QueryResults.builder().row(row).build(), check, boundValues); } - protected static SimpleQueryResult assertEmptyWithPreemptedRetry(Cluster cluster, String check, Object... boundValues) + protected SimpleQueryResult assertEmptyWithPreemptedRetry(Cluster cluster, String check, Object... boundValues) { return assertRowWithPreemptedRetry(cluster, QueryResults.builder().build(), check, boundValues); } - private static SimpleQueryResult assertRowWithPreemptedRetry(Cluster cluster, SimpleQueryResult expected, String check, Object... boundValues) + private SimpleQueryResult assertRowWithPreemptedRetry(Cluster cluster, SimpleQueryResult expected, String check, Object... boundValues) { SimpleQueryResult result = executeWithRetry(cluster, check, boundValues); QueryResultUtil.assertThat(result).isEqualTo(expected); return result; } - protected static SimpleQueryResult executeWithRetry(Cluster cluster, String check, Object... boundValues) + private SimpleQueryResult executeWithRetry0(int count, Cluster cluster, String check, Object... boundValues) { try { - return cluster.coordinator(1).executeWithResult(check, ConsistencyLevel.ANY, boundValues); + return execute(cluster, check, boundValues); } catch (Throwable t) { if (AssertionUtils.rootCauseIs(Preempted.class).matches(t)) - return executeWithRetry(cluster, check, boundValues); + { + logger.warn("[Retry attempt={}] Preempted failure for {}", count, check); + return executeWithRetry0(count + 1, cluster, check, boundValues); + } throw t; } } + protected SimpleQueryResult executeWithRetry(Cluster cluster, String check, Object... boundValues) + { + // is this method safe? + cluster.get(1).runOnInstance(() -> { + TransactionStatement stmt = AccordTestUtils.parse(check); + if (!isIdempotent(stmt)) + throw new AssertionError("Unable to retry txn that is not idempotent: cql=" + check); + }); + return executeWithRetry0(0, cluster, check, boundValues); + } + + public static boolean isIdempotent(TransactionStatement statement) + { + for (ModificationStatement update : statement.getUpdates()) + { + if (!isIdempotent(update)) + return false; + } + return true; + } + + private static boolean isIdempotent(ModificationStatement update) + { + update.migrateReadRequiredOperations(); + // ReferenceValue.Constant is used during migration, which means a case like "a += 1" + // ReferenceValue.Substitution uses a LET reference, so rerunning would always just see the new state + long numConstants = update.getSubstitutions().stream() + .filter(f -> f.getValue() instanceof ReferenceValue.Constant) + .filter(f -> !f.getKind().name().contains("Setter")) + .count(); + return numConstants == 0; + } + public static class EnforceUpdateDoesNotPerformRead { public static void install(ClassLoader classLoader, Integer num) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org