This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new acec78abf4 BEGIN TRANSACTION crashes if a mutation touches multiple rows
acec78abf4 is described below

commit acec78abf40230243693e2e13ded131356773ada
Author: David Capwell <[email protected]>
AuthorDate: Tue Aug 19 13:23:04 2025 -0700

    BEGIN TRANSACTION crashes if a mutation touches multiple rows
    
    patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20844
---
 CHANGES.txt                                        |   1 +
 .../cql3/restrictions/SimpleRestriction.java       |   2 +-
 .../cql3/statements/ModificationStatement.java     |  18 +++-
 .../org/apache/cassandra/service/StorageProxy.java |   5 +-
 .../service/accord/txn/AccordUpdateParameters.java |   4 +-
 .../service/accord/txn/TxnReferenceOperations.java |  37 +++----
 .../cassandra/service/accord/txn/TxnWrite.java     |  25 +++--
 .../service/consensus/TransactionalMode.java       |   6 +-
 .../UnsupportedTransactionConsistencyLevel.java    |  43 ++++++++
 .../ConsensusMigrationMutationHelper.java          |   6 +-
 .../org/apache/cassandra/utils/ByteBufferUtil.java |   1 +
 .../distributed/test/accord/AccordCQLTestBase.java | 115 ++++++++++++++++++---
 .../test/accord/AccordInteroperabilityTest.java    |  10 +-
 .../distributed/test/accord/AccordTestBase.java    |   1 +
 .../test/cql3/CasMultiNodeTableWalkBase.java       |   2 +-
 .../test/cql3/SingleNodeTokenConflictTest.java     |   4 +-
 .../fuzz/topology/AccordTopologyMixupTest.java     |   2 +-
 .../cassandra/harry/model/ASTSingleTableModel.java |  16 ++-
 .../test/SingleNodeSingleTableASTTest.java         |   3 -
 .../simulator/test/SingleTableASTSimulation.java   |  13 +--
 .../config/DatabaseDescriptorRefTest.java          |   1 +
 .../apache/cassandra/cql3/ast/ExpressionTest.java  |  14 +++
 .../org/apache/cassandra/cql3/ast/Literal.java     |   6 ++
 .../unit/org/apache/cassandra/cql3/ast/Symbol.java |   8 +-
 .../org/apache/cassandra/utils/ASTGenerators.java  |  49 +++++++--
 25 files changed, 302 insertions(+), 90 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index eb07e60cb6..aeef6ebc6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * BEGIN TRANSACTION crashes if a mutation touches multiple rows (CASSANDRA-20844)
  * Fix version range check in MessagingService.getVersionOrdinal (CASSANDRA-20842)
  * Allow custom constraints to be loaded via SPI (CASSANDRA-20824)
  * Optimize DataPlacement lookup by ReplicationParams (CASSANDRA-20804)
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
index 13faf3a27c..85d75fb07f 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
@@ -225,7 +225,7 @@ public final class SimpleRestriction implements 
SingleRestriction
     {
         assert operator == Operator.EQ ||
                operator == Operator.IN ||
-               operator == Operator.ANN;
+               operator == Operator.ANN : String.format("Unexpected operator: 
%s", operator);
         return bindAndGetClusteringElements(options);
     }
 
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 5d8d5d5b3f..fa7aaaebaa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -912,8 +912,16 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
     {
         List<TxnReferenceOperation> regularOps = 
getTxnReferenceOps(operations.regularSubstitutions(), options);
         List<TxnReferenceOperation> staticOps = 
getTxnReferenceOps(operations.staticSubstitutions(), options);
-        Clustering<?> clustering = !regularOps.isEmpty() ? 
Iterables.getOnlyElement(createClustering(options, state)) : null;
-        return new TxnReferenceOperations(metadata, clustering, regularOps, 
staticOps);
+        List<Clustering<?>> clusterings = txnClusterings(options, state);
+        return new TxnReferenceOperations(metadata, clusterings, regularOps, 
staticOps);
+    }
+
+    private List<Clustering<?>> txnClusterings(QueryOptions options, 
ClientState state)
+    {
+        if (restrictions.hasAllPrimaryKeyColumnsRestrictedByEqualities())
+            return new ArrayList<>(restrictions.getClusteringColumns(options, 
state));
+        // Range/Partition delete, static only
+        return Collections.emptyList();
     }
 
     public ModificationStatement forTxn()
@@ -944,14 +952,16 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
     {
         PartitionUpdate baseUpdate = getTxnUpdate(state, options);
         TxnReferenceOperations referenceOps = getTxnReferenceOps(options, 
state);
-        return new TxnWrite.Fragment(partitionKey, index, baseUpdate, 
referenceOps);
+        long timestamp = attrs.isTimestampSet() ? 
attrs.getTimestamp(TxnWrite.NO_TIMESTAMP, options) : TxnWrite.NO_TIMESTAMP;
+        return new TxnWrite.Fragment(partitionKey, index, baseUpdate, 
referenceOps, timestamp);
     }
 
     public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state, 
QueryOptions options, KeyCollector keyCollector)
     {
         PartitionUpdate baseUpdate = getTxnUpdate(state, options);
         TxnReferenceOperations referenceOps = getTxnReferenceOps(options, 
state);
-        return new 
TxnWrite.Fragment(keyCollector.collect(baseUpdate.metadata(), 
baseUpdate.partitionKey()), index, baseUpdate, referenceOps);
+        long timestamp = attrs.isTimestampSet() ? 
attrs.getTimestamp(TxnWrite.NO_TIMESTAMP, options) : TxnWrite.NO_TIMESTAMP;
+        return new 
TxnWrite.Fragment(keyCollector.collect(baseUpdate.metadata(), 
baseUpdate.partitionKey()), index, baseUpdate, referenceOps, timestamp);
     }
 
     final void addUpdates(UpdatesCollector collector,
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 72692af133..40b3d1fe82 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -149,6 +149,7 @@ import 
org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
 import org.apache.cassandra.service.accord.txn.TxnRead;
 import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import 
org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
 import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
@@ -2243,7 +2244,7 @@ public class StorageProxy implements StorageProxyMBean
     public static IAccordResult<TxnResult> readWithAccord(ClusterMetadata cm, 
PartitionRangeReadCommand command, AbstractBounds<PartitionPosition> range, 
ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
     {
         if (consistencyLevel != null && 
!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
-            throw new InvalidRequestException(consistencyLevel + " is not 
supported by Accord");
+            throw 
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
 
         TableMetadata tableMetadata = getTableMetadata(cm, 
command.metadata().id);
         TableParams tableParams = tableMetadata.params;
@@ -2260,7 +2261,7 @@ public class StorageProxy implements StorageProxyMBean
     private static IAccordResult<TxnResult> 
readWithAccordAsync(ClusterMetadata cm, SinglePartitionReadCommand.Group group, 
ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
     {
         if (consistencyLevel != null && 
!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
-            throw new InvalidRequestException(consistencyLevel + " is not 
supported by Accord");
+            throw 
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
 
         // If the non-SERIAL write strategy is sending all writes through 
Accord there is no need to use the supplied consistency
         // level since Accord will manage reading safely
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java 
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
index 2e5baf296b..db369440d5 100644
--- 
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
@@ -71,7 +71,7 @@ public class AccordUpdateParameters
         return data;
     }
 
-    public UpdateParameters updateParameters(TableMetadata metadata, 
DecoratedKey dk, int rowIndex)
+    public UpdateParameters updateParameters(TableMetadata metadata, 
DecoratedKey dk, int rowIndex, long overrideTimestamp)
     {
         // This is currently only used by Guardrails, but this logically have 
issues with Accord as drifts in config
         // values could cause unexpected issues in Accord. (ex. some nodes 
reject writes while others accept)
@@ -82,7 +82,7 @@ public class AccordUpdateParameters
         return new RowUpdateParameters(metadata,
                                        disabledGuardrails,
                                        options,
-                                       timestamp,
+                                       overrideTimestamp == 
TxnWrite.NO_TIMESTAMP ? timestamp : overrideTimestamp,
                                        MICROSECONDS.toSeconds(timestamp),
                                        ttl,
                                        prefetchRow(dk, rowIndex));
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
index 7170d14503..8063e0eed3 100644
--- 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
@@ -19,12 +19,11 @@
 package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ParameterisedVersionedSerializer;
@@ -40,18 +39,17 @@ import static 
org.apache.cassandra.utils.CollectionSerializers.serializedListSiz
 
 public class TxnReferenceOperations
 {
-    private static final TxnReferenceOperations EMPTY = new 
TxnReferenceOperations(null, null, Collections.emptyList(), 
Collections.emptyList());
+    private static final TxnReferenceOperations EMPTY = new 
TxnReferenceOperations(null, Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
 
     private final TableMetadata metadata;
-    final Clustering<?> clustering;
+    final List<Clustering<?>> clusterings;
     final List<TxnReferenceOperation> regulars;
     final List<TxnReferenceOperation> statics;
 
-    public TxnReferenceOperations(TableMetadata metadata, Clustering<?> 
clustering, List<TxnReferenceOperation> regulars, List<TxnReferenceOperation> 
statics)
+    public TxnReferenceOperations(TableMetadata metadata, List<Clustering<?>> 
clusterings, List<TxnReferenceOperation> regulars, List<TxnReferenceOperation> 
statics)
     {
         this.metadata = metadata;
-        Preconditions.checkArgument(clustering != null || regulars.isEmpty());
-        this.clustering = clustering;
+        this.clusterings = clusterings;
         this.regulars = regulars;
         this.statics = statics;
     }
@@ -62,19 +60,19 @@ public class TxnReferenceOperations
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         TxnReferenceOperations that = (TxnReferenceOperations) o;
-        return metadata.equals(that.metadata) && Objects.equals(clustering, 
that.clustering) && regulars.equals(that.regulars) && 
statics.equals(that.statics);
+        return metadata.equals(that.metadata) && 
clusterings.equals(that.clusterings) && regulars.equals(that.regulars) && 
statics.equals(that.statics);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(metadata, clustering, regulars, statics);
+        return Objects.hash(metadata, clusterings, regulars, statics);
     }
 
     @Override
     public String toString()
     {
-        return "TxnReferenceOperations{metadata=" + metadata + ", clustering=" 
+ clustering + ", regulars=" + regulars + ", statics=" + statics + '}';
+        return "TxnReferenceOperations{metadata=" + metadata + ", 
clusterings=" + clusterings + ", regulars=" + regulars + ", statics=" + statics 
+ '}';
     }
 
     public static TxnReferenceOperations empty()
@@ -97,9 +95,9 @@ public class TxnReferenceOperations
                 return;
 
             tables.serialize(operations.metadata, out);
-            out.writeBoolean(operations.clustering != null);
-            if (operations.clustering != null)
-                Clustering.serializer.serialize(operations.clustering, out, 
version.messageVersion(), operations.metadata.comparator.subtypes());
+            out.writeVInt32(operations.clusterings.size());
+            for (Clustering<?> clustering : operations.clusterings)
+                Clustering.serializer.serialize(clustering, out, 
version.messageVersion(), operations.metadata.comparator.subtypes());
             serializeList(operations.regulars, tables, out, 
TxnReferenceOperation.serializer);
             serializeList(operations.statics, tables, out, 
TxnReferenceOperation.serializer);
         }
@@ -111,8 +109,11 @@ public class TxnReferenceOperations
                 return TxnReferenceOperations.empty();
 
             TableMetadata metadata = tables.deserialize(in);
-            Clustering<?> clustering = in.readBoolean() ? 
Clustering.serializer.deserialize(in, version.messageVersion(), 
metadata.comparator.subtypes()) : null;
-            return new TxnReferenceOperations(metadata, clustering, 
deserializeList(tables, in, TxnReferenceOperation.serializer),
+            int clusteringCount = in.readVInt32();
+            List<Clustering<?>> clusterings = new ArrayList<>(clusteringCount);
+            for (int i = 0; i < clusteringCount; i++)
+                clusterings.add(Clustering.serializer.deserialize(in, 
version.messageVersion(), metadata.comparator.subtypes()));
+            return new TxnReferenceOperations(metadata, clusterings, 
deserializeList(tables, in, TxnReferenceOperation.serializer),
                                               deserializeList(tables, in, 
TxnReferenceOperation.serializer));
         }
 
@@ -123,9 +124,9 @@ public class TxnReferenceOperations
             if (operations.isEmpty())
                 return size;
             size += tables.serializedSize(operations.metadata);
-            size += TypeSizes.BOOL_SIZE;
-            if (operations.clustering != null)
-                size += 
Clustering.serializer.serializedSize(operations.clustering, 
version.messageVersion(), operations.metadata.comparator.subtypes());
+            size += TypeSizes.sizeofVInt(operations.clusterings.size());
+            for (Clustering<?> clustering : operations.clusterings)
+                size += Clustering.serializer.serializedSize(clustering, 
version.messageVersion(), operations.metadata.comparator.subtypes());
             size += serializedListSize(operations.regulars, tables, 
TxnReferenceOperation.serializer);
             size +=  serializedListSize(operations.statics, tables, 
TxnReferenceOperation.serializer);
             return size;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 31efe97026..c5890b1cd0 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -29,7 +29,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Executor;
 
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +80,8 @@ import static 
org.apache.cassandra.utils.ArraySerializers.skipArray;
 
 public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements 
Write
 {
+    public static final long NO_TIMESTAMP = 0;
+
     @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(TxnWrite.class);
 
@@ -245,13 +246,15 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
         public final int index;
         public final PartitionUpdate baseUpdate;
         public final TxnReferenceOperations referenceOps;
+        public final long timestamp;
 
-        public Fragment(PartitionKey key, int index, PartitionUpdate 
baseUpdate, TxnReferenceOperations referenceOps)
+        public Fragment(PartitionKey key, int index, PartitionUpdate 
baseUpdate, TxnReferenceOperations referenceOps, long timestamp)
         {
             this.key = key;
             this.index = index;
             this.baseUpdate = baseUpdate;
             this.referenceOps = referenceOps;
+            this.timestamp = timestamp;
         }
 
         public static int compareKeys(Fragment left, Fragment right)
@@ -302,17 +305,20 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
                                                                                
 baseUpdate.rowCount(),
                                                                                
 baseUpdate.canHaveShadowedData());
 
-            UpdateParameters up = 
parameters.updateParameters(baseUpdate.metadata(), key, index);
+            UpdateParameters up = 
parameters.updateParameters(baseUpdate.metadata(), key, index, timestamp);
             TxnData data = parameters.getData();
             Row staticRow = applyUpdates(baseUpdate.staticRow(), 
referenceOps.statics, key, Clustering.STATIC_CLUSTERING, up, data);
 
             if (!staticRow.isEmpty())
                 updateBuilder.add(staticRow);
 
-            Row existing = baseUpdate.hasRows() ? 
Iterables.getOnlyElement(baseUpdate) : null;
-            Row row = applyUpdates(existing, referenceOps.regulars, key, 
referenceOps.clustering, up, data);
-            if (row != null)
-                updateBuilder.add(row);
+            for (Clustering<?> clustering : referenceOps.clusterings)
+            {
+                Row existing = baseUpdate.hasRows() ? 
baseUpdate.getRow(clustering) : null;
+                Row row = applyUpdates(existing, referenceOps.regulars, key, 
clustering, up, data);
+                if (row != null)
+                    updateBuilder.add(row);
+            }
 
             return new Update(this.key, index, updateBuilder.build(), tables);
         }
@@ -371,6 +377,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
                 out.writeUnsignedVInt32(fragment.index);
                 
PartitionUpdate.serializer.serializeWithoutKey(fragment.baseUpdate, tables, 
out, version.messageVersion());
                 
TxnReferenceOperations.serializer.serialize(fragment.referenceOps, tables, out, 
version);
+                out.writeUnsignedVInt(fragment.timestamp);
             }
 
             public Fragment deserialize(PartitionKey key, TableMetadatas 
tables, DataInputPlus in, Version version) throws IOException
@@ -379,7 +386,8 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
                 // TODO (required): why FROM_REMOTE?
                 PartitionUpdate baseUpdate = 
PartitionUpdate.serializer.deserialize(key, tables, in, 
version.messageVersion(), FROM_REMOTE);
                 TxnReferenceOperations referenceOps = 
TxnReferenceOperations.serializer.deserialize(tables, in, version);
-                return new Fragment(key, idx, baseUpdate, referenceOps);
+                long timestamp = in.readUnsignedVInt();
+                return new Fragment(key, idx, baseUpdate, referenceOps, 
timestamp);
             }
 
             public long serializedSize(Fragment fragment, TableMetadatas 
tables, Version version)
@@ -388,6 +396,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
                 size += TypeSizes.sizeofUnsignedVInt(fragment.index);
                 size += 
PartitionUpdate.serializer.serializedSizeWithoutKey(fragment.baseUpdate, 
tables, version.messageVersion());
                 size += 
TxnReferenceOperations.serializer.serializedSize(fragment.referenceOps, tables, 
version);
+                size += TypeSizes.sizeofUnsignedVInt(fragment.timestamp);
                 return size;
             }
         }
diff --git 
a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java 
b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
index ceac2d89dc..5355d33d10 100644
--- a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
+++ b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
@@ -142,7 +142,7 @@ public enum TransactionalMode
     public ConsistencyLevel commitCLForMode(TransactionalMigrationFromMode 
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId 
tableId, Token token)
     {
         if 
(!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
-            throw new UnsupportedOperationException("Consistency level " + 
consistencyLevel + " is unsupported with Accord for write/commit, supported are 
ANY, ONE, QUORUM, and ALL");
+            throw 
UnsupportedTransactionConsistencyLevel.commit(consistencyLevel);
 
         if (ignoresSuppliedCommitCL())
         {
@@ -172,7 +172,7 @@ public enum TransactionalMode
     public ConsistencyLevel readCLForMode(TransactionalMigrationFromMode 
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId 
tableId, Token token)
     {
         if 
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
-            throw new UnsupportedOperationException("Consistency level " + 
consistencyLevel + " is unsupported with Accord for read, supported are ONE, 
QUORUM, and SERIAL");
+            throw 
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
 
         if (ignoresSuppliedReadCL())
         {
@@ -215,7 +215,7 @@ public enum TransactionalMode
     public ConsistencyLevel readCLForMode(TransactionalMigrationFromMode 
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId 
tableId, AbstractBounds<PartitionPosition> range)
     {
         if 
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
-            throw new UnsupportedOperationException("Consistency level " + 
consistencyLevel + " is unsupported with Accord for read, supported are ONE, 
QUORUM, and SERIAL");
+            throw 
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
 
         checkState(range.unwrap().size() == 1);
         if (ignoresSuppliedReadCL())
diff --git 
a/src/java/org/apache/cassandra/service/consensus/UnsupportedTransactionConsistencyLevel.java
 
b/src/java/org/apache/cassandra/service/consensus/UnsupportedTransactionConsistencyLevel.java
new file mode 100644
index 0000000000..5b7ecf7b3a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/consensus/UnsupportedTransactionConsistencyLevel.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.consensus;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.IAccordService;
+
+public class UnsupportedTransactionConsistencyLevel extends 
InvalidRequestException
+{
+    public enum Kind { Read, Commit }
+
+    private UnsupportedTransactionConsistencyLevel(Kind kind, ConsistencyLevel 
consistencyLevel)
+    {
+        super("ConsistencyLevel " + consistencyLevel + " is unsupported with 
Accord for " + (kind == Kind.Commit ? "write/commit" : "read") + ", supported 
are " + (kind == Kind.Commit ? 
IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS : 
IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS));
+    }
+
+    public static UnsupportedTransactionConsistencyLevel 
commit(ConsistencyLevel consistencyLevel)
+    {
+        return new UnsupportedTransactionConsistencyLevel(Kind.Commit, 
consistencyLevel);
+    }
+
+    public static UnsupportedTransactionConsistencyLevel read(ConsistencyLevel 
consistencyLevel)
+    {
+        return new UnsupportedTransactionConsistencyLevel(Kind.Read, 
consistencyLevel);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index c319daee9b..d8cc96a3a1 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
@@ -63,6 +62,7 @@ import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.accord.txn.TxnUpdate;
 import org.apache.cassandra.service.accord.txn.TxnWrite;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import 
org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tracing.Tracing;
@@ -247,7 +247,7 @@ public class ConsensusMigrationMutationHelper
                                                                  
PreserveTimestamp preserveTimestamps)
     {
         if (consistencyLevel != null && 
!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
-            throw new InvalidRequestException(consistencyLevel + " is not 
supported by Accord");
+            throw 
UnsupportedTransactionConsistencyLevel.commit(consistencyLevel);
 
         TableMetadatas tables;
         {
@@ -271,7 +271,7 @@ public class ConsensusMigrationMutationHelper
             {
                 PartitionKey pk = keyCollector.collect(update.metadata(), 
update.partitionKey());
                 minEpoch = Math.max(minEpoch, 
update.metadata().epoch.getEpoch());
-                fragments.add(new TxnWrite.Fragment(pk, fragmentIndex++, 
update, TxnReferenceOperations.empty()));
+                fragments.add(new TxnWrite.Fragment(pk, fragmentIndex++, 
update, TxnReferenceOperations.empty(), TxnWrite.NO_TIMESTAMP));
             }
         }
         // Potentially ignore commit consistency level if the 
TransactionalMode specifies full
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java 
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 57d824885a..9e61fd2f26 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -570,6 +570,7 @@ public class ByteBufferUtil
 
     public static ByteBuffer objectToBytes(Object obj)
     {
+        if (obj == null) return null;
         if (obj instanceof Integer)
             return ByteBufferUtil.bytes((int) obj);
         else if (obj instanceof Byte)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index 27a07343f3..f89fb1a986 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -50,6 +50,13 @@ import accord.topology.Topologies;
 import org.apache.cassandra.config.Config.PaxosVariant;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.ast.Symbol;
+import org.apache.cassandra.cql3.ast.AssignmentOperator;
+import org.apache.cassandra.cql3.ast.Literal;
+import org.apache.cassandra.cql3.ast.Mutation;
+import org.apache.cassandra.cql3.ast.Reference;
+import org.apache.cassandra.cql3.ast.Select;
+import org.apache.cassandra.cql3.ast.Statement;
+import org.apache.cassandra.cql3.ast.Txn;
 import org.apache.cassandra.cql3.functions.types.utils.Bytes;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
 import org.apache.cassandra.distributed.util.QueryResultUtil;
@@ -85,6 +92,7 @@ import static 
org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
 import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -3239,19 +3247,16 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
                  ICoordinator coordinator = cluster.coordinator(1);
                  coordinator.execute("INSERT INTO " + qualifiedAccordTableName 
+ "(k, c, v0, v1) VALUES (0, 0, {0}, 1)", QUORUM);
 
-                 String cql = "BEGIN TRANSACTION\n" +
-                              "  LET row = (SELECT *\n" +
-                              "             FROM " + qualifiedAccordTableName 
+ '\n' +
-                              "             WHERE k = ? AND c = ?);\n" +
-                              "  UPDATE " + qualifiedAccordTableName + '\n' +
-                              "  SET\n" +
-                              "    v0={1},\n" +
-                              "    v1 += row.v1\n" +
-                              "  WHERE \n" +
-                              "    k = ? AND \n" +
-                              "    c = ?;\n" +
-                              "COMMIT TRANSACTION";
-                 coordinator.execute(cql, QUORUM, 0, 0, 0, 0);
+                 Statement stmt = Txn.builder()
+                                     .addLet("row", 
Select.builder().table(KEYSPACE, accordTableName).value("k", 0).value("c", 
0).build())
+                                     .addUpdate(Mutation.update(KEYSPACE, 
accordTableName)
+                                                        .set(v0, new 
Literal(Collections.singleton(1), v0.type()))
+                                                        .set(v1, new 
AssignmentOperator(AssignmentOperator.Kind.ADD, Reference.of(row, v1)))
+                                                        .value("k", 0)
+                                                        .value("c", 0)
+                                                        .build())
+                                     .build();
+                 coordinator.execute(stmt.toCQL(), QUORUM, 
stmt.bindsEncoded());
 
                  // is the data correct?
                  var result = coordinator.executeWithResult("SELECT * FROM " + 
qualifiedAccordTableName, QUORUM);
@@ -3260,4 +3265,88 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
                          .build());
              });
     }
+
+    @Test
+    public void updateMultipleRows()
+    {
+        SHARED_CLUSTER.schemaChange("CREATE TABLE " + qualifiedAccordTableName 
+ " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH " + 
transactionalMode.asCqlParam());
+
+        ICoordinator node = SHARED_CLUSTER.coordinator(1);
+        node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v) 
VALUES (1, 10, 100)", ConsistencyLevel.QUORUM);
+        node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v) 
VALUES (1, 20, 200)", ConsistencyLevel.QUORUM);
+        node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v) 
VALUES (1, 30, 300)", ConsistencyLevel.QUORUM);
+
+        node.execute("BEGIN TRANSACTION\n" +
+                            "  UPDATE " + qualifiedAccordTableName + '\n' +
+                            "  SET v = 999\n" +
+                            "  WHERE pk = 1\n" +
+                            "        AND ck IN (10, 20);\n" +
+                            "COMMIT TRANSACTION", ConsistencyLevel.QUORUM);
+
+        assertRows(node.execute("SELECT * FROM " + qualifiedAccordTableName + 
" WHERE pk = 1", ConsistencyLevel.QUORUM),
+                   AssertUtils.row(1, 10, 999),
+                   AssertUtils.row(1, 20, 999),
+                   AssertUtils.row(1, 30, 300));
+    }
+
+    @Test
+    public void deleteMultipleRows()
+    {
+        SHARED_CLUSTER.schemaChange("CREATE TABLE " + qualifiedAccordTableName 
+ " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH " + 
transactionalMode.asCqlParam());
+
+        ICoordinator node = SHARED_CLUSTER.coordinator(1);
+        node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v) 
VALUES (1, 10, 100)", ConsistencyLevel.QUORUM);
+        node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v) 
VALUES (1, 20, 200)", ConsistencyLevel.QUORUM);
+        node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v) 
VALUES (1, 30, 300)", ConsistencyLevel.QUORUM);
+
+        node.execute("BEGIN TRANSACTION\n" +
+                     "  DELETE FROM " + qualifiedAccordTableName + '\n' +
+                     "  WHERE pk = 1\n" +
+                     "        AND ck IN (10, 20);\n" +
+                     "COMMIT TRANSACTION", ConsistencyLevel.QUORUM);
+
+        // Verify the deletes
+        assertRows(node.execute("SELECT * FROM " + qualifiedAccordTableName + 
" WHERE pk = 1", ConsistencyLevel.QUORUM),
+                   AssertUtils.row(1, 30, 300));
+    }
+
+    @Test
+    public void usingTimestamp() throws Exception
+    {
+        // This test was discovered by 
org.apache.cassandra.distributed.test.cql3.MixedReadsAccordInteropMultiNodeTableWalkTest
+        // It could be simplified but was boiled down to the minimum steps to 
reproduce the timestamp issue: USING TIMESTAMP
+        // was not respected by operations that happen post read
+        test("CREATE TABLE "+qualifiedAccordTableName+" (\n" +
+             "\t\t    pk0 float,\n" +
+             "\t\t    pk1 varint,\n" +
+             "\t\t    ck0 smallint,\n" +
+             "\t\t    ck1 tinyint,\n" +
+             "\t\t    v3 uuid,\n" +
+             "\t\t    v1 list<uuid>,\n" +
+             "\t\t    v4 map<vector<uuid, 1>, frozen<map<smallint, text>>>,\n" 
+
+             "\t\t    PRIMARY KEY ((pk0, pk1), ck0, ck1)\n" +
+             "\t\t) WITH CLUSTERING ORDER BY (ck0 DESC, ck1 ASC)\n" +
+             "\t\t    AND " + transactionalMode.asCqlParam(), cluster -> {
+            ICoordinator node = cluster.coordinator(2);
+            node.execute("UPDATE " + qualifiedAccordTableName + " USING 
TIMESTAMP 6 " +
+                          "SET " +
+                          "    v1=[00000000-0000-4e00-8c00-000000000000], " +
+                          "    v4={[00000000-0000-4800-b300-000000000000]: 
{-30955: 'ϵ', -10479: '虑퐕㐧', 7904: '䁋'}}, " +
+                          "    v3=00000000-0000-4600-b300-000000000000 " +
+                          "WHERE  pk0 = -1.1763917E35 AND  pk1 = -466454 " +
+                          "       AND  ck0 IN (-26786, 10038, 4991) AND  ck1 
IN (124 - 36, -100)", ConsistencyLevel.QUORUM);
+            AssertUtils.assertRows(node.executeWithResult("SELECT 
writetime(v1), writetime(v3), writetime(v4) FROM " + qualifiedAccordTableName + 
" WHERE pk0 = -1.1763917E35 AND  pk1 = -466454", ConsistencyLevel.QUORUM),
+                                   QueryResults.builder()
+                                               .row(List.of(6L), 6L, 
List.of(6L))
+                                               .row(List.of(6L), 6L, 
List.of(6L))
+                                               .row(List.of(6L), 6L, 
List.of(6L))
+                                               .row(List.of(6L), 6L, 
List.of(6L))
+                                               .row(List.of(6L), 6L, 
List.of(6L))
+                                               .row(List.of(6L), 6L, 
List.of(6L))
+                                               .build());
+            node.execute("DELETE FROM " + qualifiedAccordTableName + " USING 
TIMESTAMP 8 WHERE  pk0 = -1.1763917E35 AND  pk1 = -466454", 
ConsistencyLevel.QUORUM);
+            QueryResultUtil.assertThat(node.executeWithResult("SELECT * FROM " 
+ qualifiedAccordTableName + " WHERE pk0 = -1.1763917E35 AND  pk1 = -466454", 
ConsistencyLevel.QUORUM))
+                           .isEmpty();
+        });
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
index 8f072a9782..d38f07ef9b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.shared.AssertUtils;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.utils.AssertionUtils;
+import org.assertj.core.api.Assertions;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -108,8 +110,8 @@ public class AccordInteroperabilityTest extends 
AccordTestBase
                      }
                      catch (Throwable t)
                      {
-                         assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
-                         assertEquals(cl + " is not supported by Accord", 
t.getMessage());
+                         
Assertions.assertThat(t).is(AssertionUtils.isInstanceof(InvalidRequestException.class));
+                         assertEquals("ConsistencyLevel " + cl + " is 
unsupported with Accord for read, supported are [ONE, QUORUM, ALL, SERIAL]", 
t.getMessage());
                      }
                  }
              });
@@ -130,11 +132,11 @@ public class AccordInteroperabilityTest extends 
AccordTestBase
                      }
                      catch (Throwable t)
                      {
-                         assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                         
Assertions.assertThat(t).is(AssertionUtils.isInstanceof(InvalidRequestException.class));
                          if (cl == ConsistencyLevel.SERIAL || cl == 
ConsistencyLevel.LOCAL_SERIAL)
                              assertEquals("You must use conditional updates 
for serializable writes", t.getMessage());
                          else
-                            assertEquals(cl + " is not supported by Accord", 
t.getMessage());
+                             assertEquals("ConsistencyLevel " + cl + " is 
unsupported with Accord for write/commit, supported are [ANY, ONE, QUORUM, ALL, 
SERIAL]", t.getMessage());
                      }
                  }
              });
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 6334ff6957..b4c53b6159 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -221,6 +221,7 @@ public abstract class AccordTestBase extends TestBaseImpl
     {
         for (String ddl : ddls)
             SHARED_CLUSTER.schemaChange(ddl);
+        ClusterUtils.waitForCMSToQuiesce(SHARED_CLUSTER, 
ClusterUtils.maxEpoch(SHARED_CLUSTER));
 
         // Evict commands from the cache immediately to expose problems 
loading from disk.
 //        SHARED_CLUSTER.forEach(node -> node.runOnInstance(() -> 
AccordService.instance().setCacheSize(0)));
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
index 31d1aab31f..9a466b9493 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
@@ -79,7 +79,7 @@ public abstract class CasMultiNodeTableWalkBase extends 
MultiNodeTableWalkBase
         protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder 
