This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0f4c6cec2851482e12fba1fe089a3452f5601548 Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Dec 19 13:03:41 2022 -0800 Ninja for CASSANDRA-17719: accord.primitives.Range#someIntersectingRoutingKey was added but does not work in all cases in C* due to sentinel values; added logic to return a C*-friendly token --- .../cassandra/service/accord/TokenRange.java | 9 + .../service/accord/api/AccordRoutingKey.java | 9 + .../distributed/test/accord/AccordCQLTest.java | 154 +++++++++++++++ .../test/accord/AccordIntegrationTest.java | 208 --------------------- .../distributed/test/accord/AccordTestBase.java | 10 +- 5 files changed, 181 insertions(+), 209 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java index 22683d4b8e..7fb1ca8f34 100644 --- a/src/java/org/apache/cassandra/service/accord/TokenRange.java +++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java @@ -47,6 +47,15 @@ public class TokenRange extends Range.EndInclusive return new TokenRange((AccordRoutingKey) start, (AccordRoutingKey) end); } + @Override + public RoutingKey someIntersectingRoutingKey() + { + RoutingKey pick = startInclusive() ? 
start() : end(); + if (pick instanceof SentinelKey) + pick = ((SentinelKey) pick).toTokenKey(); + return pick; + } + public static final IVersionedSerializer<TokenRange> serializer = new IVersionedSerializer<TokenRange>() { @Override diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java index 055658a503..e9c600ae4a 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java @@ -25,6 +25,7 @@ import accord.api.Key; import accord.api.RoutingKey; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -95,6 +96,14 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout return new SentinelKey(tableId, false); } + public TokenKey toTokenKey() + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + return new TokenKey(tableId, isMin ? 
+ partitioner.getMinimumToken().increaseSlightly() : + partitioner.getMaximumToken().decreaseSlightly()); + } + @Override public Token token() { diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java index 1909fb69fb..50318a00d1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java @@ -24,16 +24,25 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; + +import accord.primitives.Unseekables; +import accord.topology.Topologies; import org.apache.cassandra.cql3.functions.types.utils.Bytes; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.service.accord.AccordTestUtils; +import org.apache.cassandra.utils.ByteBufferUtil; import org.assertj.core.api.Assertions; import org.junit.BeforeClass; import org.junit.Ignore; @@ -68,6 +77,67 @@ public class AccordCQLTest extends AccordTestBase SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person (height int, age int)"); } + @Test + public void testMultipleShards() throws Exception + { + String keyspace = "multipleShards"; + String currentTable = keyspace + ".tbl"; + List<String> ddls = Arrays.asList("CREATE KEYSPACE " + keyspace + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}", + 
"CREATE TABLE " + currentTable + " (k blob, c int, v int, primary key (k, c))"); + List<String> tokens = SHARED_CLUSTER.stream() + .flatMap(i -> StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(), false)) + .collect(Collectors.toList()); + + List<ByteBuffer> keys = tokens.stream() + .map(t -> (Murmur3Partitioner.LongToken) Murmur3Partitioner.instance.getTokenFactory().fromString(t)) + .map(Murmur3Partitioner.LongToken::keyForToken) + .collect(Collectors.toList()); + List<String> keyStrings = keys.stream().map(bb -> "0x" + ByteBufferUtil.bytesToHex(bb)).collect(Collectors.toList()); + StringBuilder query = new StringBuilder("BEGIN TRANSACTION\n"); + + for (int i = 0; i < keys.size(); i++) + query.append(" LET row" + i + " = (SELECT * FROM " + currentTable + " WHERE k=" + keyStrings.get(i) + " AND c=0);\n"); + + query.append(" SELECT row0.v;\n") + .append(" IF "); + + for (int i = 0; i < keys.size(); i++) + query.append((i > 0 ? " AND row" : "row") + i + " IS NULL"); + + query.append(" THEN\n"); + + for (int i = 0; i < keys.size(); i++) + query.append(" INSERT INTO " + currentTable + " (k, c, v) VALUES (" + keyStrings.get(i) + ", 0, " + i +");\n"); + + query.append(" END IF\n"); + query.append("COMMIT TRANSACTION"); + + test(ddls, cluster -> { + // row0.v shouldn't have existed when the txn's SELECT was executed + assertRowEqualsWithPreemptedRetry(cluster, new Object[]{ null }, query.toString()); + + cluster.get(1).runOnInstance(() -> { + StringBuilder sb = new StringBuilder("BEGIN TRANSACTION\n"); + for (int i = 0; i < keyStrings.size() - 1; i++) + sb.append(String.format("LET row%d = (SELECT * FROM %s WHERE k=%s AND c=0);\n", i, currentTable, keyStrings.get(i))); + sb.append(String.format("SELECT * FROM %s WHERE k=%s AND c=0;\n", currentTable, keyStrings.get(keyStrings.size() - 1))); + sb.append("COMMIT TRANSACTION"); + + Unseekables<?, ?> routables = AccordTestUtils.createTxn(sb.toString()).keys().toUnseekables(); + 
Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, AccordService.instance().node.topology().epoch()); + // we don't detect out-of-bounds read/write yet, so use this to validate we reach different shards + Assertions.assertThat(topology.totalShards()).isEqualTo(2); + }); + + String check = "BEGIN TRANSACTION\n" + + " SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" + + "COMMIT TRANSACTION"; + + for (int i = 0; i < keys.size(); i++) + assertRowEqualsWithPreemptedRetry(cluster, new Object[] { keys.get(i), 0, i}, check, keys.get(i), 0); + }); + } + @Test public void testScalarBindVariables() throws Throwable { @@ -2116,6 +2186,90 @@ public class AccordCQLTest extends AccordTestBase ); } + @Test + public void testMultiKeyQueryAndInsert() throws Throwable + { + test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))", + cluster -> + { + String query1 = "BEGIN TRANSACTION\n" + + " LET select1 = (SELECT * FROM " + currentTable + " WHERE k=0 AND c=0);\n" + + " LET select2 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" + + " SELECT v FROM " + currentTable + " WHERE k=0 AND c=0;\n" + + " IF select1 IS NULL THEN\n" + + " INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, 0);\n" + + " INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 0);\n" + + " END IF\n" + + "COMMIT TRANSACTION"; + assertEmptyWithPreemptedRetry(cluster, query1); + + String check = "BEGIN TRANSACTION\n" + + " SELECT * FROM " + currentTable + " WHERE k = ? 
AND c = ?;\n" + + "COMMIT TRANSACTION"; + assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0); + assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 0}, check, 1, 0); + + String query2 = "BEGIN TRANSACTION\n" + + " LET select1 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" + + " LET select2 = (SELECT * FROM " + currentTable + " WHERE k=2 AND c=0);\n" + + " SELECT v FROM " + currentTable + " WHERE k=1 AND c=0;\n" + + " IF select1.v = ? THEN\n" + + " INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 1);\n" + + " INSERT INTO " + currentTable + " (k, c, v) VALUES (2, 0, 1);\n" + + " END IF\n" + + "COMMIT TRANSACTION"; + assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0 }, query2, 0); + + assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0); + assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 1}, check, 1, 0); + assertRowEqualsWithPreemptedRetry(cluster, new Object[] {2, 0, 1}, check, 2, 0); + }); + } + + @Test + public void demoTest() throws Throwable + { + SHARED_CLUSTER.schemaChange("CREATE KEYSPACE demo_ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2};"); + SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_docs ( org_name text, doc_id int, contents_version int static, title text, permissions int, PRIMARY KEY (org_name, doc_id) );"); + SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_users ( org_name text, user text, members_version int static, permissions int, PRIMARY KEY (org_name, user) );"); + SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.user_docs ( user text, doc_id int, title text, org_name text, permissions int, PRIMARY KEY (user, doc_id) );"); + + SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe())); + SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0))); + + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO 
demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'blake', 5, 777);\n", ConsistencyLevel.ALL); + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'scott', 5, 777);\n", ConsistencyLevel.ALL); + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_docs (org_name, doc_id, contents_version, title, permissions) VALUES ('demo', 100, 5, 'README', 644);\n", ConsistencyLevel.ALL); + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 1, 'recipes', NULL, 777);\n", ConsistencyLevel.ALL); + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL); + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 2, 'to do list', NULL, 777);\n", ConsistencyLevel.ALL); + SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL); + + String addDoc = "BEGIN TRANSACTION\n" + + " LET demo_user = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1);\n" + + " LET existing = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' AND doc_id=101);\n" + + " SELECT members_version FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1;\n" + + " IF demo_user.members_version = 5 AND existing IS NULL THEN\n" + + " UPDATE demo_ks.org_docs SET title='slides.key', permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n" + + " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='blake' AND doc_id=101;\n" + + " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='scott' AND doc_id=101;\n" + + " END 
IF\n" + + "COMMIT TRANSACTION"; + assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 }, addDoc); + + String addUser = "BEGIN TRANSACTION\n" + + " LET demo_doc = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1);\n" + + " LET existing = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' AND user='benedict');\n" + + " SELECT contents_version FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1;\n" + + " IF demo_doc.contents_version = 6 AND existing IS NULL THEN\n" + + " UPDATE demo_ks.org_users SET permissions=777, members_version += 1 WHERE org_name='demo' AND user='benedict';\n" + + " UPDATE demo_ks.user_docs SET title='README', permissions=644 WHERE user='benedict' AND doc_id=100;\n" + + " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='benedict' AND doc_id=101;\n" + + " END IF\n" + + "COMMIT TRANSACTION"; + assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 }, addUser); + } + // TODO: Implement support for basic arithmetic on references in INSERT @Ignore @Test diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java index f1aac8268a..d128ead489 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java @@ -18,31 +18,15 @@ package org.apache.cassandra.distributed.test.accord; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import com.google.common.base.Splitter; - -import org.assertj.core.api.Assertions; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.messages.Commit; -import accord.primitives.Unseekables; -import accord.topology.Topologies; -import 
org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IMessageFilters; import org.apache.cassandra.distributed.impl.Instance; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.Verb; -import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.service.accord.AccordTestUtils; -import org.apache.cassandra.utils.ByteBufferUtil; @SuppressWarnings("Convert2MethodRef") public class AccordIntegrationTest extends AccordTestBase @@ -103,90 +87,6 @@ public class AccordIntegrationTest extends AccordTestBase }); } - /* - Sporadically fails with someone asking for a token() from SentinelKey, which apparently is unsupported. - - ERROR 19:07:47 Exception in thread Thread[AccordStage-1,5,SharedPool] - java.lang.UnsupportedOperationException: null - at org.apache.cassandra.service.accord.api.AccordRoutingKey$SentinelKey.token(AccordRoutingKey.java:152) - at org.apache.cassandra.service.accord.api.AccordRoutingKey.routingHash(AccordRoutingKey.java:84) - at accord.local.CommandStores$ShardedRanges.keyIndex(CommandStores.java:191) - at accord.local.CommandStores$ShardedRanges.addKeyIndex(CommandStores.java:196) - at accord.primitives.AbstractKeys.foldl(AbstractKeys.java:203) - at accord.local.CommandStores$ShardedRanges.shards(CommandStores.java:179) - at accord.local.CommandStores.mapReduce(CommandStores.java:426) - at accord.local.CommandStores.mapReduceConsume(CommandStores.java:409) - at accord.local.AsyncCommandStores.mapReduceConsume(AsyncCommandStores.java:66) - at accord.local.Node.mapReduceConsumeLocal(Node.java:276) - at accord.messages.PreAccept.process(PreAccept.java:90) - at accord.messages.TxnRequest.process(TxnRequest.java:145) - at org.apache.cassandra.service.accord.AccordVerbHandler.doVerb(AccordVerbHandler.java:46) - at org.apache.cassandra.net.InboundSink.lambda$new$0(InboundSink.java:78) - */ - @Ignore - @Test - public 
void multipleShards() - { - // can't reuse test() due to it using "int" for pk; this test needs "blob" - String keyspace = "multipleShards"; - - SHARED_CLUSTER.schemaChange("CREATE KEYSPACE " + keyspace + " WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}"); - SHARED_CLUSTER.schemaChange("CREATE TABLE " + keyspace + ".tbl (k blob, c int, v int, primary key (k, c))"); - SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe())); - SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0))); - - List<String> tokens = SHARED_CLUSTER.stream() - .flatMap(i -> StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(), false)) - .collect(Collectors.toList()); - - List<ByteBuffer> keys = tokens.stream() - .map(t -> (Murmur3Partitioner.LongToken) Murmur3Partitioner.instance.getTokenFactory().fromString(t)) - .map(Murmur3Partitioner.LongToken::keyForToken) - .collect(Collectors.toList()); - - List<String> keyStrings = keys.stream().map(bb -> "0x" + ByteBufferUtil.bytesToHex(bb)).collect(Collectors.toList()); - StringBuilder query = new StringBuilder("BEGIN TRANSACTION\n"); - - for (int i = 0; i < keys.size(); i++) - query.append(" LET row" + i + " = (SELECT * FROM " + keyspace + ".tbl WHERE k=" + keyStrings.get(i) + " AND c=0);\n"); - - query.append(" SELECT row0.v;\n") - .append(" IF "); - - for (int i = 0; i < keys.size(); i++) - query.append((i > 0 ? 
" AND row" : "row") + i + " IS NULL"); - - query.append(" THEN\n"); - - for (int i = 0; i < keys.size(); i++) - query.append(" INSERT INTO " + keyspace + ".tbl (k, c, v) VALUES (" + keyStrings.get(i) + ", 0, " + i +");\n"); - - query.append(" END IF\n"); - query.append("COMMIT TRANSACTION"); - - // row0.v shouldn't have existed when the txn's SELECT was executed - assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, null, query.toString()); - - SHARED_CLUSTER.get(1).runOnInstance(() -> { - StringBuilder sb = new StringBuilder("BEGIN TRANSACTION\n"); - for (int i = 0; i < keyStrings.size(); i++) - sb.append(String.format("LET row%d = (SELECT * FROM ks.tbl WHERE k=%s AND c=0);\n", i, keyStrings.get(i))); - sb.append("COMMIT TRANSACTION"); - - Unseekables<?, ?> routables = AccordTestUtils.createTxn(sb.toString()).keys().toUnseekables(); - Topologies topology = AccordService.instance().node.topology().withUnsyncedEpochs(routables, 1); - // we don't detect out-of-bounds read/write yet, so use this to validate we reach different shards - Assertions.assertThat(topology.totalShards()).isEqualTo(2); - }); - - String check = "BEGIN TRANSACTION\n" + - " SELECT * FROM " + keyspace + ".tbl WHERE k = ? 
AND c = ?;\n" + - "COMMIT TRANSACTION"; - - for (int i = 0; i < keys.size(); i++) - assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { keys.get(i), 0, i}, check, keys.get(i), 0); - } - @Test public void testLostCommitReadTriggersFallbackRead() throws Exception { @@ -214,112 +114,4 @@ public class AccordIntegrationTest extends AccordTestBase assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0, 0, 1 }, check, 0, 0); }); } - - @Test - public void testMultiKeyQueryAndInsert() throws Throwable - { - test("CREATE TABLE " + currentTable + " (k int, c int, v int, primary key (k, c))", - cluster -> - { - String query1 = "BEGIN TRANSACTION\n" + - " LET select1 = (SELECT * FROM " + currentTable + " WHERE k=0 AND c=0);\n" + - " LET select2 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" + - " SELECT v FROM " + currentTable + " WHERE k=0 AND c=0;\n" + - " IF select1 IS NULL THEN\n" + - " INSERT INTO " + currentTable + " (k, c, v) VALUES (0, 0, 0);\n" + - " INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 0);\n" + - " END IF\n" + - "COMMIT TRANSACTION"; - assertEmptyWithPreemptedRetry(cluster, query1); - - String check = "BEGIN TRANSACTION\n" + - " SELECT * FROM " + currentTable + " WHERE k = ? AND c = ?;\n" + - "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0); - assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 0}, check, 1, 0); - - String query2 = "BEGIN TRANSACTION\n" + - " LET select1 = (SELECT * FROM " + currentTable + " WHERE k=1 AND c=0);\n" + - " LET select2 = (SELECT * FROM " + currentTable + " WHERE k=2 AND c=0);\n" + - " SELECT v FROM " + currentTable + " WHERE k=1 AND c=0;\n" + - " IF select1.v = ? 
THEN\n" + - " INSERT INTO " + currentTable + " (k, c, v) VALUES (1, 0, 1);\n" + - " INSERT INTO " + currentTable + " (k, c, v) VALUES (2, 0, 1);\n" + - " END IF\n" + - "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0 }, query2, 0); - - assertRowEqualsWithPreemptedRetry(cluster, new Object[] {0, 0, 0}, check, 0, 0); - assertRowEqualsWithPreemptedRetry(cluster, new Object[] {1, 0, 1}, check, 1, 0); - assertRowEqualsWithPreemptedRetry(cluster, new Object[] {2, 0, 1}, check, 2, 0); - }); - } - - @Test - public void demoTest() throws Throwable - { - SHARED_CLUSTER.schemaChange("CREATE KEYSPACE demo_ks WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor':2};"); - SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_docs ( org_name text, doc_id int, contents_version int static, title text, permissions int, PRIMARY KEY (org_name, doc_id) );"); - SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.org_users ( org_name text, user text, members_version int static, permissions int, PRIMARY KEY (org_name, user) );"); - SHARED_CLUSTER.schemaChange("CREATE TABLE demo_ks.user_docs ( user text, doc_id int, title text, org_name text, permissions int, PRIMARY KEY (user, doc_id) );"); - - SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe())); - SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().setCacheSize(0))); - - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'blake', 5, 777);\n", ConsistencyLevel.ALL); - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_users (org_name, user, members_version, permissions) VALUES ('demo', 'scott', 5, 777);\n", ConsistencyLevel.ALL); - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.org_docs (org_name, doc_id, contents_version, title, permissions) VALUES ('demo', 100, 5, 'README', 644);\n", 
ConsistencyLevel.ALL); - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 1, 'recipes', NULL, 777);\n", ConsistencyLevel.ALL); - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('blake', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL); - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 2, 'to do list', NULL, 777);\n", ConsistencyLevel.ALL); - SHARED_CLUSTER.coordinator(1).execute("INSERT INTO demo_ks.user_docs (user, doc_id, title, org_name, permissions) VALUES ('scott', 100, 'README', 'demo', 644);\n", ConsistencyLevel.ALL); - - String addDoc = "BEGIN TRANSACTION\n" + - " LET demo_user = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1);\n" + - " LET existing = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' AND doc_id=101);\n" + - " SELECT members_version FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1;\n" + - " IF demo_user.members_version = 5 AND existing IS NULL THEN\n" + - " UPDATE demo_ks.org_docs SET title='slides.key', permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n" + - " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='blake' AND doc_id=101;\n" + - " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='scott' AND doc_id=101;\n" + - " END IF\n" + - "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 5 }, addDoc); - - String addUser = "BEGIN TRANSACTION\n" + - " LET demo_doc = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1);\n" + - " LET existing = (SELECT * FROM demo_ks.org_users WHERE org_name='demo' AND user='benedict');\n" + - " SELECT contents_version FROM demo_ks.org_docs WHERE org_name='demo' LIMIT 1;\n" + - " IF demo_doc.contents_version = 6 AND 
existing IS NULL THEN\n" + - " UPDATE demo_ks.org_users SET permissions=777, members_version += 1 WHERE org_name='demo' AND user='benedict';\n" + - " UPDATE demo_ks.user_docs SET title='README', permissions=644 WHERE user='benedict' AND doc_id=100;\n" + - " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='benedict' AND doc_id=101;\n" + - " END IF\n" + - "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(SHARED_CLUSTER, new Object[] { 6 }, addUser); - } - -// @Test -// public void acceptInvalidationTest() -// { -// -// } -// -// @Test -// public void applyAndCheckTest() -// { -// -// } -// -// @Test -// public void beginInvalidationTest() -// { -// -// } -// -// @Test -// public void checkStatusTest() -// { -// -// } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java index 8f35a59f2a..2c9b2a46ae 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java @@ -19,6 +19,8 @@ package org.apache.cassandra.distributed.test.accord; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; @@ -85,7 +87,13 @@ public abstract class AccordTestBase extends TestBaseImpl protected void test(String tableDDL, FailingConsumer<Cluster> fn) throws Exception { - SHARED_CLUSTER.schemaChange(tableDDL); + test(Collections.singletonList(tableDDL), fn); + } + + protected void test(List<String> ddls, FailingConsumer<Cluster> fn) throws Exception + { + for (String ddl : ddls) + SHARED_CLUSTER.schemaChange(ddl); SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> AccordService.instance().createEpochFromConfigUnsafe())); // Evict commands from the cache immediately to expose 
problems loading from disk. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org