This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d641e06fc Accord: BEGIN TRANSACTIONs IF condition logic does not 
properly support meaningless emptiness and null values
8d641e06fc is described below

commit 8d641e06fcc15f93fdb74cfe12511345e9b9ee22
Author: David Capwell <[email protected]>
AuthorDate: Tue Jun 24 15:37:12 2025 -0700

    Accord: BEGIN TRANSACTIONs IF condition logic does not properly support 
meaningless emptiness and null values
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-20667
---
 CHANGES.txt                                        |   1 +
 .../cassandra/cql3/conditions/ColumnCondition.java |  27 +-
 .../cassandra/cql3/statements/CQL3CasRequest.java  |   4 +-
 .../cql3/statements/TransactionStatement.java      |   2 +-
 .../org/apache/cassandra/cql3/terms/Lists.java     |   6 +-
 .../cql3/transactions/ConditionStatement.java      |   7 +-
 .../cql3/transactions/ReferenceValue.java          |   2 +-
 .../cql3/transactions/RowDataReference.java        |   2 +-
 .../cassandra/service/accord/AccordResult.java     |   3 +
 ...AbstractParameterisedUnversionedSerialized.java |  42 ++
 ... AbstractParameterisedVersionedSerialized.java} |  42 +-
 .../service/accord/txn/AbstractSerialized.java     |  59 +-
 .../cassandra/service/accord/txn/TxnCondition.java | 218 ++++---
 .../cassandra/service/accord/txn/TxnNamedRead.java |   2 +-
 .../cassandra/service/accord/txn/TxnReference.java | 680 ++++++++++++++-------
 .../service/accord/txn/TxnReferenceOperation.java  |  17 +-
 .../service/accord/txn/TxnReferenceOperations.java |  12 +-
 .../service/accord/txn/TxnReferenceValue.java      |  65 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    |  10 +-
 .../cassandra/service/accord/txn/TxnWrite.java     |   2 +-
 .../cassandra/utils/CollectionSerializers.java     |   9 +
 .../cql3/validation/operations/InsertTest.java     |  52 ++
 .../cassandra/db/marshal/AbstractTypeTest.java     |  75 +++
 .../cassandra/db/partitions/SimplePartition.java   | 223 +++++++
 .../service/accord/txn/TxnConditionTest.java       | 425 ++++++++++++-
 .../cassandra/utils/AbstractTypeGenerators.java    |  39 ++
 26 files changed, 1590 insertions(+), 436 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3a14287612..4b04683842 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Accord: BEGIN TRANSACTIONs IF condition logic does not properly support 
meaningless emptiness and null values (CASSANDRA-20667)
  * Accord: startup race condition where accord journal tries to access the 2i 
index before its ready (CASSANDRA-20686)
  * Adopt Unsafe::invokeCleaner for Direct ByteBuffer cleaning (CASSANDRA-20677)
  * Add additional metrics around hints (CASSANDRA-20499)
diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java 
b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
index 1f9ba51862..b996f00e26 100644
--- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
@@ -236,6 +236,8 @@ public final class ColumnCondition
          */
         public abstract boolean appliesTo(Row row);
 
+        public abstract boolean isNull(Row row);
+
         public abstract BoundKind kind();
 
         public static final ParameterisedUnversionedSerializer<Bound, 
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>() {
@@ -294,6 +296,12 @@ public final class ColumnCondition
             return operator.isSatisfiedBy(column.type, rowValue(row), value);
         }
 
+        @Override
+        public boolean isNull(Row row)
+        {
+            return column.type.isNull(rowValue(row));
+        }
+
         protected ByteBuffer rowValue(Row row)
         {
             // If we're asking for a given cell, and we didn't get any row 
from our read, it's
@@ -399,10 +407,21 @@ public final class ColumnCondition
         @Override
         public boolean appliesTo(Row row)
         {
-            ByteBuffer element = ((MultiElementType<?>) 
column.type).getElement(columnData(row), keyOrIndex);
+            ByteBuffer element = elementValue(row);
             return operator.isSatisfiedBy(elementType, element, value);
         }
 
+        private ByteBuffer elementValue(Row row)
+        {
+            return ((MultiElementType<?>) 
column.type).getElement(columnData(row), keyOrIndex);
+        }
+
+        @Override
+        public boolean isNull(Row row)
+        {
+            return column.type.isNull(elementValue(row));
+        }
+
         /**
          * Returns the column data for the given row.
          * @param row the row
@@ -454,6 +473,12 @@ public final class ColumnCondition
             return operator.isSatisfiedBy((MultiElementType<?>) column.type, 
columnData, value);
         }
 
+        @Override
+        public boolean isNull(Row row)
+        {
+            return row == null || row.getComplexColumnData(column) == null;
+        }
+
         @Override
         public boolean equals(Object o)
         {
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 816fc9a814..e7a561c76c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -428,7 +428,7 @@ public class CQL3CasRequest implements CASRequest
 
         public TxnCondition asTxnCondition()
         {
-            TxnReference txnReference = new 
TxnReference(txnDataName(CAS_READ), null, null);
+            TxnReference txnReference = 
TxnReference.row(txnDataName(CAS_READ));
             return new TxnCondition.Exists(txnReference, 
TxnCondition.Kind.IS_NULL);
         }
     }
@@ -453,7 +453,7 @@ public class CQL3CasRequest implements CASRequest
 
         public TxnCondition asTxnCondition()
         {
-            TxnReference txnReference = new 
TxnReference(txnDataName(CAS_READ), null, null);
+            TxnReference txnReference = 
TxnReference.row(txnDataName(CAS_READ));
             return new TxnCondition.Exists(txnReference, 
TxnCondition.Kind.IS_NOT_NULL);
         }
     }
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index f27c49cfcb..67a796a28a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -565,7 +565,7 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
             {
                 RowDataReference reference = returningReferences.get(i);
                 TxnReference txnReference = reference.toTxnReference(options);
-                ByteBuffer buffer = txnReference.toByteBuffer(data, 
resultType.get(i));
+                ByteBuffer buffer = txnReference.asColumn().toByteBuffer(data, 
resultType.get(i));
                 result.add(buffer);
             }
 
diff --git a/src/java/org/apache/cassandra/cql3/terms/Lists.java 
b/src/java/org/apache/cassandra/cql3/terms/Lists.java
index 579b783d8d..d0b0a6eb80 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Lists.java
@@ -300,7 +300,7 @@ public abstract class Lists
         @Override
         public boolean requiresTimestamp()
         {
-            return true;
+            return column.type.isMultiCell();
         }
 
         public void execute(DecoratedKey partitionKey, UpdateParameters 
params) throws InvalidRequestException
@@ -397,7 +397,7 @@ public abstract class Lists
         @Override
         public boolean requiresTimestamp()
         {
-            return true;
+            return column.type.isMultiCell();
         }
 
         static void doAppend(Term.Terminal value, ColumnMetadata column, 
UpdateParameters params) throws InvalidRequestException
@@ -455,7 +455,7 @@ public abstract class Lists
         @Override
         public boolean requiresTimestamp()
         {
-            return true;
+            return column.type.isMultiCell();
         }
 
         public void execute(DecoratedKey partitionKey, UpdateParameters 
params) throws InvalidRequestException
diff --git 
a/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java 
b/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
index 2ce6f33502..25715b34e1 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
@@ -25,6 +25,9 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.terms.Term;
 import org.apache.cassandra.cql3.VariableSpecifications;
 import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+
+import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 
 public class ConditionStatement
 {
@@ -137,7 +140,9 @@ public class ConditionStatement
             case LT:
             case LTE:
                 // TODO: Support for references on LHS and RHS
-                return new 
TxnCondition.Value(reference.toTxnReference(options),
+                TxnReference ref = reference.toTxnReference(options);
+                checkTrue(ref.kind == TxnReference.Kind.COLUMN, "Condition %s 
requires COLUMN reference but given %s", kind, ref.kind);
+                return new TxnCondition.Value(ref.asColumn(),
                                               kind.toTxnKind(reversed),
                                               value.bindAndGet(options),
                                               options.getProtocolVersion());
diff --git 
a/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java 
b/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
index d6a4ab8acf..0cd09e783b 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
@@ -106,7 +106,7 @@ public abstract class ReferenceValue
         @Override
         public TxnReferenceValue bindAndGet(QueryOptions options)
         {
-            return new 
TxnReferenceValue.Substitution(reference.toTxnReference(options));
+            return new 
TxnReferenceValue.Substitution(reference.toTxnReference(options).asColumn());
         }
 
         public static class Raw extends ReferenceValue.Raw
diff --git 
a/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java 
b/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
index 038bd45963..161e1d5706 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
@@ -187,7 +187,7 @@ public class RowDataReference extends Term.NonTerminal
         Preconditions.checkState(elementPath == null || column.isComplex() || 
column.type.isFrozenCollection());
         Preconditions.checkState(fieldPath == null || column.isComplex() || 
column.type.isUDT());
 
-        return new TxnReference(txnDataName, table, column, 
bindCellPath(options));
+        return TxnReference.columnOrRow(txnDataName, table, column, 
bindCellPath(options));
     }
 
     public ColumnIdentifier getFullyQualifiedName()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordResult.java 
b/src/java/org/apache/cassandra/service/accord/AccordResult.java
index 86315b6d60..9d97d3108d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordResult.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordResult.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.txn.RetryWithNewProtocolResult;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -142,6 +143,8 @@ public class AccordResult<V> extends AsyncFuture<V> 
implements BiConsumer<V, Thr
             }
             else
             {
+                logger.error("Unexpected exception", fail);
+                JVMStabilityInspector.inspectThrowable(fail);
                 report = bookkeeping.newFailed(txnId, keysOrRanges);
             }
             report.addSuppressed(fail);
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedUnversionedSerialized.java
 
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedUnversionedSerialized.java
new file mode 100644
index 0000000000..b0a827b7bf
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedUnversionedSerialized.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
+
+public abstract class AbstractParameterisedUnversionedSerialized<T, P> extends 
AbstractSerialized<T>
+{
+    public AbstractParameterisedUnversionedSerialized(@Nullable ByteBuffer 
latestVersionBytes)
+    {
+        super(latestVersionBytes);
+    }
+
+    protected abstract ParameterisedUnversionedSerializer<T, P> serializer();
+
+    protected T deserialize(P param)
+    {
+        T result = memoized;
+        if (result == null && latestVersionBytes != null)
+            memoized = result = serializer().deserializeUnchecked(param, 
latestVersionBytes);
+        return result;
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java 
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedVersionedSerialized.java
similarity index 66%
copy from 
src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
copy to 
src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedVersionedSerialized.java
index ac172a221b..6eb0bb8b9e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedVersionedSerialized.java
@@ -31,42 +31,17 @@ import 
org.apache.cassandra.service.accord.serializers.Version;
  * Item that is serialized by default
  */
 @NotThreadSafe
-public abstract class AbstractSerialized<T, P>
+public abstract class AbstractParameterisedVersionedSerialized<T, P> extends 
AbstractSerialized<T>
 {
-    private @Nullable final ByteBuffer latestVersionBytes;
-    private transient @Nullable T memoized = null;
-
-    protected AbstractSerialized(@Nullable ByteBuffer latestVersionBytes)
+    protected AbstractParameterisedVersionedSerialized(@Nullable ByteBuffer 
latestVersionBytes)
     {
-        this.latestVersionBytes = latestVersionBytes;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || (o.getClass() != getClass())) return false;
-
-        AbstractSerialized<?,?> that = (AbstractSerialized<?,?>) o;
-        return Objects.equals(latestVersionBytes, that.latestVersionBytes);
+        super(latestVersionBytes);
     }
 
-    @Override
-    public int hashCode()
-    {
-        return latestVersionBytes != null ? latestVersionBytes.hashCode() : 0;
-    }
-
-    public abstract long estimatedSizeOnHeap();
     protected abstract ByteBuffer serialize(T value, P param, Version version);
     protected abstract ByteBuffer reserialize(ByteBuffer bytes, P param, 
Version srcVersion, Version trgVersion);
     protected abstract T deserialize(P param, ByteBuffer bytes, Version 
version);
 
-    protected boolean isNull()
-    {
-        return latestVersionBytes == null;
-    }
-
     @Nullable
     protected T deserialize(P param)
     {
@@ -76,17 +51,6 @@ public abstract class AbstractSerialized<T, P>
         return result;
     }
 
-    public void unmemoize()
-    {
-        memoized = null;
-    }
-
-    @Nullable
-    protected ByteBuffer unsafeBytes()
-    {
-        return latestVersionBytes;
-    }
-
     @Nonnull
     protected ByteBuffer bytes(P param, Version target)
     {
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java 
b/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
index ac172a221b..d3d86dfa85 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
@@ -25,57 +25,28 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import accord.utils.Invariants;
-import org.apache.cassandra.service.accord.serializers.Version;
 
 /**
  * Item that is serialized by default
  */
 @NotThreadSafe
-public abstract class AbstractSerialized<T, P>
+public abstract class AbstractSerialized<T>
 {
-    private @Nullable final ByteBuffer latestVersionBytes;
-    private transient @Nullable T memoized = null;
+    protected @Nullable final ByteBuffer latestVersionBytes;
+    protected transient @Nullable T memoized = null;
 
-    protected AbstractSerialized(@Nullable ByteBuffer latestVersionBytes)
+    public AbstractSerialized(@Nullable ByteBuffer latestVersionBytes)
     {
         this.latestVersionBytes = latestVersionBytes;
     }
 
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || (o.getClass() != getClass())) return false;
-
-        AbstractSerialized<?,?> that = (AbstractSerialized<?,?>) o;
-        return Objects.equals(latestVersionBytes, that.latestVersionBytes);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return latestVersionBytes != null ? latestVersionBytes.hashCode() : 0;
-    }
-
     public abstract long estimatedSizeOnHeap();
-    protected abstract ByteBuffer serialize(T value, P param, Version version);
-    protected abstract ByteBuffer reserialize(ByteBuffer bytes, P param, 
Version srcVersion, Version trgVersion);
-    protected abstract T deserialize(P param, ByteBuffer bytes, Version 
version);
 
     protected boolean isNull()
     {
         return latestVersionBytes == null;
     }
 
-    @Nullable
-    protected T deserialize(P param)
-    {
-        T result = memoized;
-        if (result == null && latestVersionBytes != null)
-            memoized = result = deserialize(param, latestVersionBytes, 
Version.LATEST);
-        return result;
-    }
-
     public void unmemoize()
     {
         memoized = null;
@@ -88,11 +59,25 @@ public abstract class AbstractSerialized<T, P>
     }
 
     @Nonnull
-    protected ByteBuffer bytes(P param, Version target)
+    protected ByteBuffer bytes()
     {
         Invariants.nonNull(latestVersionBytes);
-        if (Version.LATEST == target)
-            return latestVersionBytes;
-        return reserialize(latestVersionBytes, param, Version.LATEST, target);
+        return latestVersionBytes;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || (o.getClass() != getClass())) return false;
+
+        AbstractSerialized<?> that = (AbstractSerialized<?>) o;
+        return Objects.equals(latestVersionBytes, that.latestVersionBytes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return latestVersionBytes != null ? latestVersionBytes.hashCode() : 0;
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index 881a332162..589f954fb6 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord.txn;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -38,19 +39,19 @@ import 
org.apache.cassandra.cql3.conditions.ColumnCondition.Bound;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -67,7 +68,7 @@ import static 
org.apache.cassandra.utils.CollectionSerializers.serializedListSiz
 
 public abstract class TxnCondition
 {
-    public static class SerializedTxnCondition extends 
AbstractSerialized<TxnCondition, TableMetadatas>
+    public static class SerializedTxnCondition extends 
AbstractParameterisedUnversionedSerialized<TxnCondition, TableMetadatas>
     {
         private static final long EMPTY_SIZE = ObjectSizes.measure(new 
SerializedTxnCondition(null));
 
@@ -76,41 +77,29 @@ public abstract class TxnCondition
             super(latestVersionBytes);
         }
 
-        protected SerializedTxnCondition(TxnCondition condition, 
TableMetadatas param)
-        {
-            this(serializer.serializeUnchecked(condition, param, 
Version.LATEST));
-        }
-
         @Override
-        public long estimatedSizeOnHeap()
+        protected ParameterisedUnversionedSerializer<TxnCondition, 
TableMetadatas> serializer()
         {
-            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(unsafeBytes());
+            return serializer;
         }
 
-        @Override
-        protected ByteBuffer serialize(TxnCondition value, TableMetadatas 
param, Version version)
-        {
-            return serializer.serializeUnchecked(value, param, version);
-        }
-
-        @Override
-        protected ByteBuffer reserialize(ByteBuffer bytes, TableMetadatas 
param, Version srcVersion, Version trgVersion)
+        protected SerializedTxnCondition(TxnCondition condition, 
TableMetadatas param)
         {
-            return bytes;
+            this(serializer.serializeUnchecked(condition, param));
         }
 
         @Override
-        protected TxnCondition deserialize(TableMetadatas param, ByteBuffer 
bytes, Version version)
+        public long estimatedSizeOnHeap()
         {
-            return serializer.deserializeUnchecked(param, bytes, version);
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(unsafeBytes());
         }
     }
 
     private interface ConditionSerializer<T extends TxnCondition>
     {
-        void serialize(T condition, TableMetadatas tables, DataOutputPlus out, 
Version version) throws IOException;
-        T deserialize(TableMetadatas tables, DataInputPlus in, Version 
version, Kind kind) throws IOException;
-        long serializedSize(T condition, TableMetadatas tables, Version 
version);
+        void serialize(T condition, TableMetadatas tables, DataOutputPlus out) 
throws IOException;
+        T deserialize(TableMetadatas tables, DataInputPlus in, Kind kind) 
throws IOException;
+        long serializedSize(T condition, TableMetadatas tables);
     }
 
     public enum Kind
@@ -227,11 +216,11 @@ public abstract class TxnCondition
         private static final ConditionSerializer<None> serializer = new 
ConditionSerializer<>()
         {
             @Override
-            public void serialize(None condition, TableMetadatas tables, 
DataOutputPlus out, Version version) {}
+            public void serialize(None condition, TableMetadatas tables, 
DataOutputPlus out) {}
             @Override
-            public None deserialize(TableMetadatas tables, DataInputPlus in, 
Version version, Kind kind) { return instance; }
+            public None deserialize(TableMetadatas tables, DataInputPlus in, 
Kind kind) { return instance; }
             @Override
-            public long serializedSize(None condition, TableMetadatas tables, 
Version version) { return 0; }
+            public long serializedSize(None condition, TableMetadatas tables) 
{ return 0; }
         };
     }
 
@@ -266,9 +255,7 @@ public abstract class TxnCondition
         @Override
         public void collect(TableMetadatas.Collector collector)
         {
-            TableMetadata table = reference.table();
-            if (table != null)
-                collector.add(table);
+            reference.collect(collector);
         }
 
         @Override
@@ -283,12 +270,10 @@ public abstract class TxnCondition
             return reference.toString() + ' ' + kind.toString();
         }
 