mutationGenBuilder)
         {
             mutationGenBuilder.withCasGen(i -> true)
-                              .withAllowUpdateMultipleClusteringKeys(false); 
// paxos supports but the model doesn't yet
+                              .disallowUpdateMultipleRows(); // paxos supports 
but the model doesn't yet
             // generator might not always generate a cas statement... should 
fix generator!
             Gen<Mutation> gen = 
toGen(mutationGenBuilder.build()).filter(Mutation::isCas);
             if (metadata.regularAndStaticColumns().stream().anyMatch(c -> 
c.type.isUDT())
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
index 216b1e79c6..703c4e3c93 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
@@ -384,9 +384,7 @@ public class SingleNodeTokenConflictTest extends 
StatefulASTBase
 
 
             this.mutationGen = toGen(new 
ASTGenerators.MutationGenBuilder(metadata)
-                                     .withoutTransaction()
-                                     .withoutTtl()
-                                     .withoutTimestamp()
+                                     .withTxnSafe()
                                      
.withPartitions(SourceDSL.arbitrary().pick(uniquePartitions))
                                      .withIgnoreIssues(IGNORED_ISSUES)
                                      .build());
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
 
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index 8d6bc02773..8895a7271b 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -148,7 +148,7 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
     private static CommandGen<Spec> cqlOperations(Spec spec)
     {
         Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.SelectGenBuilder(spec.metadata).withLimit1().build());
-        Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.MutationGenBuilder(spec.metadata).withoutTimestamp().withoutTtl().withAllowUpdateMultipleClusteringKeys(false).build());
+        Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.MutationGenBuilder(spec.metadata).withTxnSafe().disallowUpdateMultiplePartitionKeys().build());
         Gen<Statement> txn = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.TxnGenBuilder(spec.metadata).build());
         Map<Gen<Statement>, Integer> operations = new LinkedHashMap<>();
         operations.put(select, 1);
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java 
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index 3d2bf9a74d..c914350328 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -857,7 +857,10 @@ public class ASTSingleTableModel
                 SelectResult result = (SelectResult) o;
                 if (result.rows.length == 0)
                     return null;
