This is an automated email from the ASF dual-hosted git repository. jlewandowski pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 1e04ea4418 Fix null value handling for static columns 1e04ea4418 is described below commit 1e04ea44186b9bd22290db767a6c6ac7e8b05106 Author: Jacek Lewandowski <lewandowski.ja...@gmail.com> AuthorDate: Fri Feb 10 15:29:05 2023 +0100 Fix null value handling for static columns patch by <jacek-lewandowski>; reviewed by <maedhroz> and <dcapwell> for CASSANDRA-18241 --- CHANGES.txt | 1 + .../cql3/statements/TransactionStatement.java | 6 +- .../org/apache/cassandra/service/StorageProxy.java | 7 +- .../cassandra/service/accord/txn/TxnCondition.java | 2 +- .../cassandra/service/accord/txn/TxnNamedRead.java | 9 +- .../distributed/test/accord/AccordCQLTest.java | 122 +++++++++++++++++++-- .../distributed/test/accord/AccordTestBase.java | 55 ++++++++-- 7 files changed, 176 insertions(+), 26 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9646a1ac5d..a7199d0f7e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ accord + * Fix null value handling for static columns (CASSANDRA-18241) * Feature Flag for Accord Transactions (CASSANDRA-18195) * CEP-15: Multi-Partition Transaction CQL Support (Alpha) (CASSANDRA-17719) * CEP-15 (C*): Messaging and storage engine integration (CASSANDRA-17103) diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java index 7d99cfa9ea..0348ab9618 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java @@ -342,7 +342,8 @@ public class TransactionStatement implements CQLStatement if (selectQuery.queries.size() == 1) { FilteredPartition partition = data.get(TxnDataName.returning()); - returningSelect.select.processPartition(partition.rowIterator(), options, result, FBUtilities.nowInSeconds()); + if (partition != null) + returningSelect.select.processPartition(partition.rowIterator(), options, result, FBUtilities.nowInSeconds()); } else { @@ -350,7 +351,8 @@ public class TransactionStatement implements CQLStatement for (int i = 0; i < selectQuery.queries.size(); i++) { FilteredPartition partition = data.get(TxnDataName.returning(i)); - returningSelect.select.processPartition(partition.rowIterator(), options, result, nowInSec); + if (partition != null) + returningSelect.select.processPartition(partition.rowIterator(), options, result, nowInSec); } } return new ResultMessage.Rows(result.build()); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 4f85776625..713204ec2b 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -59,6 +59,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.EmptyIterators; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.MessageParams; @@ -1904,7 +1905,11 @@ public class StorageProxy implements StorageProxyMBean TxnRead read = TxnRead.createSerialRead(group.queries.get(0)); Txn txn = new Txn.InMemory(read.keys(), read, TxnQuery.ALL); TxnData data = AccordService.instance().coordinate(txn, consistencyLevel); - return PartitionIterators.singletonIterator(data.get(TxnRead.SERIAL_READ).rowIterator()); + FilteredPartition partition = data.get(TxnRead.SERIAL_READ); + if (partition != null) + return PartitionIterators.singletonIterator(partition.rowIterator()); + else + return EmptyIterators.partition(); } private static PartitionIterator legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime) diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java index e0bfaa1f7b..7c60beae60 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java @@ -337,7 +337,7 @@ public abstract class TxnCondition { checkNotNull(data); FilteredPartition partition = data.get(SERIAL_READ); - Row row = partition.getRow(clustering); + Row row = partition != null ? partition.getRow(clustering) : null; for (Bound bound : bounds) { if (!bound.appliesTo(row)) diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java index ea11312d72..534f4aa262 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java @@ -32,7 +32,6 @@ import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.PartitionIterators; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.io.IVersionedSerializer; @@ -130,9 +129,13 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> UnfilteredPartitionIterator partition = read.executeLocally(controller); PartitionIterator iterator = UnfilteredPartitionIterators.filter(partition, read.nowInSec())) { - FilteredPartition filtered = FilteredPartition.create(PartitionIterators.getOnlyElement(iterator, read)); TxnData result = new TxnData(); - result.put(name, filtered); + if (iterator.hasNext()) + { + FilteredPartition filtered = FilteredPartition.create(iterator.next()); + if (filtered.hasRows() || read.selectsFullPartition()) + result.put(name, filtered); + } return result; } }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java index 9d830b1d66..b0d61723e8 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -31,6 +32,8 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.distributed.Cluster; import org.assertj.core.api.Assertions; import org.junit.BeforeClass; import org.junit.Ignore; @@ -242,22 +245,22 @@ public class AccordCQLTest extends AccordTestBase cluster -> { String insertNull = "BEGIN TRANSACTION\n" + - " LET row0 = (SELECT v FROM " + currentTable + " WHERE k = 0 LIMIT 1);\n" + - " SELECT row0.v;\n" + + " LET row0 = (SELECT * FROM " + currentTable + " WHERE k = 0 LIMIT 1);\n" + + " SELECT row0.k, row0.v;\n" + " IF row0.v IS NULL THEN\n" + " INSERT INTO " + currentTable + " (k, c, v) VALUES (?, ?, null);\n" + " END IF\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] { null }, insertNull, 0, 0); + assertRowEqualsWithPreemptedRetry(cluster, new Object[] { null, null }, insertNull, 0, 0); String insert = "BEGIN TRANSACTION\n" + - " LET row0 = (SELECT v FROM " + currentTable + " WHERE k = 0 LIMIT 1);\n" + - " SELECT row0.v;\n" + + " LET row0 = (SELECT * FROM " + currentTable + " WHERE k = 0 LIMIT 1);\n" + + " SELECT row0.k, row0.v;\n" + " IF row0.v IS NULL THEN\n" + " INSERT INTO " + currentTable + " (k, c, v) VALUES (?, ?, ?);\n" + " END IF\n" + "COMMIT TRANSACTION"; - assertRowEqualsWithPreemptedRetry(cluster, new Object[] { null }, insert, 0, 0, 1); + assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0, null }, insert, 0, 0, 1); String check = "BEGIN TRANSACTION\n" + " SELECT k, c, v FROM " + currentTable + " WHERE k=0 AND c=0;\n" + @@ -266,6 +269,111 @@ public class AccordCQLTest extends AccordTestBase }); } + @Test + public void testQueryStaticColumn() throws Exception + { + test("CREATE TABLE " + currentTable + " (k int, c int, s int static, v int, primary key (k, c))", + cluster -> + { + // select partition key, clustering key and static column, restrict on partition and clustering + testQueryStaticColumn(cluster, + "LET row0 = (SELECT k, c, s, v FROM " + currentTable + " WHERE k = ? AND c = 0);\n" + + "SELECT row0.k, row0.c, row0.s, row0.v;\n", + + "SELECT k, c, s, v FROM " + currentTable + " WHERE k = ? AND c = 0"); + + // select partition key, clustering key and static column, restrict on partition and limit to 1 row + testQueryStaticColumn(cluster, + "LET row0 = (SELECT k, c, s, v FROM " + currentTable + " WHERE k = ? LIMIT 1);\n" + + "SELECT row0.k, row0.c, row0.s, row0.v;\n", + + "SELECT k, c, s, v FROM " + currentTable + " WHERE k = ? LIMIT 1"); + + // select static column and regular column, restrict on partition and clustering + testQueryStaticColumn(cluster, + "LET row0 = (SELECT s, v FROM " + currentTable + " WHERE k = ? AND c = 0);\n" + + "SELECT row0.s, row0.v;\n", + + "SELECT s, v FROM " + currentTable + " WHERE k = ? AND c = 0"); + + // select just static column, restrict on partition and limit to 1 row + testQueryStaticColumn(cluster, + "LET row0 = (SELECT s FROM " + currentTable + " WHERE k = ? LIMIT 1);\n" + + "SELECT row0.s;\n", + + "SELECT s FROM " + currentTable + " WHERE k = ? LIMIT 1"); + }); + } + + private void testQueryStaticColumn(Cluster cluster, String accordReadQuery, String simpleReadQuery) + { + logger().info("Empty table"); + int key = 10; + assertResultsFromAccordMatches(cluster, accordReadQuery, simpleReadQuery, key++); + + cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + " (k, s) VALUES (?, null);", ConsistencyLevel.ALL, key); + logger().info("null -> static column"); + assertResultsFromAccordMatches(cluster, accordReadQuery, simpleReadQuery, key++); + + cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + " (k, s) VALUES (?, 1);", ConsistencyLevel.ALL, key); + logger().info("Inserted 1 -> static column"); + assertResultsFromAccordMatches(cluster, accordReadQuery, simpleReadQuery, key++); + + cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + " (k, c) VALUES (?, 0);", ConsistencyLevel.ALL, key); + logger().info("Inserted 0 -> clustering"); + assertResultsFromAccordMatches(cluster, accordReadQuery, simpleReadQuery, key); + } + + @Test + public void testUpdateStaticColumn() throws Exception { + test("CREATE TABLE " + currentTable + " (k int, c int, s int static, v int, primary key (k, c))", + cluster -> + { + checkUpdateStatic(cluster, "SET s=1 WHERE k=?", 101, "[[101, null, 1, null]]", "[]"); + checkUpdateStatic(cluster, "SET s=1, v=11 WHERE k=? AND c=0", 101, "[[101, 0, 1, 11]]", "[[101, 0, 1, 11]]"); + + // commented out until org.apache.cassandra.cql3.statements.ModificationStatement.createSelectForTxn is fixed + // checkUpdateStatic(cluster, "SET s+=1 WHERE k=?", 101, "[]", "[]"); + + checkUpdateStatic(cluster, "SET s+=1, v+=11 WHERE k=? AND c=0", 101, "[]", "[]"); + }); + } + + private void checkUpdateStatic(Cluster cluster, String update, int key, String expPart, String expClust) + { + Object[][] r1, r2, r3, r4, r; + r = cluster.get(1).coordinator().execute("UPDATE " + currentTable + " " + update + " IF s = NULL;", ConsistencyLevel.QUORUM, key); + Assertions.assertThat(Arrays.deepToString(r)).isEqualTo("[[true]]"); + r1 = cluster.get(1).coordinator().execute("SELECT * FROM " + currentTable + " WHERE k = ? LIMIT 1;", ConsistencyLevel.SERIAL, key); + r2 = cluster.get(1).coordinator().execute("SELECT * FROM " + currentTable + " WHERE k = ? AND c = 0;", ConsistencyLevel.SERIAL, key); + cluster.get(1).coordinator().execute("TRUNCATE " + currentTable, ConsistencyLevel.ALL); + + executeAsTxn(cluster, "UPDATE " + currentTable + " " + update + ";", key); + r3 = executeAsTxn(cluster, "SELECT * FROM " + currentTable + " WHERE k = ? LIMIT 1;", key).toObjectArrays(); + r4 = executeAsTxn(cluster, "SELECT * FROM " + currentTable + " WHERE k = ? AND c = 0;", key).toObjectArrays(); + cluster.get(1).coordinator().execute("TRUNCATE " + currentTable, ConsistencyLevel.ALL); + + Assertions.assertThat(Arrays.deepToString(r1)).isEqualTo(expPart); + Assertions.assertThat(Arrays.deepToString(r2)).isEqualTo(expClust); + Assertions.assertThat(Arrays.deepToString(r3)).isEqualTo(expPart); + Assertions.assertThat(Arrays.deepToString(r4)).isEqualTo(expClust); + } + + private void assertResultsFromAccordMatches(Cluster cluster, String accordRead, String simpleRead, int key) + { + Object[][] simpleReadResult = cluster.get(1).executeInternal(simpleRead, key); + Object[][] accordReadResult = executeWithRetry(cluster, accordRead, key).toObjectArrays(); + + Assertions.assertThat(withRemovedNullOnlyRows(accordReadResult)).isEqualTo(withRemovedNullOnlyRows(simpleReadResult)); + } + + private static Object[][] withRemovedNullOnlyRows(Object[][] results) + { + return Arrays.stream(results) + .filter(row -> !Arrays.stream(row).allMatch(Objects::isNull)) + .toArray(Object[][]::new); + } + @Test public void testScalarEQ() throws Throwable { @@ -2305,7 +2413,7 @@ public class AccordCQLTest extends AccordTestBase " LET existing = (SELECT * FROM demo_ks.org_docs WHERE org_name='demo' AND doc_id=101);\n" + " SELECT members_version FROM demo_ks.org_users WHERE org_name='demo' LIMIT 1;\n" + " IF demo_user.members_version = 5 AND existing IS NULL THEN\n" + - " UPDATE demo_ks.org_docs SET title='slides.key', permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n" + + " UPDATE demo_ks.org_docs SET title='slides.key', permissions=777, contents_version = 6 WHERE org_name='demo' AND doc_id=101;\n" + " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='blake' AND doc_id=101;\n" + " UPDATE demo_ks.user_docs SET title='slides.key', permissions=777 WHERE user='scott' AND doc_id=101;\n" + " END IF\n" + diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java index 7590fa4136..1a7d59f8fe 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java @@ -19,11 +19,13 @@ package org.apache.cassandra.distributed.test.accord; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.junit.AfterClass; import org.junit.Before; @@ -60,12 +62,14 @@ import static org.junit.Assert.assertArrayEquals; public abstract class AccordTestBase extends TestBaseImpl { + private static final Logger logger = LoggerFactory.getLogger(AccordTestBase.class); + private static final int MAX_RETRIES = 10; + protected static final AtomicInteger COUNTER = new AtomicInteger(0); protected static Cluster SHARED_CLUSTER; - + protected String currentTable; - private final Logger logger = LoggerFactory.getLogger(getClass()); @BeforeClass public static void setupClass() throws IOException @@ -140,8 +144,16 @@ public abstract class AccordTestBase extends TestBaseImpl .start()); } - private static SimpleQueryResult execute(Cluster cluster, String check, Object... boundValues) + protected static SimpleQueryResult executeAsTxn(Cluster cluster, String check, Object... boundValues) + { + String normalized = wrapInTxn(check); + logger.info("Executing transaction statement:\n{}", normalized); + return cluster.coordinator(1).executeWithResult(normalized, ConsistencyLevel.ANY, boundValues); + } + + protected static SimpleQueryResult execute(Cluster cluster, String check, Object... boundValues) { + logger.info("Executing statement:\n{}", check); return cluster.coordinator(1).executeWithResult(check, ConsistencyLevel.ANY, boundValues); } @@ -181,29 +193,48 @@ public abstract class AccordTestBase extends TestBaseImpl { return execute(cluster, check, boundValues); } - catch (Throwable t) + catch (RuntimeException ex) { - if (AssertionUtils.rootCauseIs(Preempted.class).matches(t)) + if (count <= MAX_RETRIES && AssertionUtils.rootCauseIs(Preempted.class).matches(ex)) { - logger.warn("[Retry attempt={}] Preempted failure for {}", count, check); + logger.warn("[Retry attempt={}] Preempted failure for\n{}", count, check); return executeWithRetry0(count + 1, cluster, check, boundValues); } - throw t; + throw ex; } } protected SimpleQueryResult executeWithRetry(Cluster cluster, String check, Object... boundValues) { + check = wrapInTxn(check); + // is this method safe? - cluster.get(1).runOnInstance(() -> { - TransactionStatement stmt = AccordTestUtils.parse(check); - if (!isIdempotent(stmt)) - throw new AssertionError("Unable to retry txn that is not idempotent: cql=" + check); - }); + + if (!isIdempotent(cluster, check)) + throw new AssertionError("Unable to retry txn that is not idempotent: cql=\n" + check); + return executeWithRetry0(0, cluster, check, boundValues); } + private boolean isIdempotent(Cluster cluster, String cql) + { + return cluster.get(1).callOnInstance(() -> { + TransactionStatement stmt = AccordTestUtils.parse(cql); + return isIdempotent(stmt); + }); + } + + private static String wrapInTxn(String statement) + { + if (!statement.trim().toUpperCase().startsWith("BEGIN TRANSACTION")) + { + statement = statement.trim(); + statement = Arrays.stream(statement.split("\\n")).collect(Collectors.joining("\n ", "BEGIN TRANSACTION\n ", "\nCOMMIT TRANSACTION")); + } + return statement; + } + public static boolean isIdempotent(TransactionStatement statement) { for (ModificationStatement update : statement.getUpdates()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org