This is an automated email from the ASF dual-hosted git repository.

jlewandowski pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 1e04ea4418 Fix null value handling for static columns
1e04ea4418 is described below

commit 1e04ea44186b9bd22290db767a6c6ac7e8b05106
Author: Jacek Lewandowski <lewandowski.ja...@gmail.com>
AuthorDate: Fri Feb 10 15:29:05 2023 +0100

    Fix null value handling for static columns
    
    patch by <jacek-lewandowski>; reviewed by <maedhroz> and <dcapwell> for 
CASSANDRA-18241
---
 CHANGES.txt                                        |   1 +
 .../cql3/statements/TransactionStatement.java      |   6 +-
 .../org/apache/cassandra/service/StorageProxy.java |   7 +-
 .../cassandra/service/accord/txn/TxnCondition.java |   2 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |   9 +-
 .../distributed/test/accord/AccordCQLTest.java     | 122 +++++++++++++++++++--
 .../distributed/test/accord/AccordTestBase.java    |  55 ++++++++--
 7 files changed, 176 insertions(+), 26 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9646a1ac5d..a7199d0f7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 accord
+ * Fix null value handling for static columns (CASSANDRA-18241)
  * Feature Flag for Accord Transactions (CASSANDRA-18195)
  * CEP-15: Multi-Partition Transaction CQL Support (Alpha) (CASSANDRA-17719)
  * CEP-15 (C*): Messaging and storage engine integration (CASSANDRA-17103)
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 7d99cfa9ea..0348ab9618 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -342,7 +342,8 @@ public class TransactionStatement implements CQLStatement
                 if (selectQuery.queries.size() == 1)
                 {
                     FilteredPartition partition = 
data.get(TxnDataName.returning());
-                    
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, FBUtilities.nowInSeconds());
+                    if (partition != null)
+                        
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, FBUtilities.nowInSeconds());
                 }
                 else
                 {
@@ -350,7 +351,8 @@ public class TransactionStatement implements CQLStatement
                     for (int i = 0; i < selectQuery.queries.size(); i++)
                     {
                         FilteredPartition partition = 
data.get(TxnDataName.returning(i));
-                        
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, nowInSec);
+                        if (partition != null)
+                            
returningSelect.select.processPartition(partition.rowIterator(), options, 
result, nowInSec);
                     }
                 }
                 return new ResultMessage.Rows(result.build());
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 4f85776625..713204ec2b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.CounterMutation;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.MessageParams;
@@ -1904,7 +1905,11 @@ public class StorageProxy implements StorageProxyMBean
         TxnRead read = TxnRead.createSerialRead(group.queries.get(0));
         Txn txn = new Txn.InMemory(read.keys(), read, TxnQuery.ALL);
         TxnData data = AccordService.instance().coordinate(txn, 
consistencyLevel);
-        return 
PartitionIterators.singletonIterator(data.get(TxnRead.SERIAL_READ).rowIterator());
+        FilteredPartition partition = data.get(TxnRead.SERIAL_READ);
+        if (partition != null)
+            return 
PartitionIterators.singletonIterator(partition.rowIterator());
+        else
+            return EmptyIterators.partition();
     }
 
     private static PartitionIterator 
legacyReadWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel 
consistencyLevel, long queryStartNanoTime)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index e0bfaa1f7b..7c60beae60 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -337,7 +337,7 @@ public abstract class TxnCondition
         {
             checkNotNull(data);
             FilteredPartition partition = data.get(SERIAL_READ);
-            Row row = partition.getRow(clustering);
+            Row row = partition != null ? partition.getRow(clustering) : null;
             for (Bound bound : bounds)
             {
                 if (!bound.appliesTo(row))
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index ea11312d72..534f4aa262 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -130,9 +129,13 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
                  UnfilteredPartitionIterator partition = 
read.executeLocally(controller);
                  PartitionIterator iterator = 
UnfilteredPartitionIterators.filter(partition, read.nowInSec()))
             {
-                FilteredPartition filtered = 
FilteredPartition.create(PartitionIterators.getOnlyElement(iterator, read));
                 TxnData result = new TxnData();
-                result.put(name, filtered);
+                if (iterator.hasNext())
+                {
+                    FilteredPartition filtered = 
FilteredPartition.create(iterator.next());
+                    if (filtered.hasRows() || read.selectsFullPartition())
+                        result.put(name, filtered);
+                }
                 return result;
             }
         });
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index 9d830b1d66..b0d61723e8 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -31,6 +32,8 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.distributed.Cluster;
 import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -242,22 +245,22 @@ public class AccordCQLTest extends AccordTestBase
              cluster ->
              {
                  String insertNull = "BEGIN TRANSACTION\n" +
-                                     "  LET row0 = (SELECT v FROM " + 
currentTable + " WHERE k = 0 LIMIT 1);\n" +
-                                     "  SELECT row0.v;\n" +
+                                     "  LET row0 = (SELECT * FROM " + 
currentTable + " WHERE k = 0 LIMIT 1);\n" +
+                                     "  SELECT row0.k, row0.v;\n" +
                                      "  IF row0.v IS NULL THEN\n" +
                                      "    INSERT INTO " + currentTable + " (k, 
c, v) VALUES (?, ?, null);\n" +
                                      "  END IF\n" +
                                      "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 