-        @Override
-        public boolean applies(TxnData data)
+        private boolean applies(FilteredPartition partition, boolean exists, 
TxnReference.ColumnReference ref)
         {
-            FilteredPartition partition = reference.getPartition(data);
-            boolean exists = partition != null && !partition.isEmpty();
-
+            ColumnMetadata column = ref.column();
+            if (column.isPartitionKey()) return exists;
             Row row = null;
             if (exists)
             {
@@ -296,9 +281,9 @@ public abstract class TxnCondition
                 exists = row != null && !row.isEmpty();
             }
 
-            if (exists && reference.selectsColumn())
+            if (exists)
             {
-                ColumnData columnData = reference.getColumnData(row);
+                ColumnData columnData = ref.getColumnData(row);
 
                 if (columnData == null)
                 {
@@ -306,10 +291,43 @@ public abstract class TxnCondition
                 }
                 else if (columnData.column().isComplex())
                 {
-                    if (reference.isElementSelection() || 
reference.isFieldSelection())
+                    if (ref.isElementSelection())
                     {
                         Cell<?> cell = (Cell<?>) columnData;
                         exists = !cell.isTombstone();
+                        // Collections don't support NULL but meaningless 
null types are supported, so byte[0] is allowed!
+                        // This is NULL when touched, so need to still check 
each value
+                        if (exists)
+                        {
+                            CollectionType<?> type = (CollectionType<?>) 
column.type.unwrap();
+                            switch (type.kind)
+                            {
+                                case MAP:
+                                {
+                                    exists = 
!type.nameComparator().isNull(cell.path().get(0));
+                                    if (exists)
+                                        exists = 
!type.valueComparator().isNull(cell.buffer());
+                                }
+                                break;
+                                case SET:
+                                {
+                                    exists = 
!type.nameComparator().isNull(cell.path().get(0));
+                                }
+                                break;
+                                case LIST:
+                                {
+                                    exists = 
!type.valueComparator().isNull(cell.buffer());
+                                }
+                                break;
+                                default:
+                                    throw new 
UnsupportedOperationException(type.kind.name());
+                            }
+                        }
+                    }
+                    else if (ref.isFieldSelection())
+                    {
+                        Cell<?> cell = (Cell<?>) columnData;
+                        exists = exists(cell, ref.getFieldSelectionType());
                     }
                     else
                     {
@@ -318,26 +336,45 @@ public abstract class TxnCondition
                             exists = false;
                     }
                 }
-                else if (reference.isElementSelection())
+                else if (ref.isElementSelection())
                 {
                     // This is frozen, so check if the Cell is a tombstone and 
that the element is present.
                     Cell<?> cell = (Cell<?>) columnData;
-                    ByteBuffer element = 
reference.getFrozenCollectionElement(cell);
-                    exists = element != null && !cell.isTombstone();
+                    exists = exists(cell, column.type);
+                    if (exists)
+                    {
+                        ByteBuffer element = 
ref.getFrozenCollectionElement(cell);
+                        exists = 
!ref.getFrozenCollectionElementType().isNull(element);
+                    }
                 }
-                else if (reference.isFieldSelection())
+                else if (ref.isFieldSelection())
                 {
                     // This is frozen, so check if the Cell is a tombstone and 
that the field is present.
                     Cell<?> cell = (Cell<?>) columnData;
-                    ByteBuffer fieldValue = 
reference.getFrozenFieldValue(cell);
-                    exists = fieldValue != null && !cell.isTombstone();
+                    exists = exists(cell, column.type);
+                    if (exists)
+                    {
+                        ByteBuffer fieldValue = ref.getFrozenFieldValue(cell);
+                        exists = 
!ref.getFieldSelectionType().isNull(fieldValue);
+                    }
                 }
                 else
                 {
                     Cell<?> cell = (Cell<?>) columnData;
-                    exists = !cell.isTombstone();
+                    exists = exists(cell, column.type);
                 }
             }
+            return exists;
+        }
+
+        @Override
+        public boolean applies(TxnData data)
+        {
+            FilteredPartition partition = reference.getPartition(data);
+            boolean exists = partition != null && !partition.isEmpty();
+
+            if (reference.kind == TxnReference.Kind.COLUMN)
+                exists = applies(partition, exists, reference.asColumn());
 
             switch (kind())
             {
@@ -350,24 +387,29 @@ public abstract class TxnCondition
             }
         }
 
-        private static final ConditionSerializer<Exists> serializer = new ConditionSerializer<Exists>()
+        private static boolean exists(Cell<?> cell, AbstractType<?> type)
+        {
+            return !cell.isTombstone() && !type.unwrap().isNull(cell.buffer());
+        }
+
+        private static final ConditionSerializer<Exists> serializer = new ConditionSerializer<>()
         {
             @Override
-            public void serialize(Exists condition, TableMetadatas tables, DataOutputPlus out, Version version) throws IOException
+            public void serialize(Exists condition, TableMetadatas tables, DataOutputPlus out) throws IOException
             {
-                TxnReference.serializer.serialize(condition.reference, tables, out, version);
+                TxnReference.serializer.serialize(condition.reference, tables, out);
             }
 
             @Override
-            public Exists deserialize(TableMetadatas tables, DataInputPlus in, Version version, Kind kind) throws IOException
+            public Exists deserialize(TableMetadatas tables, DataInputPlus in, Kind kind) throws IOException
             {
-                return new Exists(TxnReference.serializer.deserialize(tables, in, version), kind);
+                return new Exists(TxnReference.serializer.deserialize(tables, in), kind);
             }
 
             @Override
-            public long serializedSize(Exists condition, TableMetadatas tables, Version version)
+            public long serializedSize(Exists condition, TableMetadatas tables)
             {
-                return TxnReference.serializer.serializedSize(condition.reference, tables, version);
+                return TxnReference.serializer.serializedSize(condition.reference, tables);
             }
         };
     }
