This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 237208e850ae384c32558cf1da7cc1854ef25444
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Dec 19 16:07:20 2022 -0800

    Refactor AccordTestBase to block retries on non-idempotent transactions.
    Some tests may be flaky now due to Preempted being thrown.
---
 .../cql3/statements/ModificationStatement.java     | 12 ++++
 .../cql3/statements/TransactionStatement.java      |  5 ++
 .../cql3/transactions/ReferenceOperation.java      | 10 +++
 .../distributed/test/accord/AccordCQLTest.java     | 10 +--
 .../distributed/test/accord/AccordTestBase.java    | 71 ++++++++++++++++++++--
 5 files changed, 97 insertions(+), 11 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index abf393f811..41ae242cf9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -809,6 +809,18 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         return new TxnReferenceOperations(metadata, clustering, regularOps, 
staticOps);
     }
 
+    @VisibleForTesting
+    public void migrateReadRequiredOperations()
+    {
+        operations.migrateReadRequiredOperations();
+    }
+
+    @VisibleForTesting
+    public List<ReferenceOperation> getSubstitutions()
+    {
+        return operations.allSubstitutions();
+    }
+
     public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state, 
QueryOptions options)
     {
         // When an Operation requires a read, this cannot be done right away 
and must be done by the transaction itself,
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index baead627dc..4d3c732da0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -131,6 +131,11 @@ public class TransactionStatement implements CQLStatement
         this.bindVariables = bindVariables;
     }
 
+    public List<ModificationStatement> getUpdates()
+    {
+        return updates;
+    }
+
     @Override
     public List<ColumnSpecification> getBindVariables()
     {
diff --git 
a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java 
b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
index e89e616545..d4858eec8e 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ReferenceOperation.java
@@ -76,6 +76,16 @@ public class ReferenceOperation
         return new ReferenceOperation(receiver, kind, key, field, value);
     }
 
+    public TxnReferenceOperation.Kind getKind()
+    {
+        return kind;
+    }
+
+    public ReferenceValue getValue()
+    {
+        return value;
+    }
+
     public ColumnMetadata getReceiver()
     {
         return receiver;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 50318a00d1..1315bed14f 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -400,7 +400,7 @@ public class AccordCQLTest extends AccordTestBase
                                  "  SELECT row1.v;\n" +
                                  "  UPDATE " + currentTable + " SET v " + 
operation + " 1 WHERE k = 1;\n" +
                                  "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 
startingValue }, update);
+                 assertRowEquals(cluster, new Object[] { startingValue }, 
update);
 
                  String check = "BEGIN TRANSACTION\n" +
                                 "  SELECT v FROM " + currentTable + " WHERE k 
= 1;\n" +
@@ -1424,7 +1424,7 @@ public class AccordCQLTest extends AccordTestBase
                                  "  SELECT row0.counter, 
row0.other_counter;\n" +
                                  "  UPDATE " + currentTable + " SET 
other_counter += 1, counter += row0.counter WHERE k = 0 AND c = 1;\n" +
                                  "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 1, 
1 }, update);
+                 assertRowEquals(cluster, new Object[] { 1, 1 }, update);
 
                  String check = "BEGIN TRANSACTION\n" +
                                 "  SELECT counter, other_counter FROM " + 
currentTable + " WHERE k = 0 AND c = 1;\n" +
@@ -1448,7 +1448,7 @@ public class AccordCQLTest extends AccordTestBase
                                  "  UPDATE " + currentTable + " SET 
int_list[0] = 42 WHERE k = 0 AND c = 0;\n" +
                                  "  UPDATE " + currentTable + " SET counter += 
1 WHERE k = 0 AND c = 0;\n" +
                                  "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0, 
Arrays.asList(1, 2) }, update);
+                 assertRowEquals(cluster, new Object[] { 0, Arrays.asList(1, 
2) }, update);
 
                  String check = "BEGIN TRANSACTION\n" +
                                 "  SELECT counter, int_list FROM " + 
currentTable + " WHERE k = 0 AND c = 0;\n" +
@@ -2255,7 +2255,7 @@ public class AccordCQLTest extends AccordTestBase
                         "    UPDATE demo_ks.user_docs SET title='slides.key', 
permissions=777 WHERE user='scott' AND doc_id=101;\n" +
                         "  END IF\n" +
                         "COMMIT TRANSACTION";
-        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 }, 
addDoc);
+        assertRowEquals(SHARED_CLUSTER, new Object[] { 5 }, addDoc);
 
         String addUser = "BEGIN TRANSACTION\n" +
                          "  LET demo_doc = (SELECT * FROM demo_ks.org_docs 
WHERE org_name='demo' LIMIT 1);\n" +
@@ -2267,7 +2267,7 @@ public class AccordCQLTest extends AccordTestBase
                          "    UPDATE demo_ks.user_docs SET title='slides.key', 
permissions=777 WHERE user='benedict' AND doc_id=101;\n" +
                          "  END IF\n" +
                          "COMMIT TRANSACTION";
-        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 }, 
addUser);
+        assertRowEquals(SHARED_CLUSTER, new Object[] { 6 }, addUser);
     }
 
     // TODO: Implement support for basic arithmetic on references in INSERT
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 2c9b2a46ae..38cca646fc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -25,17 +25,20 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import accord.primitives.Txn;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.cql3.transactions.ReferenceValue;
 import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.distributed.util.QueryResultUtil;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import accord.coordinate.Preempted;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