null }, insertNull, 0, 0);
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 
null, null }, insertNull, 0, 0);
 
                  String insert = "BEGIN TRANSACTION\n" +
-                                 "  LET row0 = (SELECT v FROM " + currentTable 
+ " WHERE k = 0 LIMIT 1);\n" +
-                                 "  SELECT row0.v;\n" +
+                                 "  LET row0 = (SELECT * FROM " + currentTable 
+ " WHERE k = 0 LIMIT 1);\n" +
+                                 "  SELECT row0.k, row0.v;\n" +
                                  "  IF row0.v IS NULL THEN\n" +
                                  "    INSERT INTO " + currentTable + " (k, c, 
v) VALUES (?, ?, ?);\n" +
                                  "  END IF\n" +
                                  "COMMIT TRANSACTION";
-                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 
null }, insert, 0, 0, 1);
+                 assertRowEqualsWithPreemptedRetry(cluster, new Object[] { 0, 
null }, insert, 0, 0, 1);
 
                  String check = "BEGIN TRANSACTION\n" +
                                 "  SELECT k, c, v  FROM " + currentTable + " 
WHERE k=0 AND c=0;\n" +
@@ -266,6 +269,111 @@ public class AccordCQLTest extends AccordTestBase
              });
     }
 
+    @Test
+    public void testQueryStaticColumn() throws Exception
+    {
+        test("CREATE TABLE " + currentTable + " (k int, c int, s int static, v 
int, primary key (k, c))",
+             cluster ->
+             {
+                 // select partition key, clustering key and static column, 
restrict on partition and clustering
+                 testQueryStaticColumn(cluster,
+                                       "LET row0 = (SELECT k, c, s, v FROM " + 
currentTable + " WHERE k = ? AND c = 0);\n" +
+                                       "SELECT row0.k, row0.c, row0.s, 
row0.v;\n",
+
+                                       "SELECT k, c, s, v FROM " + 
currentTable + " WHERE k = ? AND c = 0");
+
+                 // select partition key, clustering key and static column, 
restrict on partition and limit to 1 row
+                 testQueryStaticColumn(cluster,
+                                       "LET row0 = (SELECT k, c, s, v FROM " + 
currentTable + " WHERE k = ? LIMIT 1);\n" +
+                                       "SELECT row0.k, row0.c, row0.s, 
row0.v;\n",
+
+                                       "SELECT k, c, s, v FROM " + 
currentTable + " WHERE k = ? LIMIT 1");
+
+                 // select static column and regular column, restrict on 
partition and clustering
+                 testQueryStaticColumn(cluster,
+                                       "LET row0 = (SELECT s, v FROM " + 
currentTable + " WHERE k = ? AND c = 0);\n" +
+                                       "SELECT row0.s, row0.v;\n",
+
+                                       "SELECT s, v FROM " + currentTable + " 
WHERE k = ? AND c = 0");
+
+                 // select just static column, restrict on partition and limit 
to 1 row
+                 testQueryStaticColumn(cluster,
+                                       "LET row0 = (SELECT s FROM " + 
currentTable + " WHERE k = ? LIMIT 1);\n" +
+                                       "SELECT row0.s;\n",
+
+                                       "SELECT s FROM " + currentTable + " 
WHERE k = ? LIMIT 1");
+             });
+    }
+
+    private void testQueryStaticColumn(Cluster cluster, String 
accordReadQuery, String simpleReadQuery)
+    {
+        logger().info("Empty table");
+        int key = 10;
+        assertResultsFromAccordMatches(cluster, accordReadQuery, 
simpleReadQuery, key++);
+
+        cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + " 
(k, s) VALUES (?, null);", ConsistencyLevel.ALL, key);
+        logger().info("null -> static column");
+        assertResultsFromAccordMatches(cluster, accordReadQuery, 
simpleReadQuery, key++);
+
+        cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + " 
(k, s) VALUES (?, 1);", ConsistencyLevel.ALL, key);
+        logger().info("Inserted 1 -> static column");
+        assertResultsFromAccordMatches(cluster, accordReadQuery, 
simpleReadQuery, key++);
+
+        cluster.get(1).coordinator().execute("INSERT INTO " + currentTable + " 
(k, c) VALUES (?, 0);", ConsistencyLevel.ALL, key);
+        logger().info("Inserted 0 -> clustering");
+        assertResultsFromAccordMatches(cluster, accordReadQuery, 
simpleReadQuery, key);
+    }
+
+    @Test
+    public void testUpdateStaticColumn() throws Exception {
+        test("CREATE TABLE " + currentTable + " (k int, c int, s int static, v 
int, primary key (k, c))",
+             cluster ->
+             {
+                 checkUpdateStatic(cluster, "SET s=1 WHERE k=?", 101, "[[101, 
null, 1, null]]", "[]");
+                 checkUpdateStatic(cluster, "SET s=1, v=11 WHERE k=? AND c=0", 
101, "[[101, 0, 1, 11]]", "[[101, 0, 1, 11]]");
+
+                 // commented out until 
org.apache.cassandra.cql3.statements.ModificationStatement.createSelectForTxn 
is fixed
+                 // checkUpdateStatic(cluster, "SET s+=1 WHERE k=?", 101, 
"[]", "[]");
+
+                 checkUpdateStatic(cluster, "SET s+=1, v+=11 WHERE k=? AND 
c=0", 101, "[]", "[]");
+             });
+    }
+
+    private void checkUpdateStatic(Cluster cluster, String update, int key, 
String expPart, String expClust)
+    {
+        Object[][] r1, r2, r3, r4, r;
+        r = cluster.get(1).coordinator().execute("UPDATE " + currentTable + " 
" + update + " IF s = NULL;", ConsistencyLevel.QUORUM, key);
+        Assertions.assertThat(Arrays.deepToString(r)).isEqualTo("[[true]]");
+        r1 = cluster.get(1).coordinator().execute("SELECT * FROM " + 
currentTable + " WHERE k = ? LIMIT 1;", ConsistencyLevel.SERIAL, key);
+        r2 = cluster.get(1).coordinator().execute("SELECT * FROM " + 
currentTable + " WHERE k = ? AND c = 0;", ConsistencyLevel.SERIAL, key);
+        cluster.get(1).coordinator().execute("TRUNCATE " + currentTable, 
ConsistencyLevel.ALL);
+
+        executeAsTxn(cluster, "UPDATE " + currentTable + " " + update + ";", 
key);
+        r3 = executeAsTxn(cluster, "SELECT * FROM " + currentTable + " WHERE k 
= ? LIMIT 1;", key).toObjectArrays();
+        r4 = executeAsTxn(cluster, "SELECT * FROM " + currentTable + " WHERE k 
= ? AND c = 0;", key).toObjectArrays();
+        cluster.get(1).coordinator().execute("TRUNCATE " + currentTable, 
ConsistencyLevel.ALL);
+
+        Assertions.assertThat(Arrays.deepToString(r1)).isEqualTo(expPart);
+        Assertions.assertThat(Arrays.deepToString(r2)).isEqualTo(expClust);
+        Assertions.assertThat(Arrays.deepToString(r3)).isEqualTo(expPart);
+        Assertions.assertThat(Arrays.deepToString(r4)).isEqualTo(expClust);
+    }
+
+    private void assertResultsFromAccordMatches(Cluster cluster, String 
accordRead, String simpleRead, int key)
+    {
+        Object[][] simpleReadResult = 
cluster.get(1).executeInternal(simpleRead, key);
+        Object[][] accordReadResult = executeWithRetry(cluster, accordRead, 
key).toObjectArrays();
+
+        
Assertions.assertThat(withRemovedNullOnlyRows(accordReadResult)).isEqualTo(withRemovedNullOnlyRows(simpleReadResult));
+    }
+
+    private static Object[][] withRemovedNullOnlyRows(Object[][] results)
+    {
+        return Arrays.stream(results)
+                     .filter(row -> 
!Arrays.stream(row).allMatch(Objects::isNull))
+                     .toArray(Object[][]::new);
+    }
+
     @Test
     public void testScalarEQ() throws Throwable
     {
@@ -2305,7 +2413,7 @@ public class AccordCQLTest extends AccordTestBase
                         "  LET existing = (SELECT * FROM demo_ks.org_docs 
WHERE org_name='demo' AND doc_id=101);\n" +
                         "  SELECT members_version FROM demo_ks.org_users WHERE 
org_name='demo' LIMIT 1;\n" +
                         "  IF demo_user.members_version = 5 AND existing IS 
NULL THEN\n" +
-                        "    UPDATE demo_ks.org_docs SET title='slides.key', 
permissions=777, contents_version += 1 WHERE org_name='demo' AND doc_id=101;\n" 
+
+                        "    UPDATE demo_ks.org_docs SET title='slides.key', 
permissions=777, contents_version = 6 WHERE org_name='demo' AND doc_id=101;\n" +
                         "    UPDATE demo_ks.user_docs SET title='slides.key', 
permissions=777 WHERE user='blake' AND doc_id=101;\n" +
                         "    UPDATE demo_ks.user_docs SET title='slides.key', 
permissions=777 WHERE user='scott' AND doc_id=101;\n" +
                         "  END IF\n" +
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 7590fa4136..1a7d59f8fe 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -19,11 +19,13 @@
 package org.apache.cassandra.distributed.test.accord;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -60,12 +62,14 @@ import static org.junit.Assert.assertArrayEquals;
 
 public abstract class AccordTestBase extends TestBaseImpl
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordTestBase.class);
+    private static final int MAX_RETRIES = 10;
+
     protected static final AtomicInteger COUNTER = new AtomicInteger(0);
 
     protected static Cluster SHARED_CLUSTER;
-    
+
     protected String currentTable;
-    private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @BeforeClass
     public static void setupClass() throws IOException
@@ -140,8 +144,16 @@ public abstract class AccordTestBase extends TestBaseImpl
                            .start());
     }
 