@@ -417,14 +459,14 @@ public abstract class TxnCondition
         private static final ConditionSerializer<ColumnConditionsAdapter> serializer = new ConditionSerializer<ColumnConditionsAdapter>()
         {
             @Override
-            public void serialize(ColumnConditionsAdapter condition, TableMetadatas tables, DataOutputPlus out, Version version) throws IOException
+            public void serialize(ColumnConditionsAdapter condition, TableMetadatas tables, DataOutputPlus out) throws IOException
             {
                 clusteringSerializer.serialize(condition.clustering, out);
                 serializeCollection(condition.bounds, tables, out, Bound.serializer);
             }
 
             @Override
-            public ColumnConditionsAdapter deserialize(TableMetadatas tables, DataInputPlus in, Version version, Kind ignored) throws IOException
+            public ColumnConditionsAdapter deserialize(TableMetadatas tables, DataInputPlus in, Kind ignored) throws IOException
             {
                 Clustering<?> clustering = clusteringSerializer.deserialize(in);
                 List<Bound> bounds = deserializeList(tables, in, Bound.serializer);
@@ -432,7 +474,7 @@ public abstract class TxnCondition
             }
 
             @Override
-            public long serializedSize(ColumnConditionsAdapter condition, TableMetadatas tables, Version version)
+            public long serializedSize(ColumnConditionsAdapter condition, TableMetadatas tables)
             {
                 return clusteringSerializer.serializedSize(condition.clustering)
                     + serializedCollectionSize(condition.bounds, tables, Bound.serializer);
@@ -442,24 +484,28 @@ public abstract class TxnCondition
 
     public static class Value extends TxnCondition
     {
-        private static final Set<Kind> KINDS = ImmutableSet.of(Kind.EQUAL, Kind.NOT_EQUAL,
-                                                               Kind.GREATER_THAN, Kind.GREATER_THAN_OR_EQUAL,
-                                                               Kind.LESS_THAN, Kind.LESS_THAN_OR_EQUAL);
+        private static final EnumSet<Kind> KINDS = EnumSet.of(Kind.EQUAL, Kind.NOT_EQUAL,
+                                                              Kind.GREATER_THAN, Kind.GREATER_THAN_OR_EQUAL,
+                                                              Kind.LESS_THAN, Kind.LESS_THAN_OR_EQUAL);
 
-        private final TxnReference reference;
+        private final TxnReference.ColumnReference reference;
         private final ByteBuffer value;
         private final ProtocolVersion version;
 
-        public Value(TxnReference reference, Kind kind, ByteBuffer value, ProtocolVersion version)
+        public Value(TxnReference.ColumnReference reference, Kind kind, ByteBuffer value, ProtocolVersion version)
         {
             super(kind);
             Invariants.requireArgument(KINDS.contains(kind), "Kind " + kind + " cannot be used with a value condition");
-            Invariants.requireArgument(reference.selectsColumn(), "Reference " + reference + " does not select a column");
             this.reference = reference;
             this.value = value;
             this.version = version;
         }
 
+        public static EnumSet<Kind> supported()
+        {
+            return EnumSet.copyOf(KINDS);
+        }
+
         @Override
         public boolean equals(Object o)
         {
@@ -473,9 +519,7 @@ public abstract class TxnCondition
         @Override
         public void collect(TableMetadatas.Collector collector)
         {
-            TableMetadata table = reference.table();
-            if (table != null)
-                collector.add(table);
+            reference.collect(collector);
         }
 
         @Override
@@ -532,33 +576,39 @@ public abstract class TxnCondition
         @Override
         public boolean applies(TxnData data)
         {
-            return getBounds(data).appliesTo(reference.getRow(data));
+            Bound bounds = getBounds(data);
+            if (reference.column().type.unwrap().isNull(bounds.value))
+                return false;
+            Row row = reference.getRow(data);
+            if (bounds.isNull(row))
+                return false;
+            return bounds.appliesTo(row);
         }
 
         private static final ConditionSerializer<Value> serializer = new ConditionSerializer<>()
         {
             @Override
-            public void serialize(Value condition, TableMetadatas tables, DataOutputPlus out, Version version) throws IOException
+            public void serialize(Value condition, TableMetadatas tables, DataOutputPlus out) throws IOException
             {
-                TxnReference.serializer.serialize(condition.reference, tables, out, version);
+                TxnReference.serializer.serialize(condition.reference, tables, out);
                 ByteBufferUtil.writeWithVIntLength(condition.value, out);
                 out.writeUTF(condition.version.name());
             }
 
             @Override
-            public Value deserialize(TableMetadatas tables, DataInputPlus in, Version version, Kind kind) throws IOException
+            public Value deserialize(TableMetadatas tables, DataInputPlus in, Kind kind) throws IOException
             {
-                TxnReference reference = TxnReference.serializer.deserialize(tables, in, version);
+                TxnReference.ColumnReference reference = TxnReference.serializer.deserialize(tables, in).asColumn();
                 ByteBuffer value = ByteBufferUtil.readWithVIntLength(in);
                 ProtocolVersion protocolVersion = ProtocolVersion.valueOf(in.readUTF());
                 return new Value(reference, kind, value, protocolVersion);
             }
 
             @Override
-            public long serializedSize(Value condition, TableMetadatas tables, Version version)
+            public long serializedSize(Value condition, TableMetadatas tables)
             {
                 long size = 0;
-                size += TxnReference.serializer.serializedSize(condition.reference, tables, version);
+                size += TxnReference.serializer.serializedSize(condition.reference, tables);
                 size += ByteBufferUtil.serializedSizeWithVIntLength(condition.value);
                 size += TypeSizes.sizeof(condition.version.name());
                 return size;
@@ -625,48 +675,48 @@ public abstract class TxnCondition
         private static final ConditionSerializer<BooleanGroup> serializer = new ConditionSerializer<>()
         {
             @Override
-            public void serialize(BooleanGroup condition, TableMetadatas tables, DataOutputPlus out, Version version) throws IOException
+            public void serialize(BooleanGroup condition, TableMetadatas tables, DataOutputPlus out) throws IOException
             {
-                serializeList(condition.conditions, tables, out, version, TxnCondition.serializer);
+                serializeList(condition.conditions, tables, out, TxnCondition.serializer);
             }
 
             @Override
-            public BooleanGroup deserialize(TableMetadatas tables, DataInputPlus in, Version version, Kind kind) throws IOException
+            public BooleanGroup deserialize(TableMetadatas tables, DataInputPlus in, Kind kind) throws IOException
             {
-                return new BooleanGroup(kind, deserializeList(tables, in, version, TxnCondition.serializer));
+                return new BooleanGroup(kind, deserializeList(tables, in, TxnCondition.serializer));
             }
 
             @Override
-            public long serializedSize(BooleanGroup condition, TableMetadatas tables, Version version)
+            public long serializedSize(BooleanGroup condition, TableMetadatas tables)
             {
-                return serializedListSize(condition.conditions, tables, version, TxnCondition.serializer);
+                return serializedListSize(condition.conditions, tables, TxnCondition.serializer);
             }
         };
     }
 
-    public static final ParameterisedVersionedSerializer<TxnCondition, TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+    public static final ParameterisedUnversionedSerializer<TxnCondition, TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
     {
         @SuppressWarnings("unchecked")
         @Override
-        public void serialize(TxnCondition condition, TableMetadatas tables, DataOutputPlus out, Version version) throws IOException
+        public void serialize(TxnCondition condition, TableMetadatas tables, DataOutputPlus out) throws IOException
         {
             out.writeUnsignedVInt32(condition.kind.ordinal());
-            condition.kind.serializer().serialize(condition, tables, out, version);
+            condition.kind.serializer().serialize(condition, tables, out);
         }
 
         @Override
-        public TxnCondition deserialize(TableMetadatas tables, DataInputPlus in, Version version) throws IOException
+        public TxnCondition deserialize(TableMetadatas tables, DataInputPlus in) throws IOException
         {
             Kind kind = Kind.values()[in.readUnsignedVInt32()];
-            return kind.serializer().deserialize(tables, in, version, kind);
+            return kind.serializer().deserialize(tables, in, kind);
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        public long serializedSize(TxnCondition condition, TableMetadatas tables, Version version)
+        public long serializedSize(TxnCondition condition, TableMetadatas tables)
         {
             long size = TypeSizes.sizeofUnsignedVInt(condition.kind.ordinal());
-            size += condition.kind.serializer().serializedSize(condition, tables, version);
+            size += condition.kind.serializer().serializedSize(condition, tables);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index ea27a398c2..ab774c806a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -78,7 +78,7 @@ import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.writeWithVIntLength;
 
-public class TxnNamedRead extends AbstractSerialized<ReadCommand, TableMetadatas>
+public class TxnNamedRead extends AbstractParameterisedVersionedSerialized<ReadCommand, TableMetadatas>
 {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(TxnNamedRead.class);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java b/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
index 59a1c5afb6..13d021930a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
@@ -20,141 +20,118 @@ package org.apache.cassandra.service.accord.txn;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
+import accord.utils.Invariants;
+import accord.utils.TinyEnumSet;
+import accord.utils.UnhandledEnum;
 import accord.utils.VIntCoding;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
 import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.AbstractCell;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
+import org.apache.cassandra.io.UnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
 
 import static org.apache.cassandra.db.marshal.CollectionType.Kind.SET;
 import static org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
 
-public class TxnReference
+public abstract class TxnReference
 {
-    private final int tuple;
-    private final TableMetadata table;
-    private final ColumnMetadata column;
-    private final CellPath path;
+    public final Kind kind;
+    protected final int tuple;
 
-    public TxnReference(int tuple, TableMetadata table, ColumnMetadata column, CellPath path)
+    private TxnReference(Kind kind, int tuple)
     {
+        this.kind = kind;
         this.tuple = tuple;
-        this.table = table;
-        this.column = column;
-        this.path = path;
-    }
-
-    public TxnReference(int tuple, ColumnMetadata column, TableMetadata table)
-    {
-        this(tuple, table, column, null);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        TxnReference reference = (TxnReference) o;
-        return tuple == reference.tuple && Objects.equals(column, reference.column) && Objects.equals(path, reference.path);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(tuple, column, path);
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder().append(tuple);
-        if (column != null)
-            sb.append(':').append(column.ksName).append('.').append(column.cfName).append('.').append(column.name.toString());
-        if (path != null)
-            sb.append(path);
-        return sb.toString();
-    }
-
-    public ColumnMetadata column()
-    {
-        return column;
-    }
-
-    public TableMetadata table()
-    {
-        return table;
     }
 
-    public void collect(TableMetadatas.Collector collector)
+    /**
+     * Creates a reference to a "row".  This method isn't directly used by the main logic and instead
+     * exists to aid in testing.
+     */
+    public static RowReference row(int tuple)
     {
-        collector.add(table);
+        return new RowReference(tuple);
     }
 
-    public CellPath path()
+    /**
+     * Creates a reference to a "column".  This method isn't directly used by the main logic and instead
+     * exists to aid in testing.
+     */
+    public static ColumnReference column(int tuple, TableMetadata table, ColumnMetadata column)
     {
-        return path;
-    }
-    
-    public boolean selectsColumn()
-    {
-        return column != null;
+        return column(tuple, table, column, null);
     }
 
-    public boolean selectsPath()
+    /**
+     * Creates a reference to a "column".  This method isn't directly used by the main logic and instead
+     * exists to aid in testing.
+     */
+    public static ColumnReference column(int tuple, TableMetadata table, ColumnMetadata column, CellPath path)
     {
-        return selectsColumn() && path != null;
+        Invariants.nonNull(table, "table is null");
+        Invariants.nonNull(column, "column is null");
+        return new ColumnReference(tuple, table, column, path);
     }
 
-    public boolean isElementSelection()
+    public static TxnReference columnOrRow(int tuple,
+                                           @Nullable TableMetadata table,
+                                           @Nullable ColumnMetadata column,
+                                           @Nullable CellPath path)
     {
-        return selectsPath() && column.type.isCollection();
+        if (column == null)
+        {
+            Invariants.require(table == null, "Column is null but table isn't; unknown reference type");
+            Invariants.require(path == null, "Column is null but path isn't; unknown reference type");
+            return row(tuple);
+        }
+        return column(tuple, table, column, path);
     }
 
-    public boolean isFieldSelection()
-    {
-        return selectsPath() && column.type.isUDT();
-    }
+    public abstract void collect(TableMetadatas.Collector collector);
 
-    public ByteBuffer getPartitionKey(TxnData data)
+    public RowReference asRow()
     {
-        FilteredPartition partition = getPartition(data);
-        if (partition == null) return null;
-        return partition.metadata().partitionKeyColumns().size() == 1
-               ? partition.partitionKey().getKey()
-               : ((CompositeType) partition.metadata().partitionKeyType).split(partition.partitionKey().getKey())[column.position()];
+        Invariants.require(kind == Kind.ROW, "Expected to be a row but was a %s", kind);
+        return (RowReference) this;
     }
 
-    public ByteBuffer getClusteringKey(TxnData data)
+    public ColumnReference asColumn()
     {
-        Row row = getRow(data);
-        if (row == null)
-            return null;
-        return row.clustering().bufferAt(column.position());
+        Invariants.require(kind == Kind.COLUMN, "Expected to be a column but was a %s", kind);
+        return (ColumnReference) this;
     }
 
     public TxnDataKeyValue getPartition(TxnData data)
     {
         return (TxnDataKeyValue)data.get(tuple);
     }
-    
+
     public Row getRow(TxnData data)
     {
         FilteredPartition partition = getPartition(data);
@@ -163,7 +140,7 @@ public class TxnReference
 
     public Row getRow(FilteredPartition partition)
     {
-        if (column != null && column.isStatic())
+        if (kind == Kind.COLUMN && asColumn().column.isStatic())
             return partition.staticRow();
         assert partition.rowCount() <= 1 : "Multi-row references are not allowed";
         if (partition.rowCount() == 0)
@@ -171,184 +148,477 @@ public class TxnReference
         return partition.getAtIdx(0);
     }
 
-    public ColumnData getColumnData(Row row)
+    public static final UnversionedSerializer<RowReference> rowSerializer = new UnversionedSerializer<>()
     {
-        if (column.isComplex() && path == null)
-            return row.getComplexColumnData(column);
+        @Override
+        public void serialize(RowReference reference, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(reference.tuple);
+        }
 
-        if (path != null && column.type.isMultiCell())
+        @Override
+        public RowReference deserialize(DataInputPlus in) throws IOException
         {
-            if (column.type.isCollection())
-            {
-                CollectionType<?> collectionType = (CollectionType<?>) column.type;
+            int tuple = in.readUnsignedVInt32();
+            return row(tuple);
+        }
 
-                if (collectionType.kind == CollectionType.Kind.LIST)
-                    return row.getComplexColumnData(column).getCellByIndex(ByteBufferUtil.toInt(path.get(0)));
-            }
+        @Override
+        public long serializedSize(RowReference reference)
+        {
+            return VIntCoding.sizeOfUnsignedVInt(reference.tuple);
+        }
+    };
 
-            return row.getCell(column, path);
+    public static final ParameterisedUnversionedSerializer<ColumnReference, TableMetadatas> columnSerializer = new ParameterisedUnversionedSerializer<ColumnReference, TableMetadatas>()
+    {
+        @Override
+        public void serialize(ColumnReference reference, TableMetadatas tables, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(reference.tuple);
+            tables.serialize(reference.table, out);
+            columnMetadataSerializer.serialize(reference.column, reference.table, out);
+            out.writeBoolean(reference.path != null);
+            if (reference.path != null)
+                CollectionType.cellPathSerializer.serialize(reference.path, out);
         }
 
-        return row.getCell(column);
-    }
+        @Override
+        public ColumnReference deserialize(TableMetadatas tables, DataInputPlus in) throws IOException
+        {
+            int tuple = in.readUnsignedVInt32();
+            TableMetadata table = tables.deserialize(in);
+            ColumnMetadata column = columnMetadataSerializer.deserialize(table, in);
+            CellPath path = in.readBoolean() ? CollectionType.cellPathSerializer.deserialize(in) : null;
+            return TxnReference.column(tuple, table, column, path);
+        }
 
-    public ColumnData getColumnData(TxnData data)
-    {
-        Row row = getRow(data);
-        return row != null ? getColumnData(row) : null;
-    }
+        @Override
+        public long serializedSize(ColumnReference reference, TableMetadatas tables)
+        {
+            long size = 0;
+            size += VIntCoding.sizeOfUnsignedVInt(reference.tuple);
+            size += tables.serializedSize(reference.table);
+            size += columnMetadataSerializer.serializedSize(reference.column, reference.table);
+            size += TypeSizes.BOOL_SIZE;
+            if (reference.path != null)
+                size += CollectionType.cellPathSerializer.serializedSize(reference.path);
+            return size;
+        }
+    };
 
-    public ByteBuffer getFrozenCollectionElement(Cell<?> collection)
-    {
-        CollectionType<?> collectionType = (CollectionType<?>) column.type;
-        return collectionType.getSerializer().getSerializedValue(collection.buffer(), path.get(0), collectionType.nameComparator());
-    }
 
-    public ByteBuffer getFrozenFieldValue(Cell<?> udt)
+    static final ParameterisedUnversionedSerializer<TxnReference, TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
     {
-        UserType userType = (UserType) column.type;
-        int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
-        return userType.unpack(udt.buffer()).get(field);
-    }
+        @Override
+        public void serialize(TxnReference reference, TableMetadatas tables, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(TinyEnumSet.encode(reference.kind));
+            switch (reference.kind)
+            {
+                case ROW:
+                    rowSerializer.serialize(reference.asRow(), out);
+                    break;
+                case COLUMN:
+                    columnSerializer.serialize(reference.asColumn(), tables, out);
+                    break;
+                default:
+                    throw new UnhandledEnum(reference.kind);
+            }
+        }
+
+        @Override
+        public TxnReference deserialize(TableMetadatas tables, DataInputPlus in) throws IOException
+        {
+            TinyEnumSet<TxnReference.Kind> kind = new TinyEnumSet<>(in.readUnsignedVInt32());
+            if (kind.contains(Kind.ROW)) return rowSerializer.deserialize(in);
+            if (kind.contains(Kind.COLUMN)) return columnSerializer.deserialize(tables, in);
+            throw Invariants.illegalArgument("Unexpected kind: " + kind);
+        }
+
+        @Override
+        public long serializedSize(TxnReference reference, TableMetadatas tables)
+        {
+            long size = VIntCoding.sizeOfUnsignedVInt(TinyEnumSet.encode(reference.kind));
+            switch (reference.kind)
+            {
+                case ROW:
+                    size += rowSerializer.serializedSize(reference.asRow());
+                    break;
+                case COLUMN:
+                    size += columnSerializer.serializedSize(reference.asColumn(), tables);
+                    break;
+                default:
+                    throw new UnhandledEnum(reference.kind);
+            }
+            return size;
+        }
+    };
 
-    public AbstractType<?> getFieldSelectionType()
+    public enum Kind { ROW, COLUMN }
+
+    public static class RowReference extends TxnReference
     {
-        assert isFieldSelection() : "No field selection type exists";
-        UserType userType = (UserType) column.type;
-        int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
-        return userType.fieldType(field);
+        public RowReference(int tuple)
+        {
+            super(Kind.ROW, tuple);
+        }
+
+        @Override
+        public void collect(TableMetadatas.Collector collector)
+        {
+            // no-op
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == null || getClass() != o.getClass()) return false;
+            RowReference that = (RowReference) o;
+            return tuple == that.tuple;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(tuple);
+        }
+
+        @Override
+        public String toString()
+        {
+            return Integer.toString(tuple);
+        }
     }
 
-    public ByteBuffer toByteBuffer(TxnData data, AbstractType<?> receiver)
+    public static class ColumnReference extends TxnReference
     {
-        // TODO: confirm all references can be satisfied as part of the txn condition
-        AbstractType<?> type = column().type;
+        private final TableMetadata table;
+        private final ColumnMetadata column;
+        @Nullable
+        private final CellPath path;
+
+        public ColumnReference(int tuple, TableMetadata table, ColumnMetadata column, @Nullable CellPath path)
+        {
+            super(Kind.COLUMN, tuple);
+            this.table = table;
+            this.column = column;
+            this.path = path;
+        }
+
+        public ColumnMetadata column()
+        {
+            return column;
+        }
+
+        public TableMetadata table()
+        {
+            return table;
+        }
+
+        @Nullable
+        public CellPath path()
+        {
+            return path;
+        }
+
+        public boolean selectsPath()
+        {
+            return path != null;
+        }
+
+        public boolean isElementSelection()
+        {
+            return selectsPath() && column.type.isCollection();
+        }
+
+        public boolean isFieldSelection()
+        {
+            return selectsPath() && column.type.isUDT();
+        }
+
+        public ByteBuffer getPartitionKey(TxnData data)
+        {
+            FilteredPartition partition = getPartition(data);
+            if (partition == null) return null;
+            return partition.metadata().partitionKeyColumns().size() == 1
+                   ? partition.partitionKey().getKey()
+                   : ((CompositeType) partition.metadata().partitionKeyType).split(partition.partitionKey().getKey())[column.position()];
+        }
+
+        @Override
+        public void collect(TableMetadatas.Collector collector)
+        {
+            collector.add(table);
+        }
+
+        public ByteBuffer getClusteringKey(TxnData data)
+        {
+            Row row = getRow(data);
+            if (row == null)
+                return null;
+            return row.clustering().bufferAt(column.position());
+        }
 
-        // Modify the type we'll check if the reference is to a collection element.
-        if (selectsPath())
+        public ColumnData getColumnData(Row row)
         {
-            if (type.isCollection())
+            if (column.isClusteringColumn())
+                return new ClusteringColumnData(column, row.clustering().bufferAt(column.position()));
+            if (column.isComplex() && path == null)
+                return row.getComplexColumnData(column);
+
+            if (path != null && column.type.isMultiCell())
             {
-                CollectionType<?> collectionType = (CollectionType<?>) type;
-                type = collectionType.kind == SET ? collectionType.nameComparator() : collectionType.valueComparator();
+                if (column.type.isCollection())
+                {
+                    CollectionType<?> collectionType = (CollectionType<?>) column.type;
+
+                    if (collectionType.kind == CollectionType.Kind.LIST)
+                        return row.getComplexColumnData(column).getCellByIndex(ByteBufferUtil.toInt(path.get(0)));
+                }
+
+                return row.getCell(column, path);
             }
-            else if (type.isUDT())
-                type = getFieldSelectionType();
+
+            return row.getCell(column);
+        }
+
+        public ColumnData getColumnData(TxnData data)
+        {
+            Row row = getRow(data);
+            return row != null ? getColumnData(row) : null;
         }
 
-        // Account for frozen collection and reversed clustering key references:
-        AbstractType<?> receiveType = type.isFrozenCollection() ? receiver.freeze().unwrap() : receiver.unwrap();
-        if (!(receiveType == type.unwrap()))
-            throw new IllegalArgumentException("Receiving type " + receiveType + " does not match " + type.unwrap());
+        /**
+         * Extracts one element from a collection value held in a single cell.
+         * The column type is unwrapped and cast to CollectionType; the element is looked up in the
+         * cell's buffer using the first component of this reference's path as the key, together with
+         * the collection's name comparator.
+         *
+         * @param collection the cell whose buffer holds the serialized collection
+         * @return whatever the collection serializer's getSerializedValue yields for that key
+         *         (presumably null when the key is absent -- TODO confirm against the serializer)
+         */
+        public ByteBuffer getFrozenCollectionElement(Cell<?> collection)
+        {
+            CollectionType<?> collectionType = (CollectionType<?>) 
column.type.unwrap();
+            return 
collectionType.getSerializer().getSerializedValue(collection.buffer(), 
path.get(0), collectionType.nameComparator());
+        }
 
-        if (column().isPartitionKey())
-            return getPartitionKey(data);
-        else if (column().isClusteringColumn())
-            return getClusteringKey(data);
+        /**
+         * Returns the type of the key used to address an element of this column's collection type:
+         * Int32Type when the unwrapped column type is a ListType (lists are addressed by index),
+         * otherwise the collection's name comparator.
+         */
+        public AbstractType<?> getFrozenCollectionElementType()
+        {
+            CollectionType<?> type = (CollectionType<?>) column.type.unwrap();
+            if (type instanceof ListType) return Int32Type.instance; // by 
index is the only thing supported right now; see getFrozenCollectionElement
+            return type.nameComparator();
+        }
 
-        ColumnData columnData = getColumnData(data);
+        /**
+         * Reads one field out of a UDT value held in a single cell.
+         * The field position is decoded as an unsigned short from the first path component, and the
+         * cell buffer is split into per-field buffers with UserType.unpack.
+         *
+         * @param udt the cell whose buffer holds the serialized UDT value
+         * @return the selected field's buffer, or null when the unpacked value has no entry at that position
+         */
+        public ByteBuffer getFrozenFieldValue(Cell<?> udt)
+        {
+            UserType userType = (UserType) column.type.unwrap();
+            int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
+            List<ByteBuffer> tuple = userType.unpack(udt.buffer());
+            // A stored value may carry fewer fields than the selected position; report that as null.
+            return tuple.size() > field ? tuple.get(field) : null;
+        }
 
-        if (columnData == null)
-            return null;
+        /**
+         * Returns the declared type of the UDT field selected by this reference's path.
+         * Asserts isFieldSelection() first; the field position is decoded as an unsigned short from
+         * the first path component.
+         * NOTE(review): column.type is cast to UserType directly here, whereas getFrozenFieldValue
+         * unwraps the type before casting -- confirm no wrapped type can reach this method.
+         */
+        public AbstractType<?> getFieldSelectionType()
+        {
+            assert isFieldSelection() : "No field selection type exists";
+            UserType userType = (UserType) column.type;
+            int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
+            return userType.fieldType(field);
+        }
 
-        if (selectsComplex())
+        public ByteBuffer toByteBuffer(TxnData data, AbstractType<?> receiver)
         {
-            ComplexColumnData complex = (ComplexColumnData) columnData;
+            // TODO: confirm all references can be satisfied as part of the 
txn condition
+            AbstractType<?> type = column().type;
+
+            // Modify the type we'll check if the reference is to a collection 
element.
+            if (selectsPath())
+            {
+                if (type.isCollection())
+                {
+                    CollectionType<?> collectionType = (CollectionType<?>) 
type;
+                    type = collectionType.kind == SET ? 
collectionType.nameComparator() : collectionType.valueComparator();
+                }
+                else if (type.isUDT())
+                    type = getFieldSelectionType();
+            }
+
+            // Account for frozen collection and reversed clustering key 
references:
+            AbstractType<?> receiveType = type.isFrozenCollection() ? 
receiver.freeze().unwrap() : receiver.unwrap();
+            if (!(receiveType == type.unwrap()))
+                throw new IllegalArgumentException("Receiving type " + 
receiveType + " does not match " + type.unwrap());
+
+            if (column().isPartitionKey())
+                return getPartitionKey(data);
+            else if (column().isClusteringColumn())
+                return getClusteringKey(data);
+
+            ColumnData columnData = getColumnData(data);
+
+            if (columnData == null)
+                return null;
 
-            if (type instanceof CollectionType)
+            if (selectsComplex())
             {
-                CollectionType<?> col = (CollectionType<?>) type;
-                return col.serializeForNativeProtocol(complex.iterator());
+                ComplexColumnData complex = (ComplexColumnData) columnData;
+
+                if (type instanceof CollectionType)
+                {
+                    CollectionType<?> col = (CollectionType<?>) type;
+                    return col.serializeForNativeProtocol(complex.iterator());
+                }
+                else if (type instanceof UserType)
+                {
+                    UserType udt = (UserType) type;
+                    return udt.serializeForNativeProtocol(complex.iterator());
+                }
+
+                throw new UnsupportedOperationException("Unsupported complex 
type: " + type);
             }
-            else if (type instanceof UserType)
+            else if (selectsFrozenCollectionElement())
             {
-                UserType udt = (UserType) type;
-                return udt.serializeForNativeProtocol(complex.iterator());
+                // If a path is selected for a non-frozen collection, the 
element will already be materialized.
+                return getFrozenCollectionElement((Cell<?>) columnData);
+            }
+            else if (selectsFrozenUDTField())
+            {
+                return getFrozenFieldValue((Cell<?>) columnData);
             }
 
-            throw new UnsupportedOperationException("Unsupported complex type: 
" + type);
+            Cell<?> cell = (Cell<?>) columnData;
+            return selectsSetElement() ? cell.path().get(0) : cell.buffer();
         }
-        else if (selectsFrozenCollectionElement())
+
+        private boolean selectsComplex()
         {
-            // If a path is selected for a non-frozen collection, the element 
will already be materialized.
-            return getFrozenCollectionElement((Cell<?>) columnData);
+            return column.isComplex() && path == null;
         }
-        else if (selectsFrozenUDTField())
+
+        private boolean selectsSetElement()
         {
-            return getFrozenFieldValue((Cell<?>) columnData);
+            return selectsPath() && column.type instanceof SetType;
         }
 
-        Cell<?> cell = (Cell<?>) columnData;
-        return selectsSetElement() ? cell.path().get(0) : cell.buffer();
-    }
+        /** True when this reference selects a path and the column type is a frozen collection. */
+        private boolean selectsFrozenCollectionElement()
+        {
+            return selectsPath() && column.type.isFrozenCollection();
+        }
 
-    private boolean selectsComplex()
-    {
-        return column.isComplex() && path == null;
-    }
+        /** True when this reference selects a path and the column type is a UDT that is not multi-cell. */
+        private boolean selectsFrozenUDTField()
+        {
+            return selectsPath() && column.type.isUDT() && 
!column.type.isMultiCell();
+        }
 
-    private boolean selectsSetElement()
-    {
-        return selectsPath() && column.type instanceof SetType;
-    }
+        /**
+         * Two references are equal when they are instances of exactly the same class and their
+         * tuple, table, column and path all match (path and the metadata fields may be null).
+         */
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == null || getClass() != o.getClass()) return false;
+            ColumnReference that = (ColumnReference) o;
+            return tuple == that.tuple && Objects.equals(table, that.table) && 
Objects.equals(column, that.column) && Objects.equals(path, that.path);
+        }
 
-    private boolean selectsFrozenCollectionElement()
-    {
-        return selectsPath() && column.type.isFrozenCollection();
-    }
+        /** Hashes the same four fields that equals compares: tuple, table, column and path. */
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(tuple, table, column, path);
+        }
 
-    private boolean selectsFrozenUDTField()
-    {
-        return selectsPath() && column.type.isUDT() && 
!column.type.isMultiCell();
+        /**
+         * Renders the reference as {@code <tuple>:<keyspace>.<table>.<column>}, followed by
+         * {@code #<path>} when a path is present.
+         */
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder().append(tuple);
+            
sb.append(':').append(column.ksName).append('.').append(column.cfName).append('.').append(column.name.toString());
+            if (path != null)
+                sb.append('#').append(path);
+            return sb.toString();
+        }
     }
 
-    static final ParameterisedVersionedSerializer<TxnReference, 
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+    private static class ClusteringColumnData extends AbstractCell<ByteBuffer>
     {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new 
ClusteringColumnData(null, null));
+        private final ByteBuffer value;
+
+        private ClusteringColumnData(ColumnMetadata column, ByteBuffer value)
+        {
+            super(column);
+            this.value = value;
+        }
+
         @Override
-        public void serialize(TxnReference reference, TableMetadatas tables, 
DataOutputPlus out, Version version) throws IOException
+        public ByteBuffer value()
         {
-            out.writeUnsignedVInt32(reference.tuple);
-            out.writeBoolean(reference.column != null);
-            if (reference.column != null)
-            {
-                tables.serialize(reference.table, out);
-                columnMetadataSerializer.serialize(reference.column, 
reference.table, out);
-            }
-            out.writeBoolean(reference.path != null);
-            if (reference.path != null)
-                CollectionType.cellPathSerializer.serialize(reference.path, 
out);
+            return value;
         }
 
         @Override
-        public TxnReference deserialize(TableMetadatas tables, DataInputPlus 
in, Version version) throws IOException
+        public ValueAccessor<ByteBuffer> accessor()
         {
-            int name = in.readUnsignedVInt32();
-            TableMetadata table = null;
-            ColumnMetadata column = null;
-            if (in.readBoolean())
-            {
-                table = tables.deserialize(in);
-                column = columnMetadataSerializer.deserialize(table, in);
-            }
-            CellPath path = in.readBoolean() ? 
CollectionType.cellPathSerializer.deserialize(in) : null;
-            return new TxnReference(name, table, column, path);
+            return ByteBufferAccessor.instance;
         }
 
         @Override
-        public long serializedSize(TxnReference reference, TableMetadatas 
tables, Version version)
+        public boolean isTombstone()
         {
-            long size = 0;
-            size += VIntCoding.sizeOfUnsignedVInt(reference.tuple);
-            size += TypeSizes.BOOL_SIZE;
-            if (reference.column != null)
-            {
-                size += tables.serializedSize(reference.table);
-                size += 
columnMetadataSerializer.serializedSize(reference.column, reference.table);
-            }
-            size += TypeSizes.BOOL_SIZE;
-            if (reference.path != null)
-                size += 
CollectionType.cellPathSerializer.serializedSize(reference.path);
-            return size;
+            return false;
         }
-    };
+
+        @Override
+        public long unsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE;
+        }
+
+        @Override
+        public long unsharedHeapSize()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(value);
+        }
+
+        @Override
+        public long timestamp()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int ttl()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CellPath path()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Cell<?> withUpdatedColumn(ColumnMetadata newColumn)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Cell<?> withUpdatedValue(ByteBuffer newValue)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Cell<?> withUpdatedTimestamp(long newTimestamp)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Cell<?> withUpdatedTimestampAndLocalDeletionTime(long 
newTimestamp, long newLocalDeletionTime)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Cell<?> withSkippedValue()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected int localDeletionTimeAsUnsignedInt()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
index 21c22b5a57..c453f612b2 100644
--- 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
@@ -43,14 +43,13 @@ import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.TupleType;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.AccordSerializers;
 import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
@@ -265,15 +264,15 @@ public class TxnReferenceOperation
         return new Constants.Value(bytes);
     }
 
-    static final ParameterisedVersionedSerializer<TxnReferenceOperation, 
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+    static final ParameterisedUnversionedSerializer<TxnReferenceOperation, 
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
     {
         @Override
-        public void serialize(TxnReferenceOperation operation, TableMetadatas 
tables, DataOutputPlus out, Version version) throws IOException
+        public void serialize(TxnReferenceOperation operation, TableMetadatas 
tables, DataOutputPlus out) throws IOException
         {
             out.writeByte(operation.kind.id);
             tables.serialize(operation.table, out);
             columnMetadataSerializer.serialize(operation.receiver, 
operation.table, out);
-            TxnReferenceValue.serializer.serialize(operation.value, tables, 
out, version);
+            TxnReferenceValue.serializer.serialize(operation.value, tables, 
out);
 
             out.writeBoolean(operation.key != null);
             if (operation.key != null)
@@ -285,24 +284,24 @@ public class TxnReferenceOperation
         }
 
         @Override
-        public TxnReferenceOperation deserialize(TableMetadatas tables, 
DataInputPlus in, Version version) throws IOException
+        public TxnReferenceOperation deserialize(TableMetadatas tables, 
DataInputPlus in) throws IOException
         {
             Kind kind = Kind.from(in.readByte());
             TableMetadata table = tables.deserialize(in);
             ColumnMetadata receiver = 
columnMetadataSerializer.deserialize(table, in);
-            TxnReferenceValue value = 
TxnReferenceValue.serializer.deserialize(tables, in, version);
+            TxnReferenceValue value = 
TxnReferenceValue.serializer.deserialize(tables, in);
             ByteBuffer key = in.readBoolean() ? 
ByteBufferUtil.readWithVIntLength(in) : null;
             ByteBuffer field = in.readBoolean() ? 
ByteBufferUtil.readWithVIntLength(in) : null;
             return new TxnReferenceOperation(kind, receiver, table, key, 
field, value);
         }
 
         @Override
-        public long serializedSize(TxnReferenceOperation operation, 
TableMetadatas tables, Version version)
+        public long serializedSize(TxnReferenceOperation operation, 
TableMetadatas tables)
         {
             long size = Byte.BYTES;
             size +=  tables.serializedSize(operation.table);
             size += 
columnMetadataSerializer.serializedSize(operation.receiver, operation.table);
-            size += 
TxnReferenceValue.serializer.serializedSize(operation.value, tables, version);
+            size += 
TxnReferenceValue.serializer.serializedSize(operation.value, tables);
 
             if (operation.key != null)
                 size += 
ByteBufferUtil.serializedSizeWithVIntLength(operation.key);
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
index 679106be11..7170d14503 100644
--- 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
+++ 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
@@ -100,8 +100,8 @@ public class TxnReferenceOperations
             out.writeBoolean(operations.clustering != null);
             if (operations.clustering != null)
                 Clustering.serializer.serialize(operations.clustering, out, 
version.messageVersion(), operations.metadata.comparator.subtypes());
-            serializeList(operations.regulars, tables, out, version, 
TxnReferenceOperation.serializer);
-            serializeList(operations.statics, tables, out, version, 
TxnReferenceOperation.serializer);
+            serializeList(operations.regulars, tables, out, 
TxnReferenceOperation.serializer);
+            serializeList(operations.statics, tables, out, 
TxnReferenceOperation.serializer);
         }
 
         @Override
@@ -112,8 +112,8 @@ public class TxnReferenceOperations
 
             TableMetadata metadata = tables.deserialize(in);
             Clustering<?> clustering = in.readBoolean() ? 
Clustering.serializer.deserialize(in, version.messageVersion(), 
metadata.comparator.subtypes()) : null;
-            return new TxnReferenceOperations(metadata, clustering, 
deserializeList(tables, in, version, TxnReferenceOperation.serializer),
-                                              deserializeList(tables, in, 
version, TxnReferenceOperation.serializer));
+            return new TxnReferenceOperations(metadata, clustering, 
deserializeList(tables, in, TxnReferenceOperation.serializer),
+                                              deserializeList(tables, in, 
TxnReferenceOperation.serializer));
         }
 
         @Override
@@ -126,8 +126,8 @@ public class TxnReferenceOperations
             size += TypeSizes.BOOL_SIZE;
             if (operations.clustering != null)
                 size += 
Clustering.serializer.serializedSize(operations.clustering, 
version.messageVersion(), operations.metadata.comparator.subtypes());
-            size += serializedListSize(operations.regulars, tables, version, 
TxnReferenceOperation.serializer);
-            size +=  serializedListSize(operations.statics, tables, version, 
TxnReferenceOperation.serializer);
+            size += serializedListSize(operations.regulars, tables, 
TxnReferenceOperation.serializer);
+            size +=  serializedListSize(operations.statics, tables, 
TxnReferenceOperation.serializer);
             return size;
         }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
index 7dbcea1c93..9ff76c026c 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
@@ -22,22 +22,23 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
+import javax.annotation.Nullable;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class TxnReferenceValue
 {
     private interface Serializer<T extends TxnReferenceValue>
     {
+        // Per-kind serialization hooks. deserialize is additionally handed the Kind, which the
+        // top-level TxnReferenceValue serializer reads from the stream before dispatching here.
-        void serialize(T t, TableMetadatas tables, DataOutputPlus out, Version 
version) throws IOException;
-        T deserialize(TableMetadatas tables, DataInputPlus in, Version 
version, Kind kind) throws IOException;
-        long serializedSize(T t, TableMetadatas tables, Version version);
+        void serialize(T t, TableMetadatas tables, DataOutputPlus out) throws 
IOException;
+        T deserialize(TableMetadatas tables, DataInputPlus in, Kind kind) 
throws IOException;
+        long serializedSize(T t, TableMetadatas tables);
     }
 
     enum Kind
@@ -60,9 +61,10 @@ public abstract class TxnReferenceValue
 
     public static class Constant extends TxnReferenceValue
     {
+        @Nullable
         private final ByteBuffer value;
 
-        public Constant(ByteBuffer value)
+        public Constant(@Nullable ByteBuffer value)
         {
             this.value = value;
         }
@@ -78,7 +80,7 @@ public abstract class TxnReferenceValue
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             Constant constant = (Constant) o;
-            return value.equals(constant.value);
+            return Objects.equals(value, constant.value);
         }
 
         @Override
@@ -90,7 +92,7 @@ public abstract class TxnReferenceValue
         @Override
         public String toString()
         {
-            return "Constant=" + ByteBufferUtil.bytesToHex(value);
+            // The conditional must be parenthesised: '+' binds tighter than '==', so without the
+            // parentheses the test becomes ("Constant=" + value) == null, which is never true; the
+            // method would then always call bytesToHex(value), even for a null value, and would
+            // return the hex string without the "Constant=" prefix.
+            return "Constant=" + (value == null ? "null" : ByteBufferUtil.bytesToHex(value));
         }
 
         @Override
@@ -113,30 +115,35 @@ public abstract class TxnReferenceValue
         private static final Serializer<Constant> serializer = new 
Serializer<Constant>()
         {
             @Override
-            public void serialize(Constant constant, TableMetadatas tables, 
DataOutputPlus out, Version version) throws IOException
+            public void serialize(Constant constant, TableMetadatas tables, 
DataOutputPlus out) throws IOException
             {
-                ByteBufferUtil.writeWithVIntLength(constant.value, out);
+                // A presence flag is written first so that a null value can be encoded;
+                // the length-prefixed bytes follow only when the value is non-null.
+                out.writeBoolean(constant.value != null);
+                if (constant.value != null)
+                    ByteBufferUtil.writeWithVIntLength(constant.value, out);
             }
 
             @Override
-            public Constant deserialize(TableMetadatas tables, DataInputPlus 
in, Version version, Kind kind) throws IOException
+            public Constant deserialize(TableMetadatas tables, DataInputPlus 
in, Kind kind) throws IOException
             {
-                return new Constant(ByteBufferUtil.readWithVIntLength(in));
+                // Mirror of serialize: read the presence flag, then the bytes only if it was set.
+                return new Constant(in.readBoolean() ? 
ByteBufferUtil.readWithVIntLength(in) : null);
             }
 
             @Override
-            public long serializedSize(Constant constant, TableMetadatas 
tables, Version version)
+            public long serializedSize(Constant constant, TableMetadatas 
tables)
             {
-                return 
ByteBufferUtil.serializedSizeWithVIntLength(constant.value);
+                // Size of the presence flag plus, when present, the length-prefixed bytes.
+                long size = TypeSizes.sizeof(constant.value != null);
+                if (constant.value != null)
+                    size += 
ByteBufferUtil.serializedSizeWithVIntLength(constant.value);
+                return size;
             }
         };
     }
 
     public static class Substitution extends TxnReferenceValue
     {
-        private final TxnReference reference;
+        private final TxnReference.ColumnReference reference;
 
-        public Substitution(TxnReference reference)
+        public Substitution(TxnReference.ColumnReference reference)
         {
             this.reference = reference;
         }
@@ -183,47 +190,47 @@ public abstract class TxnReferenceValue
         private static final Serializer<Substitution> serializer = new 
Serializer<>()
         {
+            // All three methods delegate to TxnReference.columnSerializer for the wrapped
+            // column reference; nothing else is written for a Substitution.
             @Override
-            public void serialize(Substitution substitution, TableMetadatas 
tables, DataOutputPlus out, Version version) throws IOException
+            public void serialize(Substitution substitution, TableMetadatas 
tables, DataOutputPlus out) throws IOException
             {
-                TxnReference.serializer.serialize(substitution.reference, 
tables, out, version);
+                
TxnReference.columnSerializer.serialize(substitution.reference, tables, out);
             }
 
             @Override
-            public Substitution deserialize(TableMetadatas tables, 
DataInputPlus in, Version version, Kind kind) throws IOException
+            public Substitution deserialize(TableMetadatas tables, 
DataInputPlus in, Kind kind) throws IOException
             {
-                return new 
Substitution(TxnReference.serializer.deserialize(tables, in, version));
+                return new 
Substitution(TxnReference.columnSerializer.deserialize(tables, in));
             }
 
             @Override
-            public long serializedSize(Substitution substitution, 
TableMetadatas tables, Version version)
+            public long serializedSize(Substitution substitution, 
TableMetadatas tables)
             {
-                return 
TxnReference.serializer.serializedSize(substitution.reference, tables, version);
+                return 
TxnReference.columnSerializer.serializedSize(substitution.reference, tables);
             }
         };
     }
 
-    static final ParameterisedVersionedSerializer<TxnReferenceValue, 
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+    // Top-level serializer: writes a kind tag, then dispatches to that kind's own serializer.
+    static final ParameterisedUnversionedSerializer<TxnReferenceValue, 
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
     {
         @SuppressWarnings("unchecked")
         @Override
-        public void serialize(TxnReferenceValue value, TableMetadatas tables, 
DataOutputPlus out, Version version) throws IOException
+        public void serialize(TxnReferenceValue value, TableMetadatas tables, 
DataOutputPlus out) throws IOException
         {
+            // The kind tag is the enum ordinal, so the declaration order of Kind is part of the encoding.
             out.writeUnsignedVInt32(value.kind().ordinal());
-            value.kind().serializer.serialize(value, tables, out, version);
+            value.kind().serializer.serialize(value, tables, out);
         }
 
         @Override
-        public TxnReferenceValue deserialize(TableMetadatas tables, 
DataInputPlus in, Version version) throws IOException
+        public TxnReferenceValue deserialize(TableMetadatas tables, 
DataInputPlus in) throws IOException
         {
+            // The tag is used as an index into Kind.values(); an out-of-range tag fails on this line.
             Kind kind = Kind.values()[in.readUnsignedVInt32()];
-            return kind.serializer.deserialize(tables, in, version, kind);
+            return kind.serializer.deserialize(tables, in, kind);
         }
 
         @SuppressWarnings("unchecked")
         @Override
-        public long serializedSize(TxnReferenceValue value, TableMetadatas 
tables, Version version)
+        public long serializedSize(TxnReferenceValue value, TableMetadatas 
tables)
         {
-            return TypeSizes.sizeofUnsignedVInt(value.kind().ordinal()) + 
value.kind().serializer.serializedSize(value, tables, version);
+            return TypeSizes.sizeofUnsignedVInt(value.kind().ordinal()) + 
value.kind().serializer.serializedSize(value, tables);
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 1fdc0e54f9..25e85fd0bc 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -77,7 +77,7 @@ public class TxnUpdate extends AccordUpdate
     final TableMetadatas tables;
     private final Keys keys;
     private final ByteBuffer[] fragments;
-    private final AbstractSerialized<TxnCondition, TableMetadatas> condition;
+    private final SerializedTxnCondition condition;
 
     @Nullable
     private final ConsistencyLevel cassandraCommitCL;
@@ -107,7 +107,7 @@ public class TxnUpdate extends AccordUpdate
         this.preserveTimestamps = preserveTimestamps;
     }
 
-    private TxnUpdate(TableMetadatas tables, Keys keys, ByteBuffer[] 
fragments, AbstractSerialized<TxnCondition, TableMetadatas> condition, 
ConsistencyLevel cassandraCommitCL, boolean preserveTimestamps)
+    private TxnUpdate(TableMetadatas tables, Keys keys, ByteBuffer[] 
fragments, SerializedTxnCondition condition, ConsistencyLevel 
cassandraCommitCL, boolean preserveTimestamps)
     {
         this.tables = tables;
         this.keys = keys;
@@ -261,14 +261,14 @@ public class TxnUpdate extends AccordUpdate
         return updates;
     }
 
-    public static final AccordUpdateSerializer<TxnUpdate> serializer = new 
AccordUpdateSerializer<TxnUpdate>()
+    public static final AccordUpdateSerializer<TxnUpdate> serializer = new 
AccordUpdateSerializer<>()
     {
         @Override
         public void serialize(TxnUpdate update, TableMetadatasAndKeys 
tablesAndKeys, DataOutputPlus out, Version version) throws IOException
         {
             out.writeByte(update.preserveTimestamps ? FLAG_PRESERVE_TIMESTAMPS 
: 0);
             tablesAndKeys.serializeKeys(update.keys, out);
-            writeWithVIntLength(update.condition.bytes(tablesAndKeys.tables, 
version), out);
+            writeWithVIntLength(update.condition.bytes(), out);
             serializeArray(update.fragments, out, 
ByteBufferUtil.byteBufferSerializer);
             serializeNullable(update.cassandraCommitCL, out, 
consistencyLevelSerializer);
         }
@@ -290,7 +290,7 @@ public class TxnUpdate extends AccordUpdate
         {
             long size = 1; // flags
             size += tablesAndKeys.serializedKeysSize(update.keys);
-            size += 
serializedSizeWithVIntLength(update.condition.bytes(tablesAndKeys.tables, 
version));
+            size += serializedSizeWithVIntLength(update.condition.bytes());
             size += serializedArraySize(update.fragments, 
ByteBufferUtil.byteBufferSerializer);
             size += serializedNullableSize(update.cassandraCommitCL, 
consistencyLevelSerializer);
             return size;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index abd27ae588..1df846ed2c 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -85,7 +85,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
 
     private static final long EMPTY_SIZE = 
ObjectSizes.measure(EMPTY_CONDITION_FAILED);
 
-    public static class Update extends AbstractSerialized<PartitionUpdate, 
TableMetadatas>
+    public static class Update extends 
AbstractParameterisedVersionedSerialized<PartitionUpdate, TableMetadatas>
     {
         private static final long EMPTY_SIZE = ObjectSizes.measure(new 
Update(null, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
         public final PartitionKey key;
diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java 
b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
index 81173bdf1e..d4718096a3 100644
--- a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
+++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
@@ -435,6 +435,15 @@ public class CollectionSerializers
         return size;
     }
 
+    /**
+     * Computes the size of a list for an unversioned, parameterised element serializer:
+     * the unsigned-vint size of the element count plus the sum of every element's serialized size.
+     *
+     * @param values list to measure; elements are read by index
+     * @param p parameter passed unchanged to the element serializer for each element
+     * @param valueSerializer supplies the per-element serialized size
+     * @return the total size described above
+     */
+    public static <V, P, L extends List<V>> long serializedListSize(L values, 
P p, AsymmetricParameterisedUnversionedSerializer<V, P, ?> valueSerializer)
+    {
+        int items = values.size();
+        long size = sizeofUnsignedVInt(items);
+        for (int i = 0 ; i < items ; ++i)
+            size += valueSerializer.serializedSize(values.get(i), p);
+        return size;
+    }
+
     public static <V, P, L extends List<V>, Version> long serializedListSize(L 
values, P p, Version version, AsymmetricParameterisedVersionedSerializer<V, P, 
?, Version> valueSerializer)
     {
         int items = values.size();
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
index f382675955..03da925f98 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
@@ -19,6 +19,14 @@
 package org.apache.cassandra.cql3.validation.operations;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,7 +36,12 @@ import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.UntypedResultSet.Row;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class InsertTest extends CQLTester.Fuzzed
 {
@@ -296,4 +309,43 @@ public class InsertTest extends CQLTester.Fuzzed
         assertInvalidThrow(InvalidRequestException.class,
                            "INSERT INTO %s (a, b) VALUES ('foo', ?)", new 
String(TOO_BIG.array()));
     }
+
+    /**
+     * Writes a map holding a single entry whose key and value are both the empty buffer into a multi-cell and a
+     * frozen {@code map<int, int>} column, and expects both to read back as a map with a single null/null entry.
+     */
+    @Test
+    public void testMapEmptyValueMeaningless()
+    {
+        createTable("CREATE TABLE %s(pk int primary key, v1 map<int, int>, v2 frozen<map<int, int>>)");
+        MapType<Integer, Integer> mapType = MapType.getInstance(Int32Type.instance, Int32Type.instance, true);
+        ByteBuffer packed = mapType.pack(Arrays.asList(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        execute("INSERT INTO %s(pk, v1, v2) VALUES (?, ?, ?)", 0, packed, packed);
+
+        Map<Integer, Integer> nullEntry = new HashMap<>();
+        nullEntry.put(null, null);
+        UntypedResultSet rows = execute("SELECT * FROM %s");
+        assertRows(rows, row(0, nullEntry, nullEntry));
+    }
+
+    /**
+     * Writes a list holding a single empty-buffer element into a multi-cell and a frozen {@code list<int>} column,
+     * and expects both to read back as a list with a single null element.
+     */
+    @Test
+    public void testListEmptyValueMeaningless()
+    {
+        createTable("CREATE TABLE %s(pk int primary key, v1 list<int>, v2 frozen<list<int>>)");
+        ListType<Integer> listType = ListType.getInstance(Int32Type.instance, true);
+        ByteBuffer packed = listType.pack(Collections.singletonList(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        execute("INSERT INTO %s(pk, v1, v2) VALUES (?, ?, ?)", 0, packed, packed);
+
+        List<Integer> singleNull = new ArrayList<>();
+        singleNull.add(null);
+        UntypedResultSet rows = execute("SELECT * FROM %s");
+        assertRows(rows, row(0, singleNull, singleNull));
+    }
+
+    /**
+     * Writes a set holding a single empty-buffer element into a multi-cell and a frozen {@code set<int>} column,
+     * and expects both to read back as a set with a single null element.
+     */
+    @Test
+    public void testSetEmptyValueMeaningless()
+    {
+        createTable("CREATE TABLE %s(pk int primary key, v1 set<int>, v2 frozen<set<int>>)");
+        SetType<Integer> setType = SetType.getInstance(Int32Type.instance, true);
+        ByteBuffer packed = setType.pack(Collections.singletonList(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+        execute("INSERT INTO %s(pk, v1, v2) VALUES (?, ?, ?)", 0, packed, packed);
+
+        Set<Integer> singleNull = new HashSet<>();
+        singleNull.add(null);
+        UntypedResultSet rows = execute("SELECT * FROM %s");
+        assertRows(rows, row(0, singleNull, singleNull));
+    }
 }
diff --git a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java 
b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
index 84701a2115..fd3b9b68a8 100644
--- a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.marshal;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.UncheckedIOException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
@@ -111,6 +112,7 @@ import 
org.apache.cassandra.utils.asserts.SoftAssertionsWithLimit;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.SoftAssertions;
 import org.assertj.core.description.Description;
 import org.quicktheories.core.Gen;
@@ -244,6 +246,79 @@ public class AbstractTypeTest
             throw new AssertionError("Uncovered types:\n" + sb);
     }
 
+    /**
+     * Guards the allow-list of types whose empty value is meaningless.
+     * <p>
+     * For every concrete {@link AbstractType} subtype found through reflection (test types and prefix-composite
+     * types are skipped) an instance is obtained, first from a public static field holding an {@link AbstractType},
+     * otherwise from a no-arg constructor. Subtypes for which no instance can be obtained are skipped. If the
+     * instance reports {@code isEmptyValueMeaningless()}, it must be present by reference in
+     * {@code AbstractTypeGenerators.MEANINGLESS_EMPTYNESS} and must treat the empty buffer as null.
+     */
+    @Test
+    @SuppressWarnings("rawtypes")
+    public void meaninglessEmptyness()
+    {
+        // this test just makes sure that all types are covered and no new 
type is left out
+        Set<Class<? extends AbstractType>> subTypes = 
reflections.getSubTypesOf(AbstractType.class);
+        for (var klass : subTypes)
+        {
+            if (Modifier.isAbstract(klass.getModifiers()))
+                continue;
+            if (isTestType(klass))
+                continue;
+            if (isPrefixCompositeType(klass))
+                continue;
+            AbstractType<?> type = null;
+            // first choice: the value of the first public static field whose declared type is an AbstractType
+            for (var f : klass.getDeclaredFields())
+            {
+                if (!(Modifier.isPublic(f.getModifiers()) && 
Modifier.isStatic(f.getModifiers())))
+                    continue;
+                if (AbstractType.class.isAssignableFrom(f.getType()))
+                {
+                    try
+                    {
+                        type = (AbstractType<?>) f.get(null);
+                        break;
+                    }
+                    catch (IllegalAccessException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            // fallback: instantiate through the first declared no-arg constructor
+            if (type == null)
+            {
+                for (var c : klass.getDeclaredConstructors())
+                {
+                    if (c.getParameterCount() == 0)
+                    {
+                        try
+                        {
+                            type = (AbstractType<?>) c.newInstance();
+                            break;
+                        }
+                        catch (InstantiationException | IllegalAccessException 
| InvocationTargetException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }
+            // no instance could be obtained either way, so there is nothing to check for this class
+            if (type == null)
+                continue;
+            if (type.isEmptyValueMeaningless())
+            {
+                AbstractType<?> finalType = type;
+                // some types (such as TimeUUID) have the same equals, so 
equality checks don't work here, need reference checks
+                
Assertions.assertThat(AbstractTypeGenerators.MEANINGLESS_EMPTYNESS)
+                          .describedAs("New type %s detected that says its 
emptyness is meaningless, but it isn't allowed to be!  This is a legacy concept 
only!", type.getClass())
+                          .anyMatch(t -> t == finalType);
+
+                
Assertions.assertThat(type.isNull(ByteBufferUtil.EMPTY_BYTE_BUFFER)).isTrue();
+            }
+        }
+    }
+
+    /**
+     * Property check: every generated type that is not contained in
+     * {@code AbstractTypeGenerators.MEANINGLESS_EMPTYNESS} must report that its empty value is meaningful.
+     */
+    @Test
+    public void onlyMeaninglessEmptyness()
+    {
+        var typesOutsideAllowList = Generators.filter(AbstractTypeGenerators.builder().withDefaultSizeGen(1).build(),
+                                                      candidate -> !AbstractTypeGenerators.MEANINGLESS_EMPTYNESS.contains(candidate));
+        qt().forAll(typesOutsideAllowList)
+            .checkAssert(type -> Assertions.assertThat(type.isEmptyValueMeaningless()).isFalse());
+    }
+
     @SuppressWarnings("rawtypes")
     private boolean isTestType(Class<? extends AbstractType> klass)
     {
diff --git a/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java 
b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java
new file mode 100644
index 0000000000..d139874687
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapCloner;
+import org.apache.cassandra.utils.memory.HeapPool;
+
+/**
+ * Test helper that builds an in-memory partition one row at a time, backed by an {@link AtomicBTreePartition}.
+ * <p>
+ * Rows are described with a {@link RowBuilder} and merged into the delegate when {@link RowBuilder#build()} is
+ * called; the content is then visible through the {@link AbstractBTreePartition} API and through
+ * {@link #filtered()}.
+ */
+public class SimplePartition extends AbstractBTreePartition
+{
+    static
+    {
+        DatabaseDescriptor.clientInitialization(false); // if the user setup DD respect w/e was done
+        if (DatabaseDescriptor.getPartitioner() == null)
+            DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+    /**
+     * Timestamp used for cells unless {@link RowBuilder#timestamp(long)} overrides it; also used for both arguments
+     * of {@link #addEmptyAndLive(Clustering)} and as the second argument to {@code FilteredRows.filter} in
+     * {@link #filtered()}.
+     */
+    public static final int DEFAULT_TIMESTAMP = 42;
+    // pool handing out the allocator used by the delegate partition
+    private static final HeapPool POOL = new HeapPool(Long.MAX_VALUE, 1.0f, () -> ImmediateFuture.success(Boolean.TRUE));
+
+    private final OpOrder writeOrder = new OpOrder();
+    private final AtomicBTreePartition delegate;
+
+    /**
+     * Creates an empty partition for the given table and partition key.
+     */
+    public SimplePartition(TableMetadata metadata, DecoratedKey partitionKey)
+    {
+        super(partitionKey);
+        delegate = new AtomicBTreePartition(TableMetadataRef.forOfflineTools(metadata), partitionKey, POOL.newAllocator(metadata.toString()));
+    }
+
+    @Override
+    protected BTreePartitionData holder()
+    {
+        return delegate.holder();
+    }
+
+    @Override
+    protected boolean canHaveShadowedData()
+    {
+        return false;
+    }
+
+    @Override
+    public TableMetadata metadata()
+    {
+        return delegate.metadata();
+    }
+
+    /**
+     * Resets the delegate to {@link BTreePartitionData#EMPTY}, discarding everything added so far.
+     *
+     * @return this partition, for chaining
+     */
+    public SimplePartition clear()
+    {
+        delegate.unsafeSetHolder(BTreePartitionData.EMPTY);
+        return this;
+    }
+
+    /**
+     * Merges a single, already built row into the partition.
+     *
+     * @return this partition, for chaining
+     */
+    public SimplePartition add(Row row)
+    {
+        PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata(), partitionKey, row);
+        try (OpOrder.Group group = writeOrder.start())
+        {
+            delegate.addAll(update, HeapCloner.instance, group, UpdateTransaction.NO_OP);
+        }
+        return this;
+    }
+
+    /**
+     * Starts building a row for the given clustering; nothing is added until {@link RowBuilder#build()} is called.
+     */
+    public RowBuilder add(Clustering<?> ck)
+    {
+        return new RowBuilder(ck);
+    }
+
+    /**
+     * Adds a row for the given clustering that has no cells and no primary key liveness info.
+     */
+    public SimplePartition addEmpty(Clustering<?> ck)
+    {
+        return add(ck).build();
+    }
+
+    /**
+     * Adds a row without cells but with primary key liveness info, using {@link #DEFAULT_TIMESTAMP} for both the
+     * timestamp and {@code nowInSec}.
+     */
+    public SimplePartition addEmptyAndLive(Clustering<?> ck)
+    {
+        return addEmptyAndLive(ck, DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
+    }
+
+    /**
+     * Adds a row without cells but with primary key liveness info created from the given values.
+     */
+    public SimplePartition addEmptyAndLive(Clustering<?> ck, long timestamp, long nowInSec)
+    {
+        return add(ck).liveness(timestamp, nowInSec).build();
+    }
+
+    /**
+     * Returns the partition content passed through {@code FilteredRows.filter} with {@link #DEFAULT_TIMESTAMP}.
+     */
+    public RowIterator filtered()
+    {
+        return FilteredRows.filter(unfilteredIterator(), DEFAULT_TIMESTAMP);
+    }
+
+    /**
+     * Fluent builder for a single row of the enclosing partition; the row is only added on {@link #build()}.
+     */
+    public class RowBuilder
+    {
+        private final Row.Builder builder = BTreeRow.unsortedBuilder();
+        private long timestamp = DEFAULT_TIMESTAMP;
+
+        public RowBuilder(Clustering<?> ck)
+        {
+            builder.newRow(ck);
+        }
+
+        /**
+         * Sets the timestamp used by cells (and complex deletions) added after this call.
+         */
+        public RowBuilder timestamp(long timestamp)
+        {
+            this.timestamp = timestamp;
+            return this;
+        }
+
+        /**
+         * Adds primary key liveness info created from the given values; independent of {@link #timestamp(long)}.
+         */
+        public RowBuilder liveness(long timestamp, long nowInSec)
+        {
+            builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp, nowInSec));
+            return this;
+        }
+
+        /**
+         * Adds a live cell for a column that is not multi-cell.
+         *
+         * @throws IllegalArgumentException if the column is multi-cell
+         */
+        public RowBuilder add(ColumnMetadata column, ByteBuffer value)
+        {
+            if (column.type.unwrap().isMultiCell())
+                throw new IllegalArgumentException("Unable to add a single value to a multi cell column " + column);
+            builder.addCell(BufferCell.live(column, timestamp, value));
+            return this;
+        }
+
+        /**
+         * Adds a complex deletion at {@code timestamp - 1} followed by one live cell per element for a multi-cell
+         * column. For map columns {@code values} must alternate key, value, key, value, ...; for other types each
+         * value becomes one cell (see {@link #complexCell}). An empty list leaves only the complex deletion.
+         *
+         * @throws IllegalArgumentException if the column is not multi-cell, or if it is a map and {@code values}
+         *                                  has an odd number of entries
+         */
+        public RowBuilder addComplex(ColumnMetadata column, List<ByteBuffer> values)
+        {
+            var type = column.type.unwrap();
+            if (!type.isMultiCell())
+                throw new IllegalArgumentException("Unable to add multiple values to a regular column " + column);
+            boolean isMap = type instanceof MapType;
+            // validate before touching the builder so that a bad call can't leave it partially populated
+            if (isMap && values.size() % 2 != 0)
+                throw new IllegalArgumentException("Map column " + column + " requires alternating key/value entries, but got an odd number of values: " + values.size());
+            builder.addComplexDeletion(column, DeletionTime.build(timestamp - 1, timestamp - 1));
+            // map needs to be specially handled as its key/value
+            if (isMap)
+            {
+                for (int i = 0; i < values.size(); i = i + 2)
+                {
+                    ByteBuffer key = values.get(i);
+                    ByteBuffer value = values.get(i + 1);
+                    builder.addCell(BufferCell.live(column, timestamp, value, CellPath.create(key)));
+                }
+            }
+            else
+            {
+                for (int i = 0; i < values.size(); i++)
+                    builder.addCell(complexCell(column, i, values.get(i), timestamp));
+            }
+            return this;
+        }
+
+        /**
+         * Creates the cell for the {@code idx}-th element of a set, list or user type column.
+         *
+         * @throws UnsupportedOperationException for maps (handled by {@link #addComplex}) and any other type
+         */
+        private Cell<?> complexCell(ColumnMetadata column, int idx, ByteBuffer value, long timestamp)
+        {
+            var type = column.type.unwrap();
+            if (type.isCollection())
+            {
+                CollectionType<?> ct = (CollectionType<?>) type;
+                switch (ct.kind)
+                {
+                    case SET:
+                    {
+                        // this isn't correct... the value isn't actually known... so only support map with key/value matching...
+                        return BufferCell.live(column, timestamp, value, CellPath.create(value));
+                    }
+                    case LIST:
+                    {
+                        // this isn't actually correct, as the cellpath is based off time, but a counter is used to keep things deterministic
+                        CellPath path = CellPath.create(ByteBuffer.wrap(TimeUUID.Generator.atUnixMillisAsBytes(idx)));
+                        return BufferCell.live(column, timestamp, value, path);
+                    }
+                    case MAP:
+                        throw new UnsupportedOperationException("Map isn't supported due to API being single element rather than multi-element");
+                    default:
+                        throw new UnsupportedOperationException(ct.kind.name());
+                }
+            }
+            else if (type.isUDT())
+            {
+                // the element index selects the field, so values must be supplied in field order
+                UserType ut = (UserType) type;
+                CellPath path = ut.cellPathForField(ut.fieldName(idx));
+                return BufferCell.live(column, timestamp, value, path);
+            }
+
+            throw new UnsupportedOperationException(type.toString());
+        }
+
+        /**
+         * Builds the row and merges it into the enclosing partition.
+         *
+         * @return the enclosing partition, for chaining
+         */
+        public SimplePartition build()
+        {
+            SimplePartition.this.add(builder.build());
+            return SimplePartition.this;
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java 
b/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
index 1cb1199f7f..01d9d50aa3 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
@@ -20,11 +20,29 @@ package org.apache.cassandra.service.accord.txn;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
 
+import accord.utils.DefaultRandom;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.db.BufferClustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.partitions.SimplePartition;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import accord.utils.Gen;
@@ -42,35 +60,97 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.Generators;
 
 import static accord.utils.Property.qt;
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+import static org.apache.cassandra.utils.Generators.toGen;
 
 //TODO (maintenance): rather than copy the condition supported kinds, maybe reference them directly from the type?
 public class TxnConditionTest
 {
     private static final SchemaProvider SCHEMA = new SchemaProvider();
+
     static
     {
         // ColumnMetadata serializer only stores the ks/table/name and uses 
Schema to load it
         Schema.instance = SCHEMA;
     }
 
+    // User type "ks.udt" with a single int field f1, in its non-frozen form and in the frozen form derived from it
+    private static final FieldIdentifier UDT_F1 = 
FieldIdentifier.forInternalString("f1");
+    private static final UserType UDT = new UserType("ks", 
ByteBufferUtil.bytes("udt"), Collections.singletonList(UDT_F1), 
Collections.singletonList(Int32Type.instance), true);
+    private static final UserType FROZEN_UDT = UDT.freeze();
+
+    // set<int>, in its non-frozen form and in the frozen form derived from it
+    private static final SetType<Integer> SET = 
SetType.getInstance(Int32Type.instance, true);
+    private static final SetType<Integer> FROZEN_SET = SET.freeze();
+
+    /**
+     * Builds the interned identifier for the {@code offset}-th column of the given kind: {@code pk}, {@code ck},
+     * {@code s} or {@code v} followed by the offset (for example {@code ck1} or {@code v3}).
+     *
+     * @throws UnsupportedOperationException for any other kind
+     */
+    private static ColumnIdentifier name(ColumnMetadata.Kind kind, int offset)
+    {
+        final String prefix;
+        switch (kind)
+        {
+            case PARTITION_KEY: prefix = "pk"; break;
+            case CLUSTERING:    prefix = "ck"; break;
+            case STATIC:        prefix = "s";  break;
+            case REGULAR:       prefix = "v";  break;
+            default:
+                throw new UnsupportedOperationException(kind.name());
+        }
+        return ColumnIdentifier.getInterned(prefix + offset, false);
+    }
+    // Column identifiers for tb1. The numeric suffix pairs each static column (sN) with the regular column (vN) of
+    // the same type, so tests can look either one up with name(kind, N).
+    private static final ColumnIdentifier COL_PK1 = 
name(ColumnMetadata.Kind.PARTITION_KEY, 1);
+    private static final ColumnIdentifier COL_S1 = 
name(ColumnMetadata.Kind.STATIC, 1);
+    private static final ColumnIdentifier COL_S2 = 
name(ColumnMetadata.Kind.STATIC, 2);
+    private static final ColumnIdentifier COL_S3 = 
name(ColumnMetadata.Kind.STATIC, 3);
+    private static final ColumnIdentifier COL_S4 = 
name(ColumnMetadata.Kind.STATIC, 4);
+    private static final ColumnIdentifier COL_S5 = 
name(ColumnMetadata.Kind.STATIC, 5);
+    private static final ColumnIdentifier COL_CK1 = 
name(ColumnMetadata.Kind.CLUSTERING, 1);
+    private static final ColumnIdentifier COL_R1 = 
name(ColumnMetadata.Kind.REGULAR, 1);
+    private static final ColumnIdentifier COL_R2 = 
name(ColumnMetadata.Kind.REGULAR, 2);
+    private static final ColumnIdentifier COL_R3 = 
name(ColumnMetadata.Kind.REGULAR, 3);
+    private static final ColumnIdentifier COL_R4 = 
name(ColumnMetadata.Kind.REGULAR, 4);
+    private static final ColumnIdentifier COL_R5 = 
name(ColumnMetadata.Kind.REGULAR, 5);
+    // Table with an int partition key and clustering column, plus static and regular columns of type
+    // int (1), frozen UDT (2), frozen set (3), non-frozen UDT (4) and non-frozen set (5).
+    private static final TableMetadata tb1 = TableMetadata.builder("ks", 
"tbl1")
+                                                          
.addPartitionKeyColumn(COL_PK1, Int32Type.instance)
+                                                          
.addStaticColumn(COL_S1, Int32Type.instance)
+                                                          
.addStaticColumn(COL_S2, FROZEN_UDT)
+                                                          
.addStaticColumn(COL_S3, FROZEN_SET)
+                                                          
.addStaticColumn(COL_S4, UDT)
+                                                          
.addStaticColumn(COL_S5, SET)
+                                                          
.addClusteringColumn(COL_CK1, Int32Type.instance)
+                                                          
.addRegularColumn(COL_R1, Int32Type.instance)
+                                                          
.addRegularColumn(COL_R2, FROZEN_UDT)
+                                                          
.addRegularColumn(COL_R3, FROZEN_SET)
+                                                          
.addRegularColumn(COL_R4, UDT)
+                                                          
.addRegularColumn(COL_R5, SET)
+                                                          
.partitioner(Murmur3Partitioner.instance)
+                                                          .build();
+
+    // serialized int 1; the tests use it as partition key, clustering and non-null cell value
+    private static final ByteBuffer INT_1 = ByteBufferUtil.bytes(1);
+
     private static Gen<TxnCondition.Kind> BOOLEAN_KIND_GEN = 
Gens.pick(TxnCondition.Kind.AND, TxnCondition.Kind.OR);
     private static Gen<TxnCondition.Kind> EXISTS_KIND_GEN = 
Gens.pick(TxnCondition.Kind.IS_NOT_NULL, TxnCondition.Kind.IS_NULL);
     private static Gen<TxnCondition.Kind> VALUE_KIND_GEN = 
Gens.pick(TxnCondition.Kind.EQUAL, TxnCondition.Kind.NOT_EQUAL,
                                                                      
TxnCondition.Kind.GREATER_THAN, TxnCondition.Kind.GREATER_THAN_OR_EQUAL,
                                                                      
TxnCondition.Kind.LESS_THAN, TxnCondition.Kind.LESS_THAN_OR_EQUAL);
     private static Gen<ProtocolVersion> PROTOCOL_VERSION_GEN = 
Gens.enums().all(ProtocolVersion.class);
-    private static Gen<ColumnMetadata> COLUM_METADATA_GEN = 
Generators.toGen(CassandraGenerators.columnMetadataGen()).map(cm -> {
+    private static Gen<ColumnMetadata> COLUM_METADATA_GEN = 
toGen(CassandraGenerators.columnMetadataGen()).map(cm -> {
         SCHEMA.add(cm);
         return cm;
     });
-    private static Gen<ByteBuffer> BYTES_GEN = 
Generators.toGen(Generators.directAndHeapBytes(0, 10));
+    private static Gen<ByteBuffer> BYTES_GEN = 
toGen(Generators.directAndHeapBytes(0, 10));
     private static Gen<TxnReference> TXN_REF_GEN = rs -> {
         {
             ColumnMetadata cm = COLUM_METADATA_GEN.next(rs);
@@ -79,12 +159,12 @@ public class TxnConditionTest
             if (!cm.isPartitionKey())
                 builder.addPartitionKeyColumn(cm.name.toString().equals("_") ? 
"__" : "_", Int32Type.instance);
             TableMetadata tm = builder.build();
-            cm = tm.getColumn(cm.name);
-            return rs.nextBoolean() ? new TxnReference(rs.nextInt(0, 
Integer.MAX_VALUE), cm, tm)
-                                    : new TxnReference(rs.nextInt(0, 
Integer.MAX_VALUE), tm, cm, CellPath.create(BYTES_GEN.next(rs)));
+            cm = tm.getExistingColumn(cm.name);
+            return rs.nextBoolean() ? TxnReference.column(rs.nextInt(0, 
Integer.MAX_VALUE), tm, cm)
+                                    : TxnReference.column(rs.nextInt(0, 
Integer.MAX_VALUE), tm, cm, CellPath.create(BYTES_GEN.next(rs)));
         }
     };
-    private static Gen<Clustering<?>> CLUSTERING_GEN = 
Generators.toGen(CassandraGenerators.CLUSTERING_GEN);
+    private static Gen<Clustering<?>> CLUSTERING_GEN = 
toGen(CassandraGenerators.CLUSTERING_GEN);
     private static Gen<ColumnCondition.Bound> BOUND_GEN = 
ColumnConditionTest.boundGen().map(b -> {
         SCHEMA.add(b.column);
         return b;
@@ -98,12 +178,317 @@ public class TxnConditionTest
             TableMetadatas.Collector collector = new 
TableMetadatas.Collector();
             condition.collect(collector);
             TableMetadatas tables = collector.build();
-            for (Version version : Version.V1.greaterThanOrEqual())
-                Serializers.testSerde(output, TxnCondition.serializer, 
condition, tables, version);
+            Serializers.testSerde(output, TxnCondition.serializer, condition, 
tables);
             SCHEMA.clear();
         });
     }
 
+    /**
+     * When the read returned no rows at all, every kind of reference (the whole row, the partition key, a static
+     * column, the clustering column and a regular column) is asserted with {@code assertExists(..., false)}.
+     */
+    @Test
+    public void isNullWithEmptyRows()
+    {
+        DecoratedKey key = tb1.partitioner.decorateKey(EMPTY_BYTE_BUFFER);
+        TxnData data = TxnData.of(0, new TxnDataKeyValue(EmptyIterators.row(tb1, key, false)));
+        TxnReference row = TxnReference.row(0);
+        TxnReference pk = TxnReference.column(0, tb1, tb1.getExistingColumn(COL_PK1));
+        // the static reference has to target the static column so that all four column kinds are covered
+        TxnReference s = TxnReference.column(0, tb1, tb1.getExistingColumn(COL_S1));
+        TxnReference ck = TxnReference.column(0, tb1, tb1.getExistingColumn(COL_CK1));
+        TxnReference r = TxnReference.column(0, tb1, tb1.getExistingColumn(COL_R1));
+
+        for (TxnReference ref : Arrays.asList(row, pk, s, ck, r))
+            assertExists(data, ref, false);
+    }
+
+    /**
+     * Exercises existence checks against rows that are present but whose columns are missing, empty or populated.
+     * <p>
+     * Four reusable scenarios are defined as lambdas (simple column, field/element of a frozen column, whole
+     * multi-cell column, field/element of a multi-cell column). After a few direct checks on an empty row, they
+     * are run against the static and the regular columns 1-5 of {@code tb1}.
+     */
+    @Test
+    public void isNullWithNullColumn()
+    {
+        // whole-column check for a column written as a single cell
+        IsNullTest simpleTest = (partition, clustering, column, nonNullValue) 
-> {
+            // first write an empty value for the column (acts as null for most types)
+            partition.clear().add(clustering)
+                     .add(column, EMPTY_BYTE_BUFFER)
+                     .build();
+            TxnData data = TxnData.of(0, new 
TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column), 
!column.type.isNull(EMPTY_BYTE_BUFFER));
+
+            // now write a non-null value
+            partition.clear().add(clustering)
+                     .add(column, nonNullValue)
+                     .build();
+            data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column), true);
+        };
+        // field (UDT) or element (collection) check inside a frozen column
+        IsNullFrozenFieldOrElementTest frozenFieldOrElementTest = (partition, 
clustering, column, path, nonNullValue, expectedValue) -> {
+            var columnType = column.type.unwrap();
+            var fieldOrElementType = columnType.isUDT() ? ((UserType) 
columnType).fieldType(path) : ((CollectionType<?>) columnType).nameComparator();
+            // first write an empty value for the whole frozen column, so the field/element can't exist
+            partition.clear().add(clustering)
+                     .add(column, EMPTY_BYTE_BUFFER)
+                     .build();
+            TxnData data = TxnData.of(0, new 
TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column, path), 
false);
+
+            // now write the packed value; the field/element exists unless its type treats expectedValue as null
+            partition.clear().add(clustering)
+                     .add(column, nonNullValue)
+                     .build();
+            data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column, path), 
!fieldOrElementType.isNull(expectedValue));
+        };
+        // whole-column check for a multi-cell column
+        IsNullComplexTest complexTest = (partition, clustering, column, 
values) -> {
+            // now include column without value (tombstone)
+            partition.clear().add(clustering)
+                     .addComplex(column, Collections.emptyList())
+                     .build();
+            TxnData data = TxnData.of(0, new 
TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column), false);
+
+            if (values.isEmpty()) return; // already tested
+            // now write the column with its cells
+            partition.clear().add(clustering)
+                     .addComplex(column, values)
+                     .build();
+            data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column), true);
+        };
+        // field (UDT) or element (collection) check inside a multi-cell column
+        IsNullFieldOrElementTest fieldOrElementTest = (partition, clustering, 
column, path, values, expectedValue) -> {
+            var columnType = column.type.unwrap();
+            var fieldOrElementType = columnType.isUDT() ? ((UserType) 
columnType).fieldType(path) : ((CollectionType<?>) columnType).nameComparator();
+            // first write the column without any cells (tombstone only), so the field/element can't exist
+            partition.clear().add(clustering)
+                     .addComplex(column, Collections.emptyList())
+                     .build();
+            TxnData data = TxnData.of(0, new 
TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column, path), 
false);
+
+            // now write the cells; the field/element exists unless its type treats expectedValue as null
+            partition.clear().add(clustering)
+                     .addComplex(column, values)
+                     .build();
+            data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+            assertExists(data, TxnReference.column(0, tb1, column, path), 
!fieldOrElementType.isNull(expectedValue));
+        };
+
+        DecoratedKey key = tb1.partitioner.decorateKey(INT_1);
+        BufferClustering ck = BufferClustering.make(INT_1);
+        SimplePartition partition = new SimplePartition(tb1, key);
+
+
+        // a row without cells and without liveness: only the partition key is expected to exist
+        partition.clear().addEmpty(ck);
+        TxnData data = TxnData.of(0, new 
TxnDataKeyValue(partition.filtered()));
+        assertExists(data, TxnReference.column(0, tb1, 
tb1.getExistingColumn(COL_PK1)), true);
+        assertExists(data, TxnReference.column(0, tb1, 
tb1.getExistingColumn(COL_CK1)), false);
+        assertExists(data, TxnReference.column(0, tb1, 
tb1.getExistingColumn(COL_S1)), false);
+        assertExists(data, TxnReference.column(0, tb1, 
tb1.getExistingColumn(COL_R1)), false);
+
+        // now run with liveness set
+        partition.clear().addEmptyAndLive(ck);
+        data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+        assertExists(data, TxnReference.column(0, tb1, 
tb1.getExistingColumn(COL_CK1)), true);
+
+        // run every scenario against the static row (static columns) and the clustered row (regular columns);
+        // columns 1-5 are int, frozen UDT, frozen set, multi-cell UDT and multi-cell set
+        for (Clustering<?> clustering : 
Arrays.asList(Clustering.STATIC_CLUSTERING, ck))
+        {
+            ColumnMetadata.Kind kind = clustering == 
Clustering.STATIC_CLUSTERING ? ColumnMetadata.Kind.STATIC : 
ColumnMetadata.Kind.REGULAR;
+
+            simpleTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 1)), INT_1);
+
+            simpleTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 2)), FROZEN_UDT.pack(INT_1));
+            frozenFieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 2)), FROZEN_UDT.cellPathForField(UDT_F1), 
FROZEN_UDT.pack(INT_1), INT_1);
+            frozenFieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 2)), FROZEN_UDT.cellPathForField(UDT_F1), 
FROZEN_UDT.pack(EMPTY_BYTE_BUFFER), EMPTY_BYTE_BUFFER);
+
+            //TODO (coverage): test list type, which supports by-offset and 
by-value
+            //TODO (coverage): test map type, which has key/value which allows 
empty
+            simpleTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 3)), 
FROZEN_SET.pack(Collections.singletonList(INT_1)));
+            frozenFieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 3)), CellPath.create(INT_1), 
FROZEN_SET.pack(Collections.singletonList(INT_1)), INT_1);
+            frozenFieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 3)), CellPath.create(EMPTY_BYTE_BUFFER), 
FROZEN_SET.pack(Collections.singletonList(EMPTY_BYTE_BUFFER)), 
EMPTY_BYTE_BUFFER);
+
+            complexTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 4)), Collections.singletonList(INT_1));
+            fieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 4)), UDT.cellPathForField(UDT_F1), 
Collections.singletonList(INT_1), INT_1);
+            fieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 4)), UDT.cellPathForField(UDT_F1), 
Collections.singletonList(EMPTY_BYTE_BUFFER), EMPTY_BYTE_BUFFER);
+
+            complexTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 5)), Collections.singletonList(INT_1));
+            fieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 5)), CellPath.create(INT_1), 
Collections.singletonList(INT_1), INT_1);
+            fieldOrElementTest.test(partition, clustering, 
tb1.getExistingColumn(name(kind, 5)), CellPath.create(EMPTY_BYTE_BUFFER), 
Collections.singletonList(EMPTY_BYTE_BUFFER), EMPTY_BYTE_BUFFER);
+        }
+    }
+
+    @Test
+    public void harryPotterAndTheMeaninglessEmptyness()
+    { // for every MEANINGLESS_EMPTYNESS type except counters: no supported condition kind may apply when either the condition value or the stored data is the empty buffer
+        for (var type : AbstractTypeGenerators.MEANINGLESS_EMPTYNESS)
+        {
+            if (type == CounterColumnType.instance) continue; // NOTE(review): reason for skipping counters is not visible here; presumably they cannot be used for the pk/ck/regular/static columns built below - confirm
+            TableMetadata metadata = TableMetadata.builder("ks", "tbl")
+                                                  .addPartitionKeyColumn("pk", 
type)
+                                                  .addClusteringColumn("ck", 
type)
+                                                  .addRegularColumn("r", type)
+                                                  .addStaticColumn("s", type)
+                                                  
.partitioner(Murmur3Partitioner.instance)
+                                                  .build();
+            ByteBuffer nonEmpty = 
toGen(AbstractTypeGenerators.getTypeSupport(type).bytesGen()).next(new 
DefaultRandom(42)); // NOTE(review): 42 looks like a fixed seed so the sampled value is reproducible - confirm
+            Assertions.assertThat(nonEmpty).isNotEqualTo(EMPTY_BYTE_BUFFER); 
// double check...
+            // first pass: the partition key, the clustering and every regular/static column all hold the non-empty value
+            SimplePartition partition = new SimplePartition(metadata, 
metadata.partitioner.decorateKey(nonEmpty));
+            Clustering<?> clustering = BufferClustering.make(nonEmpty);
+            for (var c : metadata.regularAndStaticColumns())
+                partition.add(c.isStatic() ? Clustering.STATIC_CLUSTERING : 
clustering)
+                         .add(c, nonEmpty)
+                         .build();
+            TxnData data = TxnData.of(0, new 
TxnDataKeyValue(partition.filtered()));
+            // the test is against empty, so can not ever apply
+            for (var column : metadata.columns())
+            {
+                for (TxnCondition.Kind kind : TxnCondition.Value.supported())
+                {
+                    for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
+                    {
+                        TxnReference ref = TxnReference.column(0, metadata, 
column);
+                        // empty logic reuses this which doesn't make the most 
sense... flesh it out
+                        TxnCondition.Value condition = new 
TxnCondition.Value(ref.asColumn(), kind, EMPTY_BYTE_BUFFER, version); // condition value is empty, stored data is not
+                        Assertions.assertThat(condition.applies(data))
+                                  .describedAs("column=%s, type=%s, kind=%s", 
column.name, type.asCQL3Type(), kind.name())
+                                  .isFalse();
+                    }
+                }
+            }
+
+            // partition values have empty, so can not ever apply
+            partition = new SimplePartition(metadata, 
metadata.partitioner.decorateKey(EMPTY_BYTE_BUFFER));
+            clustering = BufferClustering.make(EMPTY_BYTE_BUFFER);
+            for (var c : metadata.regularAndStaticColumns())
+                partition.add(c.isStatic() ? Clustering.STATIC_CLUSTERING : 
clustering)
+                         .add(c, EMPTY_BYTE_BUFFER)
+                         .build();
+            data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+            for (var column : metadata.columns())
+            {
+                for (TxnCondition.Kind kind : TxnCondition.Value.supported())
+                {
+                    for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
+                    {
+                        TxnReference ref = TxnReference.column(0, metadata, 
column);
+                        // empty logic reuses this which doesn't make the most 
sense... flesh it out
+                        TxnCondition.Value condition = new 
TxnCondition.Value(ref.asColumn(), kind, nonEmpty, version); // condition value is non-empty, stored data is empty
+                        Assertions.assertThat(condition.applies(data))
+                                  .describedAs("column=%s, type=%s, kind=%s", 
column.name, type.asCQL3Type(), kind.name())
+                                  .isFalse();
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
+    public void value()
+    { // property test: a condition comparing a column to the very value the row was built from must apply for EQUAL/LESS_THAN_OR_EQUAL/GREATER_THAN_OR_EQUAL and never for NOT_EQUAL/LESS_THAN/GREATER_THAN
+        Gen<AbstractType<?>> typeGen = toGen(new 
AbstractTypeGenerators.TypeGenBuilder()
+                                             .withoutUnsafeEquality()
+                                             .build());
+        qt().check(rs -> {
+            AbstractType<?> type = typeGen.next(rs);
+            TableMetadata metadata = TableMetadata.builder("ks", "tbl")
+                                                  .addPartitionKeyColumn("pk", 
type.freeze()) // NOTE(review): key columns use the frozen form; presumably multi-cell types are not allowed there - confirm
+                                                  .addClusteringColumn("ck", 
type.freeze())
+                                                  .addRegularColumn("r", type)
+                                                  .addStaticColumn("s", type)
+                                                  
.partitioner(Murmur3Partitioner.instance)
+                                                  .build();
+            ByteBuffer value = 
toGen(AbstractTypeGenerators.getTypeSupport(type).bytesGen()).next(rs);
+            List<ByteBuffer> complexValue = type.isMultiCell() ? split(type, 
value) : null; // per-cell buffers; only read on the addComplex path below, which is taken for multi-cell types
+            Clustering<?> clustering = BufferClustering.make(value);
+            SimplePartition partition = new SimplePartition(metadata, 
metadata.partitioner.decorateKey(value));
+            for (TxnCondition.Kind kind : TxnCondition.Value.supported())
+            {
+                for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
+                {
+                    for (ColumnMetadata column : metadata.columns())
+                    {
+                        TxnReference ref = TxnReference.column(0, metadata, 
column);
+                        // empty logic reuses this which doesn't make the most 
sense... flesh it out
+                        TxnCondition.Value condition = new 
TxnCondition.Value(ref.asColumn(), kind, value, version);
+
+                        partition.clear().addEmptyAndLive(clustering);
+                        // empty partition
+                        boolean expected;
+                        switch (kind)
+                        {
+                            case EQUAL:
+                            case LESS_THAN_OR_EQUAL:
+                            case GREATER_THAN_OR_EQUAL:
+                                expected = column.isPrimaryKeyColumn(); // the key and clustering were built from the same value, so only primary key columns can match on a row without column data
+                                break;
+                            case NOT_EQUAL:
+                            case LESS_THAN:
+                            case GREATER_THAN:
+                                expected = false;
+                                break;
+                            default:
+                                throw new 
UnsupportedOperationException(kind.name());
+                        }
+                        Assertions.assertThat(condition.applies(TxnData.of(0, 
new TxnDataKeyValue(partition.filtered()))))
+                                  .describedAs("column=%s, type=%s, kind=%s", 
column.name, type.asCQL3Type(), kind.name())
+                                  .isEqualTo(expected);
+
+
+                        if (column.isPrimaryKeyColumn()) continue; // the populated-row checks below only cover regular and static columns
+
+                        // with value
+                        if (type.isMultiCell())
+                        {
+                            partition.clear()
+                                     .add(column.isStatic() ? 
Clustering.STATIC_CLUSTERING : clustering)
+                                     .addComplex(column, complexValue)
+                                     .build();
+                        }
+                        else
+                        {
+                            partition.clear()
+                                     .add(column.isStatic() ? 
Clustering.STATIC_CLUSTERING : clustering)
+                                     .add(column, value)
+                                     .build();
+                        }
+                        switch (kind)
+                        {
+                            case EQUAL:
+                            case LESS_THAN_OR_EQUAL:
+                            case GREATER_THAN_OR_EQUAL:
+                                expected = true; // the column now holds exactly the condition value
+                                break;
+                            case NOT_EQUAL:
+                            case LESS_THAN:
+                            case GREATER_THAN:
+                                expected = false;
+                                break;
+                            default:
+                                throw new 
UnsupportedOperationException(kind.name());
+                        }
+                        Assertions.assertThat(condition.applies(TxnData.of(0, 
new TxnDataKeyValue(partition.filtered()))))
+                                  .describedAs("column=%s, type=%s, kind=%s", 
column.name, type.asCQL3Type(), kind.name())
+                                  .isEqualTo(expected);
+                    }
+                }
+            }
+        });
+    }
+
+    private static List<ByteBuffer> split(AbstractType<?> type, ByteBuffer 
value)
+    { // turns one serialized value into a list of buffers by delegating to UserType.unpack or CollectionType.unpack; any other type throws UnsupportedOperationException
+        type = type.unwrap(); // the UDT/collection checks and casts below run against the unwrapped type
+        if (type.isUDT())
+        {
+            return ((UserType) type).unpack(value);
+        }
+        else if (type.isCollection())
+        {
+            return ((CollectionType<?>) type).unpack(value);
+        }
+        throw new UnsupportedOperationException(type.asCQL3Type().toString()); // the message is the CQL3 name of the unsupported type
+    }
+
+    private static void assertExists(TxnData data, TxnReference ref, boolean 
exists)
+    { // checks both existence conditions against data: IS_NULL must apply exactly when exists is false, IS_NOT_NULL exactly when exists is true
+        Assertions.assertThat(new TxnCondition.Exists(ref, 
TxnCondition.Kind.IS_NULL).applies(data)).describedAs("ref=%s %s", ref, exists 
? "exists but shouldn't have applied" : "does not exist but should have 
applied").isEqualTo(!exists);
+        Assertions.assertThat(new TxnCondition.Exists(ref, 
TxnCondition.Kind.IS_NOT_NULL).applies(data)).describedAs("ref=%s %s", ref, 
exists ? "exists but should have applied" : "does not exist but shouldn't have 
applied").isEqualTo(exists);
+    }
+
     private Gen<TxnCondition> txnConditionGen()
     {
         return rs -> {
@@ -111,7 +496,7 @@ public class TxnConditionTest
             {
                 case 0: return TxnCondition.none();
                 case 1: return new TxnCondition.Exists(TXN_REF_GEN.next(rs), 
EXISTS_KIND_GEN.next(rs));
-                case 2: return new TxnCondition.Value(TXN_REF_GEN.next(rs), 
VALUE_KIND_GEN.next(rs), BYTES_GEN.next(rs), PROTOCOL_VERSION_GEN.next(rs));
+                case 2: return new 
TxnCondition.Value(TXN_REF_GEN.next(rs).asColumn(), VALUE_KIND_GEN.next(rs), 
BYTES_GEN.next(rs), PROTOCOL_VERSION_GEN.next(rs));
                 case 3: return new 
TxnCondition.ColumnConditionsAdapter(CLUSTERING_GEN.next(rs), 
Gens.lists(BOUND_GEN).ofSizeBetween(0, 3).next(rs));
                 case 4: return new 
TxnCondition.BooleanGroup(BOOLEAN_KIND_GEN.next(rs), 
Gens.lists(txnConditionGen()).ofSizeBetween(0, 3).next(rs));
                 default: throw new AssertionError();
@@ -119,6 +504,26 @@ public class TxnConditionTest
         };
     }
 
+    private interface IsNullTest // jdk16+ lets this be in-lined with the test 
method rather than be here
+    { // single-method callback over one column and one non-null value; NOTE(review): presumably the scenario shape for the IS NULL tests on simple columns - confirm against the test that uses it
+        void test(SimplePartition partition, Clustering<?> clustering, 
ColumnMetadata column, ByteBuffer nonNullValue);
+    }
+
+    private interface IsNullFrozenFieldOrElementTest // jdk16+ lets this be 
in-lined with the test method rather than be here
+    { // same parameters as IsNullTest plus a CellPath and the value expected at that path; NOTE(review): presumably for frozen UDT/collection columns, as the name suggests - confirm
+        void test(SimplePartition partition, Clustering<?> clustering, 
ColumnMetadata column, CellPath path, ByteBuffer nonNullValue, ByteBuffer 
expectedValue);
+    }
+
+    private interface IsNullFieldOrElementTest // jdk16+ lets this be in-lined 
with the test method rather than be here
+    { // takes the column's values as a list plus the path under test and the value expected there; NOTE(review): callers in this class pass UDT.cellPathForField(...) or CellPath.create(...) as the path - confirm they target this interface
+        void test(SimplePartition partition, Clustering<?> clustering, 
ColumnMetadata column, CellPath path, List<ByteBuffer> values, ByteBuffer 
expectedValue);
+    }
+
+    private interface IsNullComplexTest // jdk16+ lets this be in-lined with 
the test method rather than be here
+    { // takes only the column's values as a list, with no path; NOTE(review): presumably checks the multi-cell column as a whole rather than a single field/element - confirm
+        void test(SimplePartition partition, Clustering<?> clustering, 
ColumnMetadata column, List<ByteBuffer> values);
+    }
+
     private static class SchemaProvider extends MockSchema.MockSchemaProvider
     {
         private final class Key
diff --git a/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java 
b/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
index d44d32d043..90b3f26dca 100644
--- a/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
@@ -184,6 +184,30 @@ public final class AbstractTypeGenerators
                                                                                
               .add(DynamicCompositeType.class)
                                                                                
               .add(CounterColumnType.class)
                                                                                
               .build();
+    // NEVER EVER EVER UPDATE THIS LIST!
+    // meaningless emptyness is a legacy thrift concept, so only types from 
back then apply and all new types will never apply
+    public static final ImmutableUniqueList<AbstractType<?>> 
MEANINGLESS_EMPTYNESS = ImmutableUniqueList.of(
+    CounterColumnType.instance,
+    // boolean
+    BooleanType.instance,
+    // date / timestamp
+    DateType.instance,
+    TimestampType.instance,
+    // inet address
+    InetAddressType.instance,
+    // uuid family
+    TimeUUIDType.instance,
+    LegacyTimeUUIDType.instance,
+    LexicalUUIDType.instance,
+    UUIDType.instance,
+    // numeric
+    DecimalType.instance,
+    DoubleType.instance,
+    FloatType.instance,
+    Int32Type.instance,
+    IntegerType.instance,
+    LongType.instance
+    );
 
     private AbstractTypeGenerators()
     {
@@ -415,6 +439,11 @@ public final class AbstractTypeGenerators
             return this;
         }
 
+        public TypeGenBuilder withoutUnsafeEquality()
+        {
+            return AbstractTypeGenerators.withoutUnsafeEquality(this); // builder-style shortcut: hands this builder to the static helper of the same name and returns its result
+        }
+
         @SuppressWarnings("unused")
         public TypeGenBuilder withPrimitives(AbstractType<?> first, 
AbstractType<?>... remaining)
         {
@@ -1091,6 +1120,16 @@ public final class AbstractTypeGenerators
         {
             support = (TypeSupport<T>) 
TypeSupport.of(CounterColumnType.instance, SourceDSL.longs().all());
         }
+        else if (type == DateType.instance)
+        {
+            // this type isn't supported in most places, but can still be 
supported here
+            support = (TypeSupport<T>) TypeSupport.of(DateType.instance, 
Generators.DATE_GEN);
+        }
+        else if (type == LegacyTimeUUIDType.instance)
+        {
+            // this type isn't supported in most places, but can still be 
supported here
+            support = (TypeSupport<T>) 
TypeSupport.of(LegacyTimeUUIDType.instance, 
Generators.UUID_TIME_GEN.mix(Generators.UUID_RANDOM_GEN));
+        }
         else
         {
             throw new UnsupportedOperationException("No TypeSupport for: " + 
type);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to