-                return result.rows[0][result.columns.indexOf(symbol)];
+                ByteBuffer bb = result.rows[0][result.columns.indexOf(symbol)];
+                if (bb != null && symbol.type().isNull(bb))
+                    bb = null;
+                return bb;
             }
             else
             {
@@ -1194,7 +1197,9 @@ public class ASTSingleTableModel
                 sb.append("\n\tDiff (expected over actual):\n");
                 Row eSmall = e.select(smallestDiff);
                 Row aSmall = smallest.select(smallestDiff);
-                sb.append(table(eSmall.columns, Arrays.asList(eSmall, 
aSmall)));
+                // Symbol.toString is just the column name; it is easier to 
understand what is going on if the type is included,
+                // which is done in Symbol.detailedName: name type (reversed)?
+                
sb.append(table(eSmall.columns.stream().map(Symbol::detailedName).collect(Collectors.toList()),
 Arrays.asList(eSmall, aSmall)));
             }
         }
         else
@@ -1272,7 +1277,12 @@ public class ASTSingleTableModel
 
     private static String table(ImmutableUniqueList<Symbol> columns, 
Collection<Row> rows)
     {
-        return 
TableBuilder.toStringPiped(columns.stream().map(Symbol::toCQL).collect(Collectors.toList()),
+        return 
table(columns.stream().map(Symbol::toCQL).collect(Collectors.toList()), rows);
+    }
+
+    private static String table(List<String> columns, Collection<Row> rows)
+    {
+        return TableBuilder.toStringPiped(columns,
                                           // intellij or junit can be tripped 
up by utf control or invisible chars, so this logic tries to normalize to make 
things more safe
                                           () -> rows.stream()
                                                     .map(r -> 
r.asCQL().stream().map(StringUtils::escapeControlChars).collect(Collectors.toList()))
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
index 8f9635641e..7924343dfd 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import accord.utils.DefaultRandom;
@@ -122,7 +121,6 @@ public class SingleNodeSingleTableASTTest extends 
SimulationTestBase
     }
 
     @Test
-    @Ignore
     public void normal() throws IOException
     {
 //        testOne(SimulationRunner.parseHex("0x2fd91c2a2be59d7d"), 
SingleTableASTSimulation::new);
@@ -137,7 +135,6 @@ public class SingleNodeSingleTableASTTest extends 
SimulationTestBase
     }
 
     @Test
-    @Ignore
     public void accordMixedReads() throws IOException
     {
 //        testOne(SimulationRunner.parseHex("0x2fd91c2a2be59d7d"), 
SingleTableASTSimulation.MixedReadsAccordSingleTableASTSimulation::new);
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
 
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
index bae0740295..ef1f60b470 100644
--- 
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
@@ -70,6 +70,7 @@ import 
org.apache.cassandra.simulator.systems.SimulatedActionCallable;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.utils.ASTGenerators;
 import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.Generators;
@@ -241,7 +242,7 @@ public class SingleTableASTSimulation extends 
SimulationTestBase.SimpleSimulatio
                                                                        
.uniqueBestEffort()
                                                                        
.ofSize(rs.nextInt(1, 20))
                                                                        
.next(rs);
-            Gen<Action> mutationGen = toGen(ASTGenerators.mutationBuilder(rs, 
model, uniquePartitions, i -> null).disallowEmpty().build())
+            Gen<Action> mutationGen = toGen(ASTGenerators.mutationBuilder(rs, 
model, uniquePartitions, i -> null).build())
                                       .map(mutation -> query(mutation));
 
             Gen<Action> selectPartitionGen = Gens.pick(uniquePartitions)
@@ -393,12 +394,12 @@ public class SingleTableASTSimulation extends 
SimulationTestBase.SimpleSimulatio
 
         private IIsolatedExecutor.SerializableCallable<Object[][]> 
query(Statement statement, ConsistencyLevel cl)
         {
-            // Simulator acts differently than jvm-dtest, so ByteBuffer isn't 
safe!
-            // java.lang.RuntimeException: java.io.NotSerializableException: 
java.nio.HeapByteBuffer
-            // So switch to literals for now
-            statement = statement.visit(StandardVisitors.BIND_TO_LITERAL);
             String cql = statement.toCQL();
-            Object[] binds = statement.binds();
+            ByteBuffer[] encoded = statement.bindsEncoded();
+            // Simulator doesn't support ByteBuffer (fails to transfer), so 
need to convert to byte[], which will get converted to ByteBuffer within 
Cassandra.
+            Object[] binds = new Object[encoded.length];
+            for (int i = 0; i < encoded.length; i++)
+                binds[i] = encoded[i] == null ? null : 
ByteBufferUtil.getArray(encoded[i]);
             return () -> {
                 Query q = new Query(cql, Long.MIN_VALUE, false, cl, null, 
binds);
                 return q.call().toObjectArrays();
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index b16cdc9734..ffd0522ee8 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -115,6 +115,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
     "org.apache.cassandra.config.Config$BatchlogEndpointStrategy",
     "org.apache.cassandra.config.Config$TombstonesMetricGranularity",
+    
"org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel",
     "org.apache.cassandra.repair.autorepair.AutoRepairConfig",
     "org.apache.cassandra.repair.autorepair.AutoRepairConfig$Options",
     "org.apache.cassandra.repair.autorepair.AutoRepairConfig$RepairType",
diff --git a/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java 
b/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
index a2805e8569..12174e0a26 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.cassandra.cql3.ast;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
 import org.junit.Test;
 
 import accord.utils.Gen;
@@ -55,8 +59,18 @@ public class ExpressionTest
         Txn txn = Txn.builder()
                      .addLet("a", selectWithBind())
                      .addReturn(selectWithBind())
+                     .addIf(new Conditional.Is(Symbol.unknownType("a"), 
Conditional.Is.Kind.NotNull),
+                            new Mutation.Insert(new 
TableReference(Optional.empty(), "tbl"), new 
LinkedHashMap<>(Map.of(Symbol.unknownType("a"), Bind.of(0))), false, 
Optional.empty()))
                      .build();
         assertNoBind(txn.visit(StandardVisitors.BIND_TO_LITERAL));
+
+        txn = Txn.builder()
+                 .addLet("a", selectWithBind())
+                 .addReturn(selectWithBind())
+                     .addIf(Where.create(Where.Inequality.EQUAL, Bind.of(0), 
Bind.of(42)),
+                        new Mutation.Insert(new 
TableReference(Optional.empty(), "tbl"), new 
LinkedHashMap<>(Map.of(Symbol.unknownType("a"), Bind.of(0))), false, 
Optional.empty()))
+                 .build();
+        assertNoBind(txn.visit(StandardVisitors.BIND_TO_LITERAL));
     }
 
     private static Select selectWithBind()
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Literal.java 
b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
index eb6d83df41..09973bd0aa 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.StringType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 
 public class Literal implements Value
 {
@@ -47,6 +48,11 @@ public class Literal implements Value
         return new Literal(value, LongType.instance);
     }
 
+    public static Literal of(String value)
+    {
+        return new Literal(value, UTF8Type.instance);
+    }
+
     @Override
     public AbstractType<?> type()
     {
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java 
b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
index f905218c45..e492293691 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
@@ -121,7 +121,7 @@ public class Symbol implements ReferenceExpression, 
Comparable<Symbol>
 
     public String detailedName()
     {
-        return symbol + " " + type.asCQL3Type() + (reversed ? " (reversed)" : 
"");
+        return symbol + ' ' + type.asCQL3Type() + (reversed ? " (reversed)" : 
"");
     }
 
     @Override
@@ -172,11 +172,5 @@ public class Symbol implements ReferenceExpression, 
Comparable<Symbol>
             UnquotedSymbol symbol1 = (UnquotedSymbol) o;
             return Objects.equals(symbol, symbol1.symbol);
         }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hash(symbol);
-        }
     }
 }
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java 
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 4f502b19bb..860995b28f 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -411,6 +411,7 @@ public class ASTGenerators
         private Map<Symbol, ExpressionBuilder> columnExpressions = new 