-    private static SimpleQueryResult execute(Cluster cluster, String check, 
Object... boundValues)
+    protected static SimpleQueryResult executeAsTxn(Cluster cluster, String 
check, Object... boundValues)
+    {
+        String normalized = wrapInTxn(check);
+        logger.info("Executing transaction statement:\n{}", normalized);
+        return cluster.coordinator(1).executeWithResult(normalized, 
ConsistencyLevel.ANY, boundValues);
+    }
+
+    protected static SimpleQueryResult execute(Cluster cluster, String check, 
Object... boundValues)
     {
+        logger.info("Executing statement:\n{}", check);
         return cluster.coordinator(1).executeWithResult(check, 
ConsistencyLevel.ANY, boundValues);
     }
 
@@ -181,29 +193,48 @@ public abstract class AccordTestBase extends TestBaseImpl
         {
             return execute(cluster, check, boundValues);
         }
-        catch (Throwable t)
+        catch (RuntimeException ex)
         {
-            if (AssertionUtils.rootCauseIs(Preempted.class).matches(t))
+            if (count <= MAX_RETRIES && 
AssertionUtils.rootCauseIs(Preempted.class).matches(ex))
             {
-                logger.warn("[Retry attempt={}] Preempted failure for {}", 
count, check);
+                logger.warn("[Retry attempt={}] Preempted failure for\n{}", 
count, check);
                 return executeWithRetry0(count + 1, cluster, check, 
boundValues);
             }
 
-            throw t;
+            throw ex;
         }
     }
 
     protected SimpleQueryResult executeWithRetry(Cluster cluster, String 
check, Object... boundValues)
     {
+        check = wrapInTxn(check);
+
         // is this method safe?
-        cluster.get(1).runOnInstance(() -> {
-            TransactionStatement stmt = AccordTestUtils.parse(check);
-            if (!isIdempotent(stmt))
-                throw new AssertionError("Unable to retry txn that is not 
idempotent: cql=" + check);
-        });
+
+        if (!isIdempotent(cluster, check))
+            throw new AssertionError("Unable to retry txn that is not 
idempotent: cql=\n" + check);
+
         return executeWithRetry0(0, cluster, check, boundValues);
     }
 
+    private boolean isIdempotent(Cluster cluster, String cql)
+    {
+        return cluster.get(1).callOnInstance(() -> {
+            TransactionStatement stmt = AccordTestUtils.parse(cql);
+            return isIdempotent(stmt);
+        });
+    }
+
+    private static String wrapInTxn(String statement)
+    {
+        if (!statement.trim().toUpperCase().startsWith("BEGIN TRANSACTION"))
+        {
+            statement = statement.trim();
+            statement = 
Arrays.stream(statement.split("\\n")).collect(Collectors.joining("\n  ", "BEGIN 
TRANSACTION\n  ", "\nCOMMIT TRANSACTION"));
+        }
+        return statement;
+    }
+
     public static boolean isIdempotent(TransactionStatement statement)
     {
         for (ModificationStatement update : statement.getUpdates())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to