@@ -46,8 +49,10 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordTestUtils;
 import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.FailingConsumer;
+import org.assertj.core.util.Arrays;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static org.junit.Assert.assertArrayEquals;
@@ -59,6 +64,7 @@ public abstract class AccordTestBase extends TestBaseImpl
     protected static Cluster SHARED_CLUSTER;
     
     protected String currentTable;
+    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @BeforeClass
     public static void setupClass() throws IOException
@@ -125,39 +131,92 @@ public abstract class AccordTestBase extends TestBaseImpl
                            .start());
     }
 
+    private static SimpleQueryResult execute(Cluster cluster, String check, 
Object... boundValues)
+    {
+        return cluster.coordinator(1).executeWithResult(check, 
ConsistencyLevel.ANY, boundValues);
+    }
+
+    protected static SimpleQueryResult assertRowEquals(Cluster cluster, 
SimpleQueryResult expected, String check, Object... boundValues)
+    {
+        SimpleQueryResult result = execute(cluster, check, boundValues);
+        QueryResultUtil.assertThat(result).isEqualTo(expected);
+        return result;
+    }
+
+    protected static SimpleQueryResult assertRowEquals(Cluster cluster, 
Object[] row, String check, Object... boundValues)
+    {
+        return assertRowEquals(cluster, 
QueryResults.builder().row(row).build(), check, boundValues);
+    }
+
     // TODO: Retry on preemption may become unnecessary after the Unified Log 
is integrated.
-    protected static SimpleQueryResult 
assertRowEqualsWithPreemptedRetry(Cluster cluster, Object[] row, String check, 
Object... boundValues)
+    protected SimpleQueryResult assertRowEqualsWithPreemptedRetry(Cluster 
cluster, Object[] row, String check, Object... boundValues)
     {
         return assertRowWithPreemptedRetry(cluster, 
QueryResults.builder().row(row).build(), check, boundValues);
     }
 
-    protected static SimpleQueryResult assertEmptyWithPreemptedRetry(Cluster 
cluster, String check, Object... boundValues)
+    protected SimpleQueryResult assertEmptyWithPreemptedRetry(Cluster cluster, 
String check, Object... boundValues)
     {
         return assertRowWithPreemptedRetry(cluster, 
QueryResults.builder().build(), check, boundValues);
     }
 
-    private static SimpleQueryResult assertRowWithPreemptedRetry(Cluster 
cluster, SimpleQueryResult expected, String check, Object... boundValues)
+    private SimpleQueryResult assertRowWithPreemptedRetry(Cluster cluster, 
SimpleQueryResult expected, String check, Object... boundValues)
     {
         SimpleQueryResult result = executeWithRetry(cluster, check, 
boundValues);
         QueryResultUtil.assertThat(result).isEqualTo(expected);
         return result;
     }
 
-    protected static SimpleQueryResult executeWithRetry(Cluster cluster, 
String check, Object... boundValues)
+    private SimpleQueryResult executeWithRetry0(int count, Cluster cluster, 
String check, Object... boundValues)
     {
         try
         {
-            return cluster.coordinator(1).executeWithResult(check, 
ConsistencyLevel.ANY, boundValues);
+            return execute(cluster, check, boundValues);
         }
         catch (Throwable t)
         {
             if (AssertionUtils.rootCauseIs(Preempted.class).matches(t))
-                return executeWithRetry(cluster, check, boundValues);
+            {
+                logger.warn("[Retry attempt={}] Preempted failure for {}", 
count, check);
+                return executeWithRetry0(count + 1, cluster, check, 
boundValues);
+            }
 
             throw t;
         }
     }
 
+    protected SimpleQueryResult executeWithRetry(Cluster cluster, String 
check, Object... boundValues)
+    {
+        // is this method safe?
+        cluster.get(1).runOnInstance(() -> {
+            TransactionStatement stmt = AccordTestUtils.parse(check);
+            if (!isIdempotent(stmt))
+                throw new AssertionError("Unable to retry txn that is not 
idempotent: cql=" + check);
+        });
+        return executeWithRetry0(0, cluster, check, boundValues);
+    }
+
+    public static boolean isIdempotent(TransactionStatement statement)
+    {
+        for (ModificationStatement update : statement.getUpdates())
+        {
+            if (!isIdempotent(update))
+                return false;
+        }
+        return true;
+    }
+
+    private static boolean isIdempotent(ModificationStatement update)
+    {
+        update.migrateReadRequiredOperations();
+        // ReferenceValue.Constant is used during migration, which means a 
case like "a += 1"
+        // ReferenceValue.Substitution uses a LET reference, so rerunning 
would always just see the new state
+        long numConstants = update.getSubstitutions().stream()
+                                  .filter(f -> f.getValue() instanceof 
ReferenceValue.Constant)
+                                  .filter(f -> 
!f.getKind().name().contains("Setter"))
+                                  .count();
+        return numConstants == 0;
+    }
+
     public static class EnforceUpdateDoesNotPerformRead
     {
         public static void install(ClassLoader classLoader, Integer num)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to