LinkedHashMap<>();
         private boolean allowPartitionOnlyUpdate = true;
         private boolean allowPartitionOnlyInsert = true;
+        private boolean allowUpdateMultiplePartitionKeys = true;
         private boolean allowUpdateMultipleClusteringKeys = true;
         private EnumSet<KnownIssue> ignoreIssues = IGNORED_ISSUES;
 
@@ -449,12 +450,46 @@ public class ASTGenerators
             return this;
         }
 
-        public MutationGenBuilder 
withAllowUpdateMultipleClusteringKeys(boolean allowUpdateMultipleClusteringKeys)
+        public MutationGenBuilder allowUpdateMultiplePartitionKeys()
+        {
+            return withAllowUpdateMultiplePartitionKeys(true);
+        }
+
+        public MutationGenBuilder disallowUpdateMultiplePartitionKeys()
+        {
+            return withAllowUpdateMultiplePartitionKeys(false);
+        }
+
+        private MutationGenBuilder 
withAllowUpdateMultiplePartitionKeys(boolean allowUpdateMultiplePartitionKeys)
+        {
+            this.allowUpdateMultiplePartitionKeys = 
allowUpdateMultiplePartitionKeys;
+            return this;
+        }
+
+        public MutationGenBuilder allowUpdateMultipleClusteringKeys()
+        {
+            return withAllowUpdateMultipleClusteringKeys(true);
+        }
+
+        public MutationGenBuilder disallowUpdateMultipleClusteringKeys()
+        {
+            return withAllowUpdateMultipleClusteringKeys(false);
+        }
+
+        private MutationGenBuilder 
withAllowUpdateMultipleClusteringKeys(boolean allowUpdateMultipleClusteringKeys)
         {
             this.allowUpdateMultipleClusteringKeys = 
allowUpdateMultipleClusteringKeys;
             return this;
         }
 
+        /**
+         * Tells the generator to avoid producing updates that touch multiple 
partition keys or multiple clustering keys.
+         */
+        public MutationGenBuilder disallowUpdateMultipleRows()
+        {
+            return 
disallowUpdateMultiplePartitionKeys().disallowUpdateMultipleClusteringKeys();
+        }
+
         public MutationGenBuilder 
withColumnExpressions(Consumer<ExpressionBuilder> fn)
         {
             for (Symbol symbol : allColumns)
@@ -719,7 +754,7 @@ public class ASTGenerators
                         var timestamp = timestampGen.generate(rnd);
                         if (timestamp.isPresent())
                             builder.timestamp(valueGen(timestamp.getAsLong(), 
LongType.instance).generate(rnd));
-                        if (allowUpdateMultipleClusteringKeys)
+                        if (allowUpdateMultiplePartitionKeys)
                             where(rnd, columnExpressions, builder, 
partitionColumns, partitionValueGen);
                         else
                             values(rnd, columnExpressions, builder, 
partitionColumns, partitionValueGen);
@@ -1056,7 +1091,7 @@ public class ASTGenerators
                     }
                     MutationGenBuilder mutationBuilder = new 
MutationGenBuilder(metadata)
                                                          .withTxnSafe()
-                                                         
.withAllowUpdateMultipleClusteringKeys(false)
+                                                         
.disallowUpdateMultiplePartitionKeys()
                                                          .withReferences(new 
ArrayList<>(builder.allowedReferences()));
                     if (!allowReferences)
                         
mutationBuilder.withReferences(Collections.emptyList());
@@ -1183,11 +1218,9 @@ public class ASTGenerators
 
         private Gen<Mutation> mutationGen(RandomSource rs, 
LinkedHashMap<Symbol, Object> pk)
         {
-            MutationGenBuilder builder = mutationBuilder(IGNORED_ISSUES, rs, 
model, List.of(pk), indexes);
-            builder.withTxnSafe()
-                   //TODO (now, coverage): remove this as we should support
-                   // working around the bug to make progress
-                   .withAllowUpdateMultipleClusteringKeys(false);
+            MutationGenBuilder builder = mutationBuilder(IGNORED_ISSUES, rs, 
model, List.of(pk), indexes)
+                                         .withTxnSafe()
+                                         
.disallowUpdateMultiplePartitionKeys();
             if (!allowEmpty)
                 builder.disallowEmpty();
             return builder.build();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to