This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0f4c6cec2851482e12fba1fe089a3452f5601548
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Dec 19 13:03:41 2022 -0800

    Ninja for CASSANDRA-17719: accord.primitives.Range#someIntersectingRoutingKey
    was added but does not work in all cases in C* due to sentinel values; added
    logic to return a C*-friendly token
---
 .../cassandra/service/accord/TokenRange.java       |   9 +
 .../service/accord/api/AccordRoutingKey.java       |   9 +
 .../distributed/test/accord/AccordCQLTest.java     | 154 +++++++++++++++
 .../test/accord/AccordIntegrationTest.java         | 208 ---------------------
 .../distributed/test/accord/AccordTestBase.java    |  10 +-
 5 files changed, 181 insertions(+), 209 deletions(-)
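
For context, the idea behind the fix as a standalone sketch (illustrative names
only; plain longs stand in for Murmur3 tokens, and the real change is the
someIntersectingRoutingKey()/toTokenKey() code in the diff below):

    // Illustrative sketch, not the committed code. Accord's sentinel keys mark
    // "before the first token" / "after the last token" of a table and cannot
    // answer token(); to hand back a routing key that C* can actually hash, the
    // patch nudges a sentinel onto the nearest real token, roughly:
    //   min sentinel -> partitioner.getMinimumToken().increaseSlightly()
    //   max sentinel -> partitioner.getMaximumToken().decreaseSlightly()
    final class SentinelToTokenSketch
    {
        static final long MIN_TOKEN = Long.MIN_VALUE; // stand-in for the partitioner's minimum token
        static final long MAX_TOKEN = Long.MAX_VALUE; // stand-in for the partitioner's maximum token

        static long toRoutableToken(boolean isMinSentinel)
        {
            return isMinSentinel ? MIN_TOKEN + 1 : MAX_TOKEN - 1;
        }

        public static void main(String[] args)
        {
            System.out.println(toRoutableToken(true));  // -9223372036854775807
            System.out.println(toRoutableToken(false)); //  9223372036854775806
        }
    }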

diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 22683d4b8e..7fb1ca8f34 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -47,6 +47,15 @@ public class TokenRange extends Range.EndInclusive
         return new TokenRange((AccordRoutingKey) start, (AccordRoutingKey) end);
     }
 
+    @Override
+    public RoutingKey someIntersectingRoutingKey()
+    {
+        RoutingKey pick = startInclusive() ? start() : end();
+        if (pick instanceof SentinelKey)
+            pick = ((SentinelKey) pick).toTokenKey();
+        return pick;
+    }
+
     public static final IVersionedSerializer<TokenRange> serializer = new IVersionedSerializer<TokenRange>()
     {
         @Override
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 055658a503..e9c600ae4a 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -25,6 +25,7 @@ import accord.api.Key;
 import accord.api.RoutingKey;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -95,6 +96,14 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout
             return new SentinelKey(tableId, false);
         }
 
+        public TokenKey toTokenKey()
+        {
+            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+            return new TokenKey(tableId, isMin ?
+                                         partitioner.getMinimumToken().increaseSlightly() :
+                                         partitioner.getMaximumToken().decreaseSlightly());
+        }
+
         @Override
         public Token token()
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 1909fb69fb..50318a00d1 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -24,16 +24,25 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+
+import accord.primitives.Unseekables;
+import accord.topology.Topologies;
 import org.apache.cassandra.cql3.functions.types.utils.Bytes;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.service.accord.AccordTestUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -68,6 +77,67 @@ public class AccordCQLTest extends AccordTestBase
         SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person (height int, age int)");
     }
 
+    @Test
+    public void testMultipleShards() throws Exception
+    {
+        String keyspace = "multipleShards";
+        String currentTable = keyspace + ".tbl";
+        List<String> ddls = Arrays.asList("CREATE KEYSPACE " + keyspace + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}",
+                                          "CREATE TABLE " + currentTable + " (k blob, c int, v int, primary key (k, c))");
+        List<String> tokens = SHARED_CLUSTER.stream()
+                                            .flatMap(i -> StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(), false))
+                                            .collect(Collectors.toList());
+
+        List<ByteBuffer> keys = tokens.stream()
+                                      .map(t -> (Murmur3Partitioner.LongToken) Murmur3Partitioner.instance.getTokenFactory().fromString(t))
+                                      .map(Murmur3Partitioner.LongToken::keyForToken)
+                                      .collect(Collectors.toList());
+        List<String> keyStrings = keys.stream().map(bb -> "0x" + ByteBufferUtil.bytesToHex(bb)).collect(Collectors.toList());
+        StringBuilder query = new StringBuilder("BEGIN TRANSACTION\n");
+
+        for (int i = 0; i < keys.size(); i++)
+            query.append("  LET row" + i + " = (SELECT * FROM " + currentTable + " WHERE k=" + keyStrings.get(i) + " AND c=0);\n");
+
+        query.append("  SELECT row0.v;\n")
+             .append("  IF ");
+
+        for (int i = 0; i < keys.size(); i++)
+            query.append((i > 0 ? " AND row" : "row") + i + " IS NULL");
+
+        query.append(" THEN\n");
+
+        for (int i = 0; i < keys.size(); i++)
+            query.append("    INSERT INTO " + currentTable + " (k, c, v) VALUES (" + keyStrings.get(i) + ", 0, " + i +");\n");
+
+        query.append("  END IF\n");
+        query.append("COMMIT TRANSACTION");
+
+        test(ddls, cluster -> {
+            // row0.v shouldn't have existed when the txn's SELECT was executed
+            assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ null }, query.toString());
+
+            cluster.get(1).runOnInstance(() -> {
+                StringBuilder sb = new StringBuilder("BEGIN TRANSACTION\n");
+                for (int i = 0; i < keyStrings.size() - 1; i++)
+                    sb.append(String.format("LET row%d = (SELECT * FROM %s WHERE k=%s AND c=0);\n", i, currentTable, keyStrings.get(i)));
+                sb.append(String.format("SELECT * FROM %s WHERE k=%s AND c=0;\n", currentTable, keyStrings.get(keyStrings.size() - 1)));
+                sb.append("COMMIT TRANSACTION");
+
+                Unseekables<?, ?> routables = AccordTestUtils.createTxn(sb.toString()).keys().toUnseekables();
+                Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, AccordService.instance().node.topology().epoch());
+                // we don't detect out-of-bounds read/write yet, so use this to validate we reach different shards
+                Assertions.assertThat(topology.totalShards()).isEqualTo(2);
+            });
+
+            String check = "BEGIN TRANSACTION\n" +
+                           "  SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" +
+                           "COMMIT TRANSACTION";
+
+            for (int i = 0; i < keys.size(); i++)
+                assertRowEqualsWithPreemptedRetry(cluster, new Object[] { keys.get(i), 0, i}, check, keys.get(i), 0);
+        });
+    }
+
     @Test
     public void testScalarBindVariables() throws Throwable
     {
@@ -2116,6 +2186,90 @@ public class AccordCQLTest extends AccordTestBase
         );
     }
 
+    @Test
+    public void testMultiKeyQueryAndInsert() throws Throwable
+    {
+        test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))",
+             cluster ->
+             {
+                 String query1 = "BEGIN TRANSACTION\n" +
+                                 "  LET select1 = (SELECT * FROM " + currentTable + " WHERE k=0 AND c=0);\n" +
+                                 "  LET select2 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" +
+                                 "  SELECT v FROM " + currentTable + " WHERE k=0 AND c=0;\n" +
+                                 "  IF select1 IS NULL THEN\n" +
+                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, 0);\n" +
+                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 0);\n" +
+                                 "  END IF\n" +
+                                 "COMMIT TRANSACTION";
+                 assertEmptyWithPreemptedRetry(cluster, query1);
+
+                 String check = "BEGIN TRANSACTION\n" +
+                                "  SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" +
+                                "COMMIT TRANSACTION";
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0);
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 0}, check, 1, 0);
+
+                 String query2 = "BEGIN TRANSACTION\n" +
+                                 "  LET select1 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" +
+                                 "  LET select2 = (SELECT * FROM " + currentTable + " WHERE k=2 AND c=0);\n" +
+                                 "  SELECT v FROM " + currentTable + " WHERE k=1 AND c=0;\n" +
+                                 "  IF select1.v = ? THEN\n" +
+                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 1);\n" +
+                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (2, 0, 1);\n" +
+                                 "  END IF\n" +
+                                 "COMMIT TRANSACTION";
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0 }, query2, 0);
+
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0);
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 1}, check, 1, 0);
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {2, 0, 1}, check, 2, 0);
+             });
+    }
+
+    @Test
+    public void demoTest() throws Throwable
+    {
+        SHARED_CLUSTER.schemaChange("CREATE KEYSPACE demo_ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2};");
+        SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_docs ( org_name text, doc_id int, contents_version int static, title text, permissions int, PRIMARY KEY (org_name, doc_id) );");
+        SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_users ( org_name text, user text, members_version int static, permissions int, PRIMARY KEY (org_name, user) );");
+        SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.user_docs ( user text, doc_id int, title text, org_name text, permissions int, PRIMARY KEY (user, doc_id) );");
+
+        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe()));
+        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0)));
+
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'blake', 5, 777);\n", ConsistencyLevel.ALL);
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'scott', 5, 777);\n", ConsistencyLevel.ALL);
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_docs (org_name, doc_id, contents_version, title, permissions) VALUES ('demo', 100, 5, 'README', 644);\n", ConsistencyLevel.ALL);
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 1, 'recipes', NULL, 777);\n", ConsistencyLevel.ALL);
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL);
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 2, 'to do list', NULL, 777);\n", ConsistencyLevel.ALL);
+        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL);
+
+        String addDoc = "BEGIN TRANSACTION\n" +
+                        "  LET demo_user = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1);\n" +
+                        "  LET existing = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' AND doc_id=101);\n" +
+                        "  SELECT members_version FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1;\n" +
+                        "  IF demo_user.members_version = 5 AND existing IS NULL THEN\n" +
+                        "    UPDATE demo_ks.org_docs SET title='slides.key', permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n" +
+                        "    UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='blake' AND doc_id=101;\n" +
+                        "    UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='scott' AND doc_id=101;\n" +
+                        "  END IF\n" +
+                        "COMMIT TRANSACTION";
+        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 }, addDoc);
+
+        String addUser = "BEGIN TRANSACTION\n" +
+                         "  LET demo_doc = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1);\n" +
+                         "  LET existing = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' AND user='benedict');\n" +
+                         "  SELECT contents_version FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1;\n" +
+                         "  IF demo_doc.contents_version = 6 AND existing IS NULL THEN\n" +
+                         "    UPDATE demo_ks.org_users SET permissions=777, members_version += 1 WHERE org_name='demo' AND user='benedict';\n" +
+                         "    UPDATE demo_ks.user_docs SET title='README', permissions=644 WHERE user='benedict' AND doc_id=100;\n" +
+                         "    UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='benedict' AND doc_id=101;\n" +
+                         "  END IF\n" +
+                         "COMMIT TRANSACTION";
+        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 }, addUser);
+    }
+
     // TODO: Implement support for basic arithmetic on references in INSERT
     @Ignore
     @Test
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
index f1aac8268a..d128ead489 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
@@ -18,31 +18,15 @@
 
 package org.apache.cassandra.distributed.test.accord;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-import com.google.common.base.Splitter;
-
-import org.assertj.core.api.Assertions;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.messages.Commit;
-import accord.primitives.Unseekables;
-import accord.topology.Topologies;
-import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.service.accord.AccordService;
-import org.apache.cassandra.service.accord.AccordTestUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 @SuppressWarnings("Convert2MethodRef")
 public class AccordIntegrationTest extends AccordTestBase
@@ -103,90 +87,6 @@ public class AccordIntegrationTest extends AccordTestBase
         });
     }
 
-    /*
-    Sporadically fails with someone asking for a token() from SentinelKey, which apparently is unsupported.
-
-    ERROR 19:07:47 Exception in thread Thread[AccordStage-1,5,SharedPool]
-    java.lang.UnsupportedOperationException: null
-       at org.apache.cassandra.service.accord.api.AccordRoutingKey$SentinelKey.token(AccordRoutingKey.java:152)
-       at org.apache.cassandra.service.accord.api.AccordRoutingKey.routingHash(AccordRoutingKey.java:84)
-       at accord.local.CommandStores$ShardedRanges.keyIndex(CommandStores.java:191)
-       at accord.local.CommandStores$ShardedRanges.addKeyIndex(CommandStores.java:196)
-       at accord.primitives.AbstractKeys.foldl(AbstractKeys.java:203)
-       at accord.local.CommandStores$ShardedRanges.shards(CommandStores.java:179)
-       at accord.local.CommandStores.mapReduce(CommandStores.java:426)
-       at accord.local.CommandStores.mapReduceConsume(CommandStores.java:409)
-       at accord.local.AsyncCommandStores.mapReduceConsume(AsyncCommandStores.java:66)
-       at accord.local.Node.mapReduceConsumeLocal(Node.java:276)
-       at accord.messages.PreAccept.process(PreAccept.java:90)
-       at accord.messages.TxnRequest.process(TxnRequest.java:145)
-       at org.apache.cassandra.service.accord.AccordVerbHandler.doVerb(AccordVerbHandler.java:46)
-       at org.apache.cassandra.net.InboundSink.lambda$new$0(InboundSink.java:78)
-     */
-    @Ignore
-    @Test
-    public void multipleShards()
-    {
-        // can't reuse test() due to it using "int" for pk; this test needs "blob"
-        String keyspace = "multipleShards";
-        
-        SHARED_CLUSTER.schemaChange("CREATE KEYSPACE " + keyspace + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
-        SHARED_CLUSTER.schemaChange("CREATE TABLE " + keyspace + ".tbl (k blob, c int, v int, primary key (k, c))");
-        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe()));
-        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0)));
-
-        List<String> tokens = SHARED_CLUSTER.stream()
-                                           .flatMap(i -> StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(), false))
-                                           .collect(Collectors.toList());
-
-        List<ByteBuffer> keys = tokens.stream()
-                                      .map(t -> (Murmur3Partitioner.LongToken) Murmur3Partitioner.instance.getTokenFactory().fromString(t))
-                                      .map(Murmur3Partitioner.LongToken::keyForToken)
-                                      .collect(Collectors.toList());
-
-        List<String> keyStrings = keys.stream().map(bb -> "0x" + ByteBufferUtil.bytesToHex(bb)).collect(Collectors.toList());
-        StringBuilder query = new StringBuilder("BEGIN TRANSACTION\n");
-
-        for (int i = 0; i < keys.size(); i++)
-            query.append("  LET row" + i + " = (SELECT * FROM " + keyspace + ".tbl WHERE k=" + keyStrings.get(i) + " AND c=0);\n");
-
-        query.append("  SELECT row0.v;\n")
-             .append("  IF ");
-
-        for (int i = 0; i < keys.size(); i++)
-            query.append((i > 0 ? " AND row" : "row") + i + " IS NULL");
-
-        query.append(" THEN\n");
-
-        for (int i = 0; i < keys.size(); i++)
-            query.append("    INSERT INTO " + keyspace + ".tbl (k, c, v) VALUES (" + keyStrings.get(i) + ", 0, " + i +");\n");
-        
-        query.append("  END IF\n");
-        query.append("COMMIT TRANSACTION");
-
-        // row0.v shouldn't have existed when the txn's SELECT was executed
-        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, null, query.toString());
-
-        SHARED_CLUSTER.get(1).runOnInstance(() -> {
-            StringBuilder sb = new StringBuilder("BEGIN TRANSACTION\n");
-            for (int i = 0; i < keyStrings.size(); i++)
-                sb.append(String.format("LET row%d = (SELECT * FROM ks.tbl WHERE k=%s AND c=0);\n", i, keyStrings.get(i)));
-            sb.append("COMMIT TRANSACTION");
-
-            Unseekables<?, ?> routables = AccordTestUtils.createTxn(sb.toString()).keys().toUnseekables();
-            Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, 1);
-            // we don't detect out-of-bounds read/write yet, so use this to validate we reach different shards
-            Assertions.assertThat(topology.totalShards()).isEqualTo(2);
-        });
-
-        String check = "BEGIN TRANSACTION\n" +
-                       "  SELECT * FROM " + keyspace + ".tbl WHERE k = ? AND c = ?;\n" +
-                       "COMMIT TRANSACTION";
-
-        for (int i = 0; i < keys.size(); i++)
-            assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { keys.get(i), 0, i}, check, keys.get(i), 0);
-    }
-
     @Test
     public void testLostCommitReadTriggersFallbackRead() throws Exception
     {
@@ -214,112 +114,4 @@ public class AccordIntegrationTest extends AccordTestBase
             assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0, 0, 1 }, check, 0, 0);
         });
     }
-
-    @Test
-    public void testMultiKeyQueryAndInsert() throws Throwable
-    {
-        test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))",
-             cluster ->
-             {
-                 String query1 = "BEGIN TRANSACTION\n" +
-                                 "  LET select1 = (SELECT * FROM " + currentTable + " WHERE k=0 AND c=0);\n" +
-                                 "  LET select2 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" +
-                                 "  SELECT v FROM " + currentTable + " WHERE k=0 AND c=0;\n" +
-                                 "  IF select1 IS NULL THEN\n" +
-                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, 0);\n" +
-                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 0);\n" +
-                                 "  END IF\n" +
-                                 "COMMIT TRANSACTION";
-                 assertEmptyWithPreemptedRetry(cluster, query1);
-
-                 String check = "BEGIN TRANSACTION\n" +
-                                "  SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" +
-                                "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0);
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 0}, check, 1, 0);
-
-                 String query2 = "BEGIN TRANSACTION\n" +
-                                 "  LET select1 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" +
-                                 "  LET select2 = (SELECT * FROM " + currentTable + " WHERE k=2 AND c=0);\n" +
-                                 "  SELECT v FROM " + currentTable + " WHERE k=1 AND c=0;\n" +
-                                 "  IF select1.v = ? THEN\n" +
-                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 1);\n" +
-                                 "    INSERT INTO " + currentTable + " (k, c, v) VALUES (2, 0, 1);\n" +
-                                 "  END IF\n" +
-                                 "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0 }, query2, 0);
-
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0);
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 1}, check, 1, 0);
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] {2, 0, 1}, check, 2, 0);
-             });
-    }
-
-    @Test
-    public void demoTest() throws Throwable
-    {
-        SHARED_CLUSTER.schemaChange("CREATE KEYSPACE demo_ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2};");
-        SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_docs ( org_name text, doc_id int, contents_version int static, title text, permissions int, PRIMARY KEY (org_name, doc_id) );");
-        SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_users ( org_name text, user text, members_version int static, permissions int, PRIMARY KEY (org_name, user) );");
-        SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.user_docs ( user text, doc_id int, title text, org_name text, permissions int, PRIMARY KEY (user, doc_id) );");
-
-        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe()));
-        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0)));
-
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'blake', 5, 777);\n", ConsistencyLevel.ALL);
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'scott', 5, 777);\n", ConsistencyLevel.ALL);
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_docs (org_name, doc_id, contents_version, title, permissions) VALUES ('demo', 100, 5, 'README', 644);\n", ConsistencyLevel.ALL);
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 1, 'recipes', NULL, 777);\n", ConsistencyLevel.ALL);
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL);
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 2, 'to do list', NULL, 777);\n", ConsistencyLevel.ALL);
-        SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL);
-
-        String addDoc = "BEGIN TRANSACTION\n" +
-                        "  LET demo_user = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1);\n" +
-                        "  LET existing = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' AND doc_id=101);\n" +
-                        "  SELECT members_version FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1;\n" +
-                        "  IF demo_user.members_version = 5 AND existing IS NULL THEN\n" +
-                        "    UPDATE demo_ks.org_docs SET title='slides.key', permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n" +
-                        "    UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='blake' AND doc_id=101;\n" +
-                        "    UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='scott' AND doc_id=101;\n" +
-                        "  END IF\n" +
-                        "COMMIT TRANSACTION";
-        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 }, addDoc);
-
-        String addUser = "BEGIN TRANSACTION\n" +
-                         "  LET demo_doc = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1);\n" +
-                         "  LET existing = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' AND user='benedict');\n" +
-                         "  SELECT contents_version FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1;\n" +
-                         "  IF demo_doc.contents_version = 6 AND existing IS NULL THEN\n" +
-                         "    UPDATE demo_ks.org_users SET permissions=777, members_version += 1 WHERE org_name='demo' AND user='benedict';\n" +
-                         "    UPDATE demo_ks.user_docs SET title='README', permissions=644 WHERE user='benedict' AND doc_id=100;\n" +
-                         "    UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='benedict' AND doc_id=101;\n" +
-                         "  END IF\n" +
-                         "COMMIT TRANSACTION";
-        assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 }, addUser);
-    }
-
-//    @Test
-//    public void acceptInvalidationTest()
-//    {
-//
-//    }
-//
-//    @Test
-//    public void applyAndCheckTest()
-//    {
-//
-//    }
-//
-//    @Test
-//    public void beginInvalidationTest()
-//    {
-//
-//    }
-//
-//    @Test
-//    public void checkStatusTest()
-//    {
-//
-//    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 8f35a59f2a..2c9b2a46ae 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -19,6 +19,8 @@
 package org.apache.cassandra.distributed.test.accord;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -85,7 +87,13 @@ public abstract class AccordTestBase extends TestBaseImpl
 
     protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws Exception
     {
-        SHARED_CLUSTER.schemaChange(tableDDL);
+        test(Collections.singletonList(tableDDL), fn);
+    }
+
+    protected void test(List<String> ddls, FailingConsumer<Cluster> fn) throws Exception
+    {
+        for (String ddl : ddls)
+            SHARED_CLUSTER.schemaChange(ddl);
         SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe()));
 
         // Evict commands from the cache immediately to expose problems loading from disk.
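
For reference, the token-to-key trick the new testMultipleShards test relies on,
pulled out as a sketch (assumes Cassandra's Murmur3Partitioner on the test
classpath and mirrors the calls in the diff above; the class and method names
here are illustrative only):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.dht.Murmur3Partitioner;

    // Each node's initial_token is parsed back into a LongToken, and keyForToken()
    // produces a partition key that hashes exactly to that token, so every derived
    // key lands in a different node's token range and the transaction is forced to
    // touch more than one shard.
    public final class KeyPerTokenSketch
    {
        public static ByteBuffer keyOwnedByToken(String initialToken)
        {
            Murmur3Partitioner.LongToken token =
                (Murmur3Partitioner.LongToken) Murmur3Partitioner.instance.getTokenFactory().fromString(initialToken);
            return Murmur3Partitioner.LongToken.keyForToken(token);
        }
    }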

