This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d641e06fc Accord: BEGIN TRANSACTIONs IF condition logic does not
properly support meaningless emptiness and null values
8d641e06fc is described below
commit 8d641e06fcc15f93fdb74cfe12511345e9b9ee22
Author: David Capwell <[email protected]>
AuthorDate: Tue Jun 24 15:37:12 2025 -0700
Accord: BEGIN TRANSACTIONs IF condition logic does not properly support
meaningless emptiness and null values
patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-20667
---
CHANGES.txt | 1 +
.../cassandra/cql3/conditions/ColumnCondition.java | 27 +-
.../cassandra/cql3/statements/CQL3CasRequest.java | 4 +-
.../cql3/statements/TransactionStatement.java | 2 +-
.../org/apache/cassandra/cql3/terms/Lists.java | 6 +-
.../cql3/transactions/ConditionStatement.java | 7 +-
.../cql3/transactions/ReferenceValue.java | 2 +-
.../cql3/transactions/RowDataReference.java | 2 +-
.../cassandra/service/accord/AccordResult.java | 3 +
...AbstractParameterisedUnversionedSerialized.java | 42 ++
... AbstractParameterisedVersionedSerialized.java} | 42 +-
.../service/accord/txn/AbstractSerialized.java | 59 +-
.../cassandra/service/accord/txn/TxnCondition.java | 218 ++++---
.../cassandra/service/accord/txn/TxnNamedRead.java | 2 +-
.../cassandra/service/accord/txn/TxnReference.java | 680 ++++++++++++++-------
.../service/accord/txn/TxnReferenceOperation.java | 17 +-
.../service/accord/txn/TxnReferenceOperations.java | 12 +-
.../service/accord/txn/TxnReferenceValue.java | 65 +-
.../cassandra/service/accord/txn/TxnUpdate.java | 10 +-
.../cassandra/service/accord/txn/TxnWrite.java | 2 +-
.../cassandra/utils/CollectionSerializers.java | 9 +
.../cql3/validation/operations/InsertTest.java | 52 ++
.../cassandra/db/marshal/AbstractTypeTest.java | 75 +++
.../cassandra/db/partitions/SimplePartition.java | 223 +++++++
.../service/accord/txn/TxnConditionTest.java | 425 ++++++++++++-
.../cassandra/utils/AbstractTypeGenerators.java | 39 ++
26 files changed, 1590 insertions(+), 436 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3a14287612..4b04683842 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Accord: BEGIN TRANSACTIONs IF condition logic does not properly support
meaningless emptiness and null values (CASSANDRA-20667)
 * Accord: startup race condition where accord journal tries to access the 2i
index before it's ready (CASSANDRA-20686)
* Adopt Unsafe::invokeCleaner for Direct ByteBuffer cleaning (CASSANDRA-20677)
* Add additional metrics around hints (CASSANDRA-20499)
diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
index 1f9ba51862..b996f00e26 100644
--- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
@@ -236,6 +236,8 @@ public final class ColumnCondition
*/
public abstract boolean appliesTo(Row row);
+ public abstract boolean isNull(Row row);
+
public abstract BoundKind kind();
public static final ParameterisedUnversionedSerializer<Bound,
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>() {
@@ -294,6 +296,12 @@ public final class ColumnCondition
return operator.isSatisfiedBy(column.type, rowValue(row), value);
}
+ @Override
+ public boolean isNull(Row row)
+ {
+ return column.type.isNull(rowValue(row));
+ }
+
protected ByteBuffer rowValue(Row row)
{
// If we're asking for a given cell, and we didn't get any row
from our read, it's
@@ -399,10 +407,21 @@ public final class ColumnCondition
@Override
public boolean appliesTo(Row row)
{
- ByteBuffer element = ((MultiElementType<?>)
column.type).getElement(columnData(row), keyOrIndex);
+ ByteBuffer element = elementValue(row);
return operator.isSatisfiedBy(elementType, element, value);
}
+ private ByteBuffer elementValue(Row row)
+ {
+ return ((MultiElementType<?>)
column.type).getElement(columnData(row), keyOrIndex);
+ }
+
+ @Override
+ public boolean isNull(Row row)
+ {
+ return column.type.isNull(elementValue(row));
+ }
+
/**
* Returns the column data for the given row.
* @param row the row
@@ -454,6 +473,12 @@ public final class ColumnCondition
return operator.isSatisfiedBy((MultiElementType<?>) column.type,
columnData, value);
}
+ @Override
+ public boolean isNull(Row row)
+ {
+ return row == null || row.getComplexColumnData(column) == null;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 816fc9a814..e7a561c76c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -428,7 +428,7 @@ public class CQL3CasRequest implements CASRequest
public TxnCondition asTxnCondition()
{
- TxnReference txnReference = new
TxnReference(txnDataName(CAS_READ), null, null);
+ TxnReference txnReference =
TxnReference.row(txnDataName(CAS_READ));
return new TxnCondition.Exists(txnReference,
TxnCondition.Kind.IS_NULL);
}
}
@@ -453,7 +453,7 @@ public class CQL3CasRequest implements CASRequest
public TxnCondition asTxnCondition()
{
- TxnReference txnReference = new
TxnReference(txnDataName(CAS_READ), null, null);
+ TxnReference txnReference =
TxnReference.row(txnDataName(CAS_READ));
return new TxnCondition.Exists(txnReference,
TxnCondition.Kind.IS_NOT_NULL);
}
}
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index f27c49cfcb..67a796a28a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -565,7 +565,7 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
{
RowDataReference reference = returningReferences.get(i);
TxnReference txnReference = reference.toTxnReference(options);
- ByteBuffer buffer = txnReference.toByteBuffer(data,
resultType.get(i));
+ ByteBuffer buffer = txnReference.asColumn().toByteBuffer(data,
resultType.get(i));
result.add(buffer);
}
diff --git a/src/java/org/apache/cassandra/cql3/terms/Lists.java
b/src/java/org/apache/cassandra/cql3/terms/Lists.java
index 579b783d8d..d0b0a6eb80 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Lists.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Lists.java
@@ -300,7 +300,7 @@ public abstract class Lists
@Override
public boolean requiresTimestamp()
{
- return true;
+ return column.type.isMultiCell();
}
public void execute(DecoratedKey partitionKey, UpdateParameters
params) throws InvalidRequestException
@@ -397,7 +397,7 @@ public abstract class Lists
@Override
public boolean requiresTimestamp()
{
- return true;
+ return column.type.isMultiCell();
}
static void doAppend(Term.Terminal value, ColumnMetadata column,
UpdateParameters params) throws InvalidRequestException
@@ -455,7 +455,7 @@ public abstract class Lists
@Override
public boolean requiresTimestamp()
{
- return true;
+ return column.type.isMultiCell();
}
public void execute(DecoratedKey partitionKey, UpdateParameters
params) throws InvalidRequestException
diff --git
a/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
b/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
index 2ce6f33502..25715b34e1 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ConditionStatement.java
@@ -25,6 +25,9 @@ import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.terms.Term;
import org.apache.cassandra.cql3.VariableSpecifications;
import org.apache.cassandra.service.accord.txn.TxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnReference;
+
+import static
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
public class ConditionStatement
{
@@ -137,7 +140,9 @@ public class ConditionStatement
case LT:
case LTE:
// TODO: Support for references on LHS and RHS
- return new
TxnCondition.Value(reference.toTxnReference(options),
+ TxnReference ref = reference.toTxnReference(options);
+ checkTrue(ref.kind == TxnReference.Kind.COLUMN, "Condition %s
requires COLUMN reference but given %s", kind, ref.kind);
+ return new TxnCondition.Value(ref.asColumn(),
kind.toTxnKind(reversed),
value.bindAndGet(options),
options.getProtocolVersion());
diff --git
a/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
b/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
index d6a4ab8acf..0cd09e783b 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/ReferenceValue.java
@@ -106,7 +106,7 @@ public abstract class ReferenceValue
@Override
public TxnReferenceValue bindAndGet(QueryOptions options)
{
- return new
TxnReferenceValue.Substitution(reference.toTxnReference(options));
+ return new
TxnReferenceValue.Substitution(reference.toTxnReference(options).asColumn());
}
public static class Raw extends ReferenceValue.Raw
diff --git
a/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
b/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
index 038bd45963..161e1d5706 100644
--- a/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
+++ b/src/java/org/apache/cassandra/cql3/transactions/RowDataReference.java
@@ -187,7 +187,7 @@ public class RowDataReference extends Term.NonTerminal
Preconditions.checkState(elementPath == null || column.isComplex() ||
column.type.isFrozenCollection());
Preconditions.checkState(fieldPath == null || column.isComplex() ||
column.type.isUDT());
- return new TxnReference(txnDataName, table, column,
bindCellPath(options));
+ return TxnReference.columnOrRow(txnDataName, table, column,
bindCellPath(options));
}
public ColumnIdentifier getFullyQualifiedName()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordResult.java
b/src/java/org/apache/cassandra/service/accord/AccordResult.java
index 86315b6d60..9d97d3108d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordResult.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordResult.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.accord.txn.RetryWithNewProtocolResult;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
@@ -142,6 +143,8 @@ public class AccordResult<V> extends AsyncFuture<V>
implements BiConsumer<V, Thr
}
else
{
+ logger.error("Unexpected exception", fail);
+ JVMStabilityInspector.inspectThrowable(fail);
report = bookkeeping.newFailed(txnId, keysOrRanges);
}
report.addSuppressed(fail);
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedUnversionedSerialized.java
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedUnversionedSerialized.java
new file mode 100644
index 0000000000..b0a827b7bf
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedUnversionedSerialized.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
+
+public abstract class AbstractParameterisedUnversionedSerialized<T, P> extends
AbstractSerialized<T>
+{
+ public AbstractParameterisedUnversionedSerialized(@Nullable ByteBuffer
latestVersionBytes)
+ {
+ super(latestVersionBytes);
+ }
+
+ protected abstract ParameterisedUnversionedSerializer<T, P> serializer();
+
+ protected T deserialize(P param)
+ {
+ T result = memoized;
+ if (result == null && latestVersionBytes != null)
+ memoized = result = serializer().deserializeUnchecked(param,
latestVersionBytes);
+ return result;
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedVersionedSerialized.java
similarity index 66%
copy from
src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
copy to
src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedVersionedSerialized.java
index ac172a221b..6eb0bb8b9e 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/AbstractParameterisedVersionedSerialized.java
@@ -31,42 +31,17 @@ import
org.apache.cassandra.service.accord.serializers.Version;
* Item that is serialized by default
*/
@NotThreadSafe
-public abstract class AbstractSerialized<T, P>
+public abstract class AbstractParameterisedVersionedSerialized<T, P> extends
AbstractSerialized<T>
{
- private @Nullable final ByteBuffer latestVersionBytes;
- private transient @Nullable T memoized = null;
-
- protected AbstractSerialized(@Nullable ByteBuffer latestVersionBytes)
+ protected AbstractParameterisedVersionedSerialized(@Nullable ByteBuffer
latestVersionBytes)
{
- this.latestVersionBytes = latestVersionBytes;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || (o.getClass() != getClass())) return false;
-
- AbstractSerialized<?,?> that = (AbstractSerialized<?,?>) o;
- return Objects.equals(latestVersionBytes, that.latestVersionBytes);
+ super(latestVersionBytes);
}
- @Override
- public int hashCode()
- {
- return latestVersionBytes != null ? latestVersionBytes.hashCode() : 0;
- }
-
- public abstract long estimatedSizeOnHeap();
protected abstract ByteBuffer serialize(T value, P param, Version version);
protected abstract ByteBuffer reserialize(ByteBuffer bytes, P param,
Version srcVersion, Version trgVersion);
protected abstract T deserialize(P param, ByteBuffer bytes, Version
version);
- protected boolean isNull()
- {
- return latestVersionBytes == null;
- }
-
@Nullable
protected T deserialize(P param)
{
@@ -76,17 +51,6 @@ public abstract class AbstractSerialized<T, P>
return result;
}
- public void unmemoize()
- {
- memoized = null;
- }
-
- @Nullable
- protected ByteBuffer unsafeBytes()
- {
- return latestVersionBytes;
- }
-
@Nonnull
protected ByteBuffer bytes(P param, Version target)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
b/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
index ac172a221b..d3d86dfa85 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/AbstractSerialized.java
@@ -25,57 +25,28 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import accord.utils.Invariants;
-import org.apache.cassandra.service.accord.serializers.Version;
/**
* Item that is serialized by default
*/
@NotThreadSafe
-public abstract class AbstractSerialized<T, P>
+public abstract class AbstractSerialized<T>
{
- private @Nullable final ByteBuffer latestVersionBytes;
- private transient @Nullable T memoized = null;
+ protected @Nullable final ByteBuffer latestVersionBytes;
+ protected transient @Nullable T memoized = null;
- protected AbstractSerialized(@Nullable ByteBuffer latestVersionBytes)
+ public AbstractSerialized(@Nullable ByteBuffer latestVersionBytes)
{
this.latestVersionBytes = latestVersionBytes;
}
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || (o.getClass() != getClass())) return false;
-
- AbstractSerialized<?,?> that = (AbstractSerialized<?,?>) o;
- return Objects.equals(latestVersionBytes, that.latestVersionBytes);
- }
-
- @Override
- public int hashCode()
- {
- return latestVersionBytes != null ? latestVersionBytes.hashCode() : 0;
- }
-
public abstract long estimatedSizeOnHeap();
- protected abstract ByteBuffer serialize(T value, P param, Version version);
- protected abstract ByteBuffer reserialize(ByteBuffer bytes, P param,
Version srcVersion, Version trgVersion);
- protected abstract T deserialize(P param, ByteBuffer bytes, Version
version);
protected boolean isNull()
{
return latestVersionBytes == null;
}
- @Nullable
- protected T deserialize(P param)
- {
- T result = memoized;
- if (result == null && latestVersionBytes != null)
- memoized = result = deserialize(param, latestVersionBytes,
Version.LATEST);
- return result;
- }
-
public void unmemoize()
{
memoized = null;
@@ -88,11 +59,25 @@ public abstract class AbstractSerialized<T, P>
}
@Nonnull
- protected ByteBuffer bytes(P param, Version target)
+ protected ByteBuffer bytes()
{
Invariants.nonNull(latestVersionBytes);
- if (Version.LATEST == target)
- return latestVersionBytes;
- return reserialize(latestVersionBytes, param, Version.LATEST, target);
+ return latestVersionBytes;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || (o.getClass() != getClass())) return false;
+
+ AbstractSerialized<?> that = (AbstractSerialized<?>) o;
+ return Objects.equals(latestVersionBytes, that.latestVersionBytes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return latestVersionBytes != null ? latestVersionBytes.hashCode() : 0;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
index 881a332162..589f954fb6 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnCondition.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -38,19 +39,19 @@ import
org.apache.cassandra.cql3.conditions.ColumnCondition.Bound;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
@@ -67,7 +68,7 @@ import static
org.apache.cassandra.utils.CollectionSerializers.serializedListSiz
public abstract class TxnCondition
{
- public static class SerializedTxnCondition extends
AbstractSerialized<TxnCondition, TableMetadatas>
+ public static class SerializedTxnCondition extends
AbstractParameterisedUnversionedSerialized<TxnCondition, TableMetadatas>
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new
SerializedTxnCondition(null));
@@ -76,41 +77,29 @@ public abstract class TxnCondition
super(latestVersionBytes);
}
- protected SerializedTxnCondition(TxnCondition condition,
TableMetadatas param)
- {
- this(serializer.serializeUnchecked(condition, param,
Version.LATEST));
- }
-
@Override
- public long estimatedSizeOnHeap()
+ protected ParameterisedUnversionedSerializer<TxnCondition,
TableMetadatas> serializer()
{
- return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(unsafeBytes());
+ return serializer;
}
- @Override
- protected ByteBuffer serialize(TxnCondition value, TableMetadatas
param, Version version)
- {
- return serializer.serializeUnchecked(value, param, version);
- }
-
- @Override
- protected ByteBuffer reserialize(ByteBuffer bytes, TableMetadatas
param, Version srcVersion, Version trgVersion)
+ protected SerializedTxnCondition(TxnCondition condition,
TableMetadatas param)
{
- return bytes;
+ this(serializer.serializeUnchecked(condition, param));
}
@Override
- protected TxnCondition deserialize(TableMetadatas param, ByteBuffer
bytes, Version version)
+ public long estimatedSizeOnHeap()
{
- return serializer.deserializeUnchecked(param, bytes, version);
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(unsafeBytes());
}
}
private interface ConditionSerializer<T extends TxnCondition>
{
- void serialize(T condition, TableMetadatas tables, DataOutputPlus out,
Version version) throws IOException;
- T deserialize(TableMetadatas tables, DataInputPlus in, Version
version, Kind kind) throws IOException;
- long serializedSize(T condition, TableMetadatas tables, Version
version);
+ void serialize(T condition, TableMetadatas tables, DataOutputPlus out)
throws IOException;
+ T deserialize(TableMetadatas tables, DataInputPlus in, Kind kind)
throws IOException;
+ long serializedSize(T condition, TableMetadatas tables);
}
public enum Kind
@@ -227,11 +216,11 @@ public abstract class TxnCondition
private static final ConditionSerializer<None> serializer = new
ConditionSerializer<>()
{
@Override
- public void serialize(None condition, TableMetadatas tables,
DataOutputPlus out, Version version) {}
+ public void serialize(None condition, TableMetadatas tables,
DataOutputPlus out) {}
@Override
- public None deserialize(TableMetadatas tables, DataInputPlus in,
Version version, Kind kind) { return instance; }
+ public None deserialize(TableMetadatas tables, DataInputPlus in,
Kind kind) { return instance; }
@Override
- public long serializedSize(None condition, TableMetadatas tables,
Version version) { return 0; }
+ public long serializedSize(None condition, TableMetadatas tables)
{ return 0; }
};
}
@@ -266,9 +255,7 @@ public abstract class TxnCondition
@Override
public void collect(TableMetadatas.Collector collector)
{
- TableMetadata table = reference.table();
- if (table != null)
- collector.add(table);
+ reference.collect(collector);
}
@Override
@@ -283,12 +270,10 @@ public abstract class TxnCondition
return reference.toString() + ' ' + kind.toString();
}
- @Override
- public boolean applies(TxnData data)
+ private boolean applies(FilteredPartition partition, boolean exists,
TxnReference.ColumnReference ref)
{
- FilteredPartition partition = reference.getPartition(data);
- boolean exists = partition != null && !partition.isEmpty();
-
+ ColumnMetadata column = ref.column();
+ if (column.isPartitionKey()) return exists;
Row row = null;
if (exists)
{
@@ -296,9 +281,9 @@ public abstract class TxnCondition
exists = row != null && !row.isEmpty();
}
- if (exists && reference.selectsColumn())
+ if (exists)
{
- ColumnData columnData = reference.getColumnData(row);
+ ColumnData columnData = ref.getColumnData(row);
if (columnData == null)
{
@@ -306,10 +291,43 @@ public abstract class TxnCondition
}
else if (columnData.column().isComplex())
{
- if (reference.isElementSelection() ||
reference.isFieldSelection())
+ if (ref.isElementSelection())
{
Cell<?> cell = (Cell<?>) columnData;
exists = !cell.isTombstone();
+                    // Collections don't support NULL but meaningless
null types are supported, so byte[0] is allowed!
+ // This is NULL when touched, so need to still check
each value
+ if (exists)
+ {
+ CollectionType<?> type = (CollectionType<?>)
column.type.unwrap();
+ switch (type.kind)
+ {
+ case MAP:
+ {
+ exists =
!type.nameComparator().isNull(cell.path().get(0));
+ if (exists)
+ exists =
!type.valueComparator().isNull(cell.buffer());
+ }
+ break;
+ case SET:
+ {
+ exists =
!type.nameComparator().isNull(cell.path().get(0));
+ }
+ break;
+ case LIST:
+ {
+ exists =
!type.valueComparator().isNull(cell.buffer());
+ }
+ break;
+ default:
+ throw new
UnsupportedOperationException(type.kind.name());
+ }
+ }
+ }
+ else if (ref.isFieldSelection())
+ {
+ Cell<?> cell = (Cell<?>) columnData;
+ exists = exists(cell, ref.getFieldSelectionType());
}
else
{
@@ -318,26 +336,45 @@ public abstract class TxnCondition
exists = false;
}
}
- else if (reference.isElementSelection())
+ else if (ref.isElementSelection())
{
// This is frozen, so check if the Cell is a tombstone and
that the element is present.
Cell<?> cell = (Cell<?>) columnData;
- ByteBuffer element =
reference.getFrozenCollectionElement(cell);
- exists = element != null && !cell.isTombstone();
+ exists = exists(cell, column.type);
+ if (exists)
+ {
+ ByteBuffer element =
ref.getFrozenCollectionElement(cell);
+ exists =
!ref.getFrozenCollectionElementType().isNull(element);
+ }
}
- else if (reference.isFieldSelection())
+ else if (ref.isFieldSelection())
{
// This is frozen, so check if the Cell is a tombstone and
that the field is present.
Cell<?> cell = (Cell<?>) columnData;
- ByteBuffer fieldValue =
reference.getFrozenFieldValue(cell);
- exists = fieldValue != null && !cell.isTombstone();
+ exists = exists(cell, column.type);
+ if (exists)
+ {
+ ByteBuffer fieldValue = ref.getFrozenFieldValue(cell);
+ exists =
!ref.getFieldSelectionType().isNull(fieldValue);
+ }
}
else
{
Cell<?> cell = (Cell<?>) columnData;
- exists = !cell.isTombstone();
+ exists = exists(cell, column.type);
}
}
+ return exists;
+ }
+
+ @Override
+ public boolean applies(TxnData data)
+ {
+ FilteredPartition partition = reference.getPartition(data);
+ boolean exists = partition != null && !partition.isEmpty();
+
+ if (reference.kind == TxnReference.Kind.COLUMN)
+ exists = applies(partition, exists, reference.asColumn());
switch (kind())
{
@@ -350,24 +387,29 @@ public abstract class TxnCondition
}
}
- private static final ConditionSerializer<Exists> serializer = new
ConditionSerializer<Exists>()
+ private static boolean exists(Cell<?> cell, AbstractType<?> type)
+ {
+ return !cell.isTombstone() && !type.unwrap().isNull(cell.buffer());
+ }
+
+ private static final ConditionSerializer<Exists> serializer = new
ConditionSerializer<>()
{
@Override
- public void serialize(Exists condition, TableMetadatas tables,
DataOutputPlus out, Version version) throws IOException
+ public void serialize(Exists condition, TableMetadatas tables,
DataOutputPlus out) throws IOException
{
- TxnReference.serializer.serialize(condition.reference, tables,
out, version);
+ TxnReference.serializer.serialize(condition.reference, tables,
out);
}
@Override
- public Exists deserialize(TableMetadatas tables, DataInputPlus in,
Version version, Kind kind) throws IOException
+ public Exists deserialize(TableMetadatas tables, DataInputPlus in,
Kind kind) throws IOException
{
- return new Exists(TxnReference.serializer.deserialize(tables,
in, version), kind);
+ return new Exists(TxnReference.serializer.deserialize(tables,
in), kind);
}
@Override
- public long serializedSize(Exists condition, TableMetadatas
tables, Version version)
+ public long serializedSize(Exists condition, TableMetadatas tables)
{
- return
TxnReference.serializer.serializedSize(condition.reference, tables, version);
+ return
TxnReference.serializer.serializedSize(condition.reference, tables);
}
};
}
@@ -417,14 +459,14 @@ public abstract class TxnCondition
private static final ConditionSerializer<ColumnConditionsAdapter>
serializer = new ConditionSerializer<ColumnConditionsAdapter>()
{
@Override
- public void serialize(ColumnConditionsAdapter condition,
TableMetadatas tables, DataOutputPlus out, Version version) throws IOException
+ public void serialize(ColumnConditionsAdapter condition,
TableMetadatas tables, DataOutputPlus out) throws IOException
{
clusteringSerializer.serialize(condition.clustering, out);
serializeCollection(condition.bounds, tables, out,
Bound.serializer);
}
@Override
- public ColumnConditionsAdapter deserialize(TableMetadatas tables,
DataInputPlus in, Version version, Kind ignored) throws IOException
+ public ColumnConditionsAdapter deserialize(TableMetadatas tables,
DataInputPlus in, Kind ignored) throws IOException
{
Clustering<?> clustering =
clusteringSerializer.deserialize(in);
List<Bound> bounds = deserializeList(tables, in,
Bound.serializer);
@@ -432,7 +474,7 @@ public abstract class TxnCondition
}
@Override
- public long serializedSize(ColumnConditionsAdapter condition,
TableMetadatas tables, Version version)
+ public long serializedSize(ColumnConditionsAdapter condition,
TableMetadatas tables)
{
return
clusteringSerializer.serializedSize(condition.clustering)
+ serializedCollectionSize(condition.bounds, tables,
Bound.serializer);
@@ -442,24 +484,28 @@ public abstract class TxnCondition
public static class Value extends TxnCondition
{
- private static final Set<Kind> KINDS = ImmutableSet.of(Kind.EQUAL,
Kind.NOT_EQUAL,
-
Kind.GREATER_THAN, Kind.GREATER_THAN_OR_EQUAL,
- Kind.LESS_THAN,
Kind.LESS_THAN_OR_EQUAL);
+ private static final EnumSet<Kind> KINDS = EnumSet.of(Kind.EQUAL,
Kind.NOT_EQUAL,
+
Kind.GREATER_THAN, Kind.GREATER_THAN_OR_EQUAL,
+ Kind.LESS_THAN,
Kind.LESS_THAN_OR_EQUAL);
- private final TxnReference reference;
+ private final TxnReference.ColumnReference reference;
private final ByteBuffer value;
private final ProtocolVersion version;
- public Value(TxnReference reference, Kind kind, ByteBuffer value,
ProtocolVersion version)
+ public Value(TxnReference.ColumnReference reference, Kind kind,
ByteBuffer value, ProtocolVersion version)
{
super(kind);
Invariants.requireArgument(KINDS.contains(kind), "Kind " + kind +
" cannot be used with a value condition");
- Invariants.requireArgument(reference.selectsColumn(), "Reference "
+ reference + " does not select a column");
this.reference = reference;
this.value = value;
this.version = version;
}
+ public static EnumSet<Kind> supported()
+ {
+ return EnumSet.copyOf(KINDS);
+ }
+
@Override
public boolean equals(Object o)
{
@@ -473,9 +519,7 @@ public abstract class TxnCondition
@Override
public void collect(TableMetadatas.Collector collector)
{
- TableMetadata table = reference.table();
- if (table != null)
- collector.add(table);
+ reference.collect(collector);
}
@Override
@@ -532,33 +576,39 @@ public abstract class TxnCondition
@Override
public boolean applies(TxnData data)
{
- return getBounds(data).appliesTo(reference.getRow(data));
+ Bound bounds = getBounds(data);
+ if (reference.column().type.unwrap().isNull(bounds.value))
+ return false;
+ Row row = reference.getRow(data);
+ if (bounds.isNull(row))
+ return false;
+ return bounds.appliesTo(row);
}
private static final ConditionSerializer<Value> serializer = new
ConditionSerializer<>()
{
@Override
- public void serialize(Value condition, TableMetadatas tables,
DataOutputPlus out, Version version) throws IOException
+ public void serialize(Value condition, TableMetadatas tables,
DataOutputPlus out) throws IOException
{
- TxnReference.serializer.serialize(condition.reference, tables,
out, version);
+ TxnReference.serializer.serialize(condition.reference, tables,
out);
ByteBufferUtil.writeWithVIntLength(condition.value, out);
out.writeUTF(condition.version.name());
}
@Override
- public Value deserialize(TableMetadatas tables, DataInputPlus in,
Version version, Kind kind) throws IOException
+ public Value deserialize(TableMetadatas tables, DataInputPlus in,
Kind kind) throws IOException
{
- TxnReference reference =
TxnReference.serializer.deserialize(tables, in, version);
+ TxnReference.ColumnReference reference =
TxnReference.serializer.deserialize(tables, in).asColumn();
ByteBuffer value = ByteBufferUtil.readWithVIntLength(in);
ProtocolVersion protocolVersion =
ProtocolVersion.valueOf(in.readUTF());
return new Value(reference, kind, value, protocolVersion);
}
@Override
- public long serializedSize(Value condition, TableMetadatas tables,
Version version)
+ public long serializedSize(Value condition, TableMetadatas tables)
{
long size = 0;
- size +=
TxnReference.serializer.serializedSize(condition.reference, tables, version);
+ size +=
TxnReference.serializer.serializedSize(condition.reference, tables);
size +=
ByteBufferUtil.serializedSizeWithVIntLength(condition.value);
size += TypeSizes.sizeof(condition.version.name());
return size;
@@ -625,48 +675,48 @@ public abstract class TxnCondition
private static final ConditionSerializer<BooleanGroup> serializer =
new ConditionSerializer<>()
{
@Override
- public void serialize(BooleanGroup condition, TableMetadatas
tables, DataOutputPlus out, Version version) throws IOException
+ public void serialize(BooleanGroup condition, TableMetadatas
tables, DataOutputPlus out) throws IOException
{
- serializeList(condition.conditions, tables, out, version,
TxnCondition.serializer);
+ serializeList(condition.conditions, tables, out,
TxnCondition.serializer);
}
@Override
- public BooleanGroup deserialize(TableMetadatas tables,
DataInputPlus in, Version version, Kind kind) throws IOException
+ public BooleanGroup deserialize(TableMetadatas tables,
DataInputPlus in, Kind kind) throws IOException
{
- return new BooleanGroup(kind, deserializeList(tables, in,
version, TxnCondition.serializer));
+ return new BooleanGroup(kind, deserializeList(tables, in,
TxnCondition.serializer));
}
@Override
- public long serializedSize(BooleanGroup condition, TableMetadatas
tables, Version version)
+ public long serializedSize(BooleanGroup condition, TableMetadatas
tables)
{
- return serializedListSize(condition.conditions, tables,
version, TxnCondition.serializer);
+ return serializedListSize(condition.conditions, tables,
TxnCondition.serializer);
}
};
}
- public static final ParameterisedVersionedSerializer<TxnCondition,
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+ public static final ParameterisedUnversionedSerializer<TxnCondition,
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
{
@SuppressWarnings("unchecked")
@Override
- public void serialize(TxnCondition condition, TableMetadatas tables,
DataOutputPlus out, Version version) throws IOException
+ public void serialize(TxnCondition condition, TableMetadatas tables,
DataOutputPlus out) throws IOException
{
out.writeUnsignedVInt32(condition.kind.ordinal());
- condition.kind.serializer().serialize(condition, tables, out,
version);
+ condition.kind.serializer().serialize(condition, tables, out);
}
@Override
- public TxnCondition deserialize(TableMetadatas tables, DataInputPlus
in, Version version) throws IOException
+ public TxnCondition deserialize(TableMetadatas tables, DataInputPlus
in) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedVInt32()];
- return kind.serializer().deserialize(tables, in, version, kind);
+ return kind.serializer().deserialize(tables, in, kind);
}
@SuppressWarnings("unchecked")
@Override
- public long serializedSize(TxnCondition condition, TableMetadatas
tables, Version version)
+ public long serializedSize(TxnCondition condition, TableMetadatas
tables)
{
long size = TypeSizes.sizeofUnsignedVInt(condition.kind.ordinal());
- size += condition.kind.serializer().serializedSize(condition,
tables, version);
+ size += condition.kind.serializer().serializedSize(condition,
tables);
return size;
}
};
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index ea27a398c2..ab774c806a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -78,7 +78,7 @@ import static
org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
import static
org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
import static org.apache.cassandra.utils.ByteBufferUtil.writeWithVIntLength;
-public class TxnNamedRead extends AbstractSerialized<ReadCommand,
TableMetadatas>
+public class TxnNamedRead extends
AbstractParameterisedVersionedSerialized<ReadCommand, TableMetadatas>
{
@SuppressWarnings("unused")
private static final Logger logger =
LoggerFactory.getLogger(TxnNamedRead.class);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
index 59a1c5afb6..13d021930a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnReference.java
@@ -20,141 +20,118 @@ package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Objects;
+import javax.annotation.Nullable;
+
+import accord.utils.Invariants;
+import accord.utils.TinyEnumSet;
+import accord.utils.UnhandledEnum;
import accord.utils.VIntCoding;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.rows.AbstractCell;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
+import org.apache.cassandra.io.UnversionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
import static org.apache.cassandra.db.marshal.CollectionType.Kind.SET;
import static
org.apache.cassandra.service.accord.AccordSerializers.columnMetadataSerializer;
-public class TxnReference
+public abstract class TxnReference
{
- private final int tuple;
- private final TableMetadata table;
- private final ColumnMetadata column;
- private final CellPath path;
+ public final Kind kind;
+ protected final int tuple;
- public TxnReference(int tuple, TableMetadata table, ColumnMetadata column,
CellPath path)
+ private TxnReference(Kind kind, int tuple)
{
+ this.kind = kind;
this.tuple = tuple;
- this.table = table;
- this.column = column;
- this.path = path;
- }
-
- public TxnReference(int tuple, ColumnMetadata column, TableMetadata table)
- {
- this(tuple, table, column, null);
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- TxnReference reference = (TxnReference) o;
- return tuple == reference.tuple && Objects.equals(column,
reference.column) && Objects.equals(path, reference.path);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(tuple, column, path);
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder().append(tuple);
- if (column != null)
-
sb.append(':').append(column.ksName).append('.').append(column.cfName).append('.').append(column.name.toString());
- if (path != null)
- sb.append(path);
- return sb.toString();
- }
-
- public ColumnMetadata column()
- {
- return column;
- }
-
- public TableMetadata table()
- {
- return table;
}
- public void collect(TableMetadatas.Collector collector)
+ /**
+ * Creates a reference to a "row". This method isn't directly used by the
main logic and instead
+ * exists to aid in testing.
+ */
+ public static RowReference row(int tuple)
{
- collector.add(table);
+ return new RowReference(tuple);
}
- public CellPath path()
+ /**
+ * Creates a reference to a "column". This method isn't directly used by
the main logic and instead
+ * exists to aid in testing.
+ */
+ public static ColumnReference column(int tuple, TableMetadata table,
ColumnMetadata column)
{
- return path;
- }
-
- public boolean selectsColumn()
- {
- return column != null;
+ return column(tuple, table, column, null);
}
- public boolean selectsPath()
+ /**
+ * Creates a reference to a "column". This method isn't directly used by
the main logic and instead
+ * exists to aid in testing.
+ */
+ public static ColumnReference column(int tuple, TableMetadata table,
ColumnMetadata column, CellPath path)
{
- return selectsColumn() && path != null;
+ Invariants.nonNull(table, "table is null");
+ Invariants.nonNull(column, "column is null");
+ return new ColumnReference(tuple, table, column, path);
}
- public boolean isElementSelection()
+ public static TxnReference columnOrRow(int tuple,
+ @Nullable TableMetadata table,
+ @Nullable ColumnMetadata column,
+ @Nullable CellPath path)
{
- return selectsPath() && column.type.isCollection();
+ if (column == null)
+ {
+ Invariants.require(table == null, "Column is null but table isn't;
unknown reference type");
+ Invariants.require(path == null, "Column is null but path isn't;
unknown reference type");
+ return row(tuple);
+ }
+ return column(tuple, table, column, path);
}
- public boolean isFieldSelection()
- {
- return selectsPath() && column.type.isUDT();
- }
+ public abstract void collect(TableMetadatas.Collector collector);
- public ByteBuffer getPartitionKey(TxnData data)
+ public RowReference asRow()
{
- FilteredPartition partition = getPartition(data);
- if (partition == null) return null;
- return partition.metadata().partitionKeyColumns().size() == 1
- ? partition.partitionKey().getKey()
- : ((CompositeType)
partition.metadata().partitionKeyType).split(partition.partitionKey().getKey())[column.position()];
+ Invariants.require(kind == Kind.ROW, "Expected to be a row but was a
%s", kind);
+ return (RowReference) this;
}
- public ByteBuffer getClusteringKey(TxnData data)
+ public ColumnReference asColumn()
{
- Row row = getRow(data);
- if (row == null)
- return null;
- return row.clustering().bufferAt(column.position());
+ Invariants.require(kind == Kind.COLUMN, "Expected to be a column but
was a %s", kind);
+ return (ColumnReference) this;
}
public TxnDataKeyValue getPartition(TxnData data)
{
return (TxnDataKeyValue)data.get(tuple);
}
-
+
public Row getRow(TxnData data)
{
FilteredPartition partition = getPartition(data);
@@ -163,7 +140,7 @@ public class TxnReference
public Row getRow(FilteredPartition partition)
{
- if (column != null && column.isStatic())
+ if (kind == Kind.COLUMN && asColumn().column.isStatic())
return partition.staticRow();
assert partition.rowCount() <= 1 : "Multi-row references are not
allowed";
if (partition.rowCount() == 0)
@@ -171,184 +148,477 @@ public class TxnReference
return partition.getAtIdx(0);
}
- public ColumnData getColumnData(Row row)
+ public static final UnversionedSerializer<RowReference> rowSerializer =
new UnversionedSerializer<>()
{
- if (column.isComplex() && path == null)
- return row.getComplexColumnData(column);
+ @Override
+ public void serialize(RowReference reference, DataOutputPlus out)
throws IOException
+ {
+ out.writeUnsignedVInt32(reference.tuple);
+ }
- if (path != null && column.type.isMultiCell())
+ @Override
+ public RowReference deserialize(DataInputPlus in) throws IOException
{
- if (column.type.isCollection())
- {
- CollectionType<?> collectionType = (CollectionType<?>)
column.type;
+ int tuple = in.readUnsignedVInt32();
+ return row(tuple);
+ }
- if (collectionType.kind == CollectionType.Kind.LIST)
- return
row.getComplexColumnData(column).getCellByIndex(ByteBufferUtil.toInt(path.get(0)));
- }
+ @Override
+ public long serializedSize(RowReference reference)
+ {
+ return VIntCoding.sizeOfUnsignedVInt(reference.tuple);
+ }
+ };
- return row.getCell(column, path);
+ public static final ParameterisedUnversionedSerializer<ColumnReference,
TableMetadatas> columnSerializer = new
ParameterisedUnversionedSerializer<ColumnReference, TableMetadatas>()
+ {
+ @Override
+ public void serialize(ColumnReference reference, TableMetadatas
tables, DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(reference.tuple);
+ tables.serialize(reference.table, out);
+ columnMetadataSerializer.serialize(reference.column,
reference.table, out);
+ out.writeBoolean(reference.path != null);
+ if (reference.path != null)
+ CollectionType.cellPathSerializer.serialize(reference.path,
out);
}
- return row.getCell(column);
- }
+ @Override
+ public ColumnReference deserialize(TableMetadatas tables,
DataInputPlus in) throws IOException
+ {
+ int tuple = in.readUnsignedVInt32();
+ TableMetadata table = tables.deserialize(in);
+ ColumnMetadata column =
columnMetadataSerializer.deserialize(table, in);
+ CellPath path = in.readBoolean() ?
CollectionType.cellPathSerializer.deserialize(in) : null;
+ return TxnReference.column(tuple, table, column, path);
+ }
- public ColumnData getColumnData(TxnData data)
- {
- Row row = getRow(data);
- return row != null ? getColumnData(row) : null;
- }
+ @Override
+ public long serializedSize(ColumnReference reference, TableMetadatas
tables)
+ {
+ long size = 0;
+ size += VIntCoding.sizeOfUnsignedVInt(reference.tuple);
+ size += tables.serializedSize(reference.table);
+ size += columnMetadataSerializer.serializedSize(reference.column,
reference.table);
+ size += TypeSizes.BOOL_SIZE;
+ if (reference.path != null)
+ size +=
CollectionType.cellPathSerializer.serializedSize(reference.path);
+ return size;
+ }
+ };
- public ByteBuffer getFrozenCollectionElement(Cell<?> collection)
- {
- CollectionType<?> collectionType = (CollectionType<?>) column.type;
- return
collectionType.getSerializer().getSerializedValue(collection.buffer(),
path.get(0), collectionType.nameComparator());
- }
- public ByteBuffer getFrozenFieldValue(Cell<?> udt)
+ static final ParameterisedUnversionedSerializer<TxnReference,
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
{
- UserType userType = (UserType) column.type;
- int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
- return userType.unpack(udt.buffer()).get(field);
- }
+ @Override
+ public void serialize(TxnReference reference, TableMetadatas tables,
DataOutputPlus out) throws IOException
+ {
+ out.writeUnsignedVInt32(TinyEnumSet.encode(reference.kind));
+ switch (reference.kind)
+ {
+ case ROW:
+ rowSerializer.serialize(reference.asRow(), out);
+ break;
+ case COLUMN:
+ columnSerializer.serialize(reference.asColumn(), tables,
out);
+ break;
+ default:
+ throw new UnhandledEnum(reference.kind);
+ }
+ }
+
+ @Override
+ public TxnReference deserialize(TableMetadatas tables, DataInputPlus
in) throws IOException
+ {
+ TinyEnumSet<TxnReference.Kind> kind = new
TinyEnumSet<>(in.readUnsignedVInt32());
+ if (kind.contains(Kind.ROW)) return rowSerializer.deserialize(in);
+ if (kind.contains(Kind.COLUMN)) return
columnSerializer.deserialize(tables, in);
+ throw Invariants.illegalArgument("Unexpected kind: " + kind);
+ }
+
+ @Override
+ public long serializedSize(TxnReference reference, TableMetadatas
tables)
+ {
+ long size =
VIntCoding.sizeOfUnsignedVInt(TinyEnumSet.encode(reference.kind));
+ switch (reference.kind)
+ {
+ case ROW:
+ size += rowSerializer.serializedSize(reference.asRow());
+ break;
+ case COLUMN:
+ size +=
columnSerializer.serializedSize(reference.asColumn(), tables);
+ break;
+ default:
+ throw new UnhandledEnum(reference.kind);
+ }
+ return size;
+ }
+ };
- public AbstractType<?> getFieldSelectionType()
+ public enum Kind { ROW, COLUMN }
+
+ public static class RowReference extends TxnReference
{
- assert isFieldSelection() : "No field selection type exists";
- UserType userType = (UserType) column.type;
- int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
- return userType.fieldType(field);
+ public RowReference(int tuple)
+ {
+ super(Kind.ROW, tuple);
+ }
+
+ @Override
+ public void collect(TableMetadatas.Collector collector)
+ {
+ // no-op
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) return false;
+ RowReference that = (RowReference) o;
+ return tuple == that.tuple;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(tuple);
+ }
+
+ @Override
+ public String toString()
+ {
+ return Integer.toString(tuple);
+ }
}
- public ByteBuffer toByteBuffer(TxnData data, AbstractType<?> receiver)
+ public static class ColumnReference extends TxnReference
{
- // TODO: confirm all references can be satisfied as part of the txn
condition
- AbstractType<?> type = column().type;
+ private final TableMetadata table;
+ private final ColumnMetadata column;
+ @Nullable
+ private final CellPath path;
+
+ public ColumnReference(int tuple, TableMetadata table, ColumnMetadata
column, @Nullable CellPath path)
+ {
+ super(Kind.COLUMN, tuple);
+ this.table = table;
+ this.column = column;
+ this.path = path;
+ }
+
+ public ColumnMetadata column()
+ {
+ return column;
+ }
+
+ public TableMetadata table()
+ {
+ return table;
+ }
+
+ @Nullable
+ public CellPath path()
+ {
+ return path;
+ }
+
+ public boolean selectsPath()
+ {
+ return path != null;
+ }
+
+ public boolean isElementSelection()
+ {
+ return selectsPath() && column.type.isCollection();
+ }
+
+ public boolean isFieldSelection()
+ {
+ return selectsPath() && column.type.isUDT();
+ }
+
+ public ByteBuffer getPartitionKey(TxnData data)
+ {
+ FilteredPartition partition = getPartition(data);
+ if (partition == null) return null;
+ return partition.metadata().partitionKeyColumns().size() == 1
+ ? partition.partitionKey().getKey()
+ : ((CompositeType)
partition.metadata().partitionKeyType).split(partition.partitionKey().getKey())[column.position()];
+ }
+
+ @Override
+ public void collect(TableMetadatas.Collector collector)
+ {
+ collector.add(table);
+ }
+
+ public ByteBuffer getClusteringKey(TxnData data)
+ {
+ Row row = getRow(data);
+ if (row == null)
+ return null;
+ return row.clustering().bufferAt(column.position());
+ }
- // Modify the type we'll check if the reference is to a collection
element.
- if (selectsPath())
+ public ColumnData getColumnData(Row row)
{
- if (type.isCollection())
+ if (column.isClusteringColumn())
+ return new ClusteringColumnData(column,
row.clustering().bufferAt(column.position()));
+ if (column.isComplex() && path == null)
+ return row.getComplexColumnData(column);
+
+ if (path != null && column.type.isMultiCell())
{
- CollectionType<?> collectionType = (CollectionType<?>) type;
- type = collectionType.kind == SET ?
collectionType.nameComparator() : collectionType.valueComparator();
+ if (column.type.isCollection())
+ {
+ CollectionType<?> collectionType = (CollectionType<?>)
column.type;
+
+ if (collectionType.kind == CollectionType.Kind.LIST)
+ return
row.getComplexColumnData(column).getCellByIndex(ByteBufferUtil.toInt(path.get(0)));
+ }
+
+ return row.getCell(column, path);
}
- else if (type.isUDT())
- type = getFieldSelectionType();
+
+ return row.getCell(column);
+ }
+
+ public ColumnData getColumnData(TxnData data)
+ {
+ Row row = getRow(data);
+ return row != null ? getColumnData(row) : null;
}
- // Account for frozen collection and reversed clustering key
references:
- AbstractType<?> receiveType = type.isFrozenCollection() ?
receiver.freeze().unwrap() : receiver.unwrap();
- if (!(receiveType == type.unwrap()))
- throw new IllegalArgumentException("Receiving type " + receiveType
+ " does not match " + type.unwrap());
+ public ByteBuffer getFrozenCollectionElement(Cell<?> collection)
+ {
+ CollectionType<?> collectionType = (CollectionType<?>)
column.type.unwrap();
+ return
collectionType.getSerializer().getSerializedValue(collection.buffer(),
path.get(0), collectionType.nameComparator());
+ }
- if (column().isPartitionKey())
- return getPartitionKey(data);
- else if (column().isClusteringColumn())
- return getClusteringKey(data);
+ public AbstractType<?> getFrozenCollectionElementType()
+ {
+ CollectionType<?> type = (CollectionType<?>) column.type.unwrap();
+ if (type instanceof ListType) return Int32Type.instance; // by
index is the only thing supported right now; see getFrozenCollectionElement
+ return type.nameComparator();
+ }
- ColumnData columnData = getColumnData(data);
+ public ByteBuffer getFrozenFieldValue(Cell<?> udt)
+ {
+ UserType userType = (UserType) column.type.unwrap();
+ int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
+ List<ByteBuffer> tuple = userType.unpack(udt.buffer());
+ return tuple.size() > field ? tuple.get(field) : null;
+ }
- if (columnData == null)
- return null;
+ public AbstractType<?> getFieldSelectionType()
+ {
+ assert isFieldSelection() : "No field selection type exists";
+ UserType userType = (UserType) column.type;
+ int field = ByteBufferUtil.getUnsignedShort(path.get(0), 0);
+ return userType.fieldType(field);
+ }
- if (selectsComplex())
+ public ByteBuffer toByteBuffer(TxnData data, AbstractType<?> receiver)
{
- ComplexColumnData complex = (ComplexColumnData) columnData;
+ // TODO: confirm all references can be satisfied as part of the
txn condition
+ AbstractType<?> type = column().type;
+
+ // Modify the type we'll check if the reference is to a collection
element.
+ if (selectsPath())
+ {
+ if (type.isCollection())
+ {
+ CollectionType<?> collectionType = (CollectionType<?>)
type;
+ type = collectionType.kind == SET ?
collectionType.nameComparator() : collectionType.valueComparator();
+ }
+ else if (type.isUDT())
+ type = getFieldSelectionType();
+ }
+
+ // Account for frozen collection and reversed clustering key
references:
+ AbstractType<?> receiveType = type.isFrozenCollection() ?
receiver.freeze().unwrap() : receiver.unwrap();
+ if (!(receiveType == type.unwrap()))
+ throw new IllegalArgumentException("Receiving type " +
receiveType + " does not match " + type.unwrap());
+
+ if (column().isPartitionKey())
+ return getPartitionKey(data);
+ else if (column().isClusteringColumn())
+ return getClusteringKey(data);
+
+ ColumnData columnData = getColumnData(data);
+
+ if (columnData == null)
+ return null;
- if (type instanceof CollectionType)
+ if (selectsComplex())
{
- CollectionType<?> col = (CollectionType<?>) type;
- return col.serializeForNativeProtocol(complex.iterator());
+ ComplexColumnData complex = (ComplexColumnData) columnData;
+
+ if (type instanceof CollectionType)
+ {
+ CollectionType<?> col = (CollectionType<?>) type;
+ return col.serializeForNativeProtocol(complex.iterator());
+ }
+ else if (type instanceof UserType)
+ {
+ UserType udt = (UserType) type;
+ return udt.serializeForNativeProtocol(complex.iterator());
+ }
+
+ throw new UnsupportedOperationException("Unsupported complex
type: " + type);
}
- else if (type instanceof UserType)
+ else if (selectsFrozenCollectionElement())
{
- UserType udt = (UserType) type;
- return udt.serializeForNativeProtocol(complex.iterator());
+ // If a path is selected for a non-frozen collection, the
element will already be materialized.
+ return getFrozenCollectionElement((Cell<?>) columnData);
+ }
+ else if (selectsFrozenUDTField())
+ {
+ return getFrozenFieldValue((Cell<?>) columnData);
}
- throw new UnsupportedOperationException("Unsupported complex type:
" + type);
+ Cell<?> cell = (Cell<?>) columnData;
+ return selectsSetElement() ? cell.path().get(0) : cell.buffer();
}
- else if (selectsFrozenCollectionElement())
+
+ private boolean selectsComplex()
{
- // If a path is selected for a non-frozen collection, the element
will already be materialized.
- return getFrozenCollectionElement((Cell<?>) columnData);
+ return column.isComplex() && path == null;
}
- else if (selectsFrozenUDTField())
+
+ private boolean selectsSetElement()
{
- return getFrozenFieldValue((Cell<?>) columnData);
+ return selectsPath() && column.type instanceof SetType;
}
- Cell<?> cell = (Cell<?>) columnData;
- return selectsSetElement() ? cell.path().get(0) : cell.buffer();
- }
+ private boolean selectsFrozenCollectionElement()
+ {
+ return selectsPath() && column.type.isFrozenCollection();
+ }
- private boolean selectsComplex()
- {
- return column.isComplex() && path == null;
- }
+ private boolean selectsFrozenUDTField()
+ {
+ return selectsPath() && column.type.isUDT() &&
!column.type.isMultiCell();
+ }
- private boolean selectsSetElement()
- {
- return selectsPath() && column.type instanceof SetType;
- }
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) return false;
+ ColumnReference that = (ColumnReference) o;
+ return tuple == that.tuple && Objects.equals(table, that.table) &&
Objects.equals(column, that.column) && Objects.equals(path, that.path);
+ }
- private boolean selectsFrozenCollectionElement()
- {
- return selectsPath() && column.type.isFrozenCollection();
- }
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(tuple, table, column, path);
+ }
- private boolean selectsFrozenUDTField()
- {
- return selectsPath() && column.type.isUDT() &&
!column.type.isMultiCell();
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder().append(tuple);
+
sb.append(':').append(column.ksName).append('.').append(column.cfName).append('.').append(column.name.toString());
+ if (path != null)
+ sb.append('#').append(path);
+ return sb.toString();
+ }
}
- static final ParameterisedVersionedSerializer<TxnReference,
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+ private static class ClusteringColumnData extends AbstractCell<ByteBuffer>
{
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new
ClusteringColumnData(null, null));
+ private final ByteBuffer value;
+
+ private ClusteringColumnData(ColumnMetadata column, ByteBuffer value)
+ {
+ super(column);
+ this.value = value;
+ }
+
@Override
- public void serialize(TxnReference reference, TableMetadatas tables,
DataOutputPlus out, Version version) throws IOException
+ public ByteBuffer value()
{
- out.writeUnsignedVInt32(reference.tuple);
- out.writeBoolean(reference.column != null);
- if (reference.column != null)
- {
- tables.serialize(reference.table, out);
- columnMetadataSerializer.serialize(reference.column,
reference.table, out);
- }
- out.writeBoolean(reference.path != null);
- if (reference.path != null)
- CollectionType.cellPathSerializer.serialize(reference.path,
out);
+ return value;
}
@Override
- public TxnReference deserialize(TableMetadatas tables, DataInputPlus
in, Version version) throws IOException
+ public ValueAccessor<ByteBuffer> accessor()
{
- int name = in.readUnsignedVInt32();
- TableMetadata table = null;
- ColumnMetadata column = null;
- if (in.readBoolean())
- {
- table = tables.deserialize(in);
- column = columnMetadataSerializer.deserialize(table, in);
- }
- CellPath path = in.readBoolean() ?
CollectionType.cellPathSerializer.deserialize(in) : null;
- return new TxnReference(name, table, column, path);
+ return ByteBufferAccessor.instance;
}
@Override
- public long serializedSize(TxnReference reference, TableMetadatas
tables, Version version)
+ public boolean isTombstone()
{
- long size = 0;
- size += VIntCoding.sizeOfUnsignedVInt(reference.tuple);
- size += TypeSizes.BOOL_SIZE;
- if (reference.column != null)
- {
- size += tables.serializedSize(reference.table);
- size +=
columnMetadataSerializer.serializedSize(reference.column, reference.table);
- }
- size += TypeSizes.BOOL_SIZE;
- if (reference.path != null)
- size +=
CollectionType.cellPathSerializer.serializedSize(reference.path);
- return size;
+ return false;
}
- };
+
+ @Override
+ public long unsharedHeapSizeExcludingData()
+ {
+ return EMPTY_SIZE;
+ }
+
+ @Override
+ public long unsharedHeapSize()
+ {
+ return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(value);
+ }
+
+ @Override
+ public long timestamp()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int ttl()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CellPath path()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell<?> withUpdatedColumn(ColumnMetadata newColumn)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell<?> withUpdatedValue(ByteBuffer newValue)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell<?> withUpdatedTimestamp(long newTimestamp)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell<?> withUpdatedTimestampAndLocalDeletionTime(long
newTimestamp, long newLocalDeletionTime)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Cell<?> withSkippedValue()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected int localDeletionTimeAsUnsignedInt()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
index 21c22b5a57..c453f612b2 100644
---
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperation.java
@@ -43,14 +43,13 @@ import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.TupleType;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.AccordSerializers;
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.db.marshal.CollectionType.Kind.MAP;
@@ -265,15 +264,15 @@ public class TxnReferenceOperation
return new Constants.Value(bytes);
}
- static final ParameterisedVersionedSerializer<TxnReferenceOperation,
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+ static final ParameterisedUnversionedSerializer<TxnReferenceOperation,
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
{
@Override
- public void serialize(TxnReferenceOperation operation, TableMetadatas
tables, DataOutputPlus out, Version version) throws IOException
+ public void serialize(TxnReferenceOperation operation, TableMetadatas
tables, DataOutputPlus out) throws IOException
{
out.writeByte(operation.kind.id);
tables.serialize(operation.table, out);
columnMetadataSerializer.serialize(operation.receiver,
operation.table, out);
- TxnReferenceValue.serializer.serialize(operation.value, tables,
out, version);
+ TxnReferenceValue.serializer.serialize(operation.value, tables,
out);
out.writeBoolean(operation.key != null);
if (operation.key != null)
@@ -285,24 +284,24 @@ public class TxnReferenceOperation
}
@Override
- public TxnReferenceOperation deserialize(TableMetadatas tables,
DataInputPlus in, Version version) throws IOException
+ public TxnReferenceOperation deserialize(TableMetadatas tables,
DataInputPlus in) throws IOException
{
Kind kind = Kind.from(in.readByte());
TableMetadata table = tables.deserialize(in);
ColumnMetadata receiver =
columnMetadataSerializer.deserialize(table, in);
- TxnReferenceValue value =
TxnReferenceValue.serializer.deserialize(tables, in, version);
+ TxnReferenceValue value =
TxnReferenceValue.serializer.deserialize(tables, in);
ByteBuffer key = in.readBoolean() ?
ByteBufferUtil.readWithVIntLength(in) : null;
ByteBuffer field = in.readBoolean() ?
ByteBufferUtil.readWithVIntLength(in) : null;
return new TxnReferenceOperation(kind, receiver, table, key,
field, value);
}
@Override
- public long serializedSize(TxnReferenceOperation operation,
TableMetadatas tables, Version version)
+ public long serializedSize(TxnReferenceOperation operation,
TableMetadatas tables)
{
long size = Byte.BYTES;
size += tables.serializedSize(operation.table);
size +=
columnMetadataSerializer.serializedSize(operation.receiver, operation.table);
- size +=
TxnReferenceValue.serializer.serializedSize(operation.value, tables, version);
+ size +=
TxnReferenceValue.serializer.serializedSize(operation.value, tables);
if (operation.key != null)
size +=
ByteBufferUtil.serializedSizeWithVIntLength(operation.key);
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
index 679106be11..7170d14503 100644
---
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
@@ -100,8 +100,8 @@ public class TxnReferenceOperations
out.writeBoolean(operations.clustering != null);
if (operations.clustering != null)
Clustering.serializer.serialize(operations.clustering, out,
version.messageVersion(), operations.metadata.comparator.subtypes());
- serializeList(operations.regulars, tables, out, version,
TxnReferenceOperation.serializer);
- serializeList(operations.statics, tables, out, version,
TxnReferenceOperation.serializer);
+ serializeList(operations.regulars, tables, out,
TxnReferenceOperation.serializer);
+ serializeList(operations.statics, tables, out,
TxnReferenceOperation.serializer);
}
@Override
@@ -112,8 +112,8 @@ public class TxnReferenceOperations
TableMetadata metadata = tables.deserialize(in);
Clustering<?> clustering = in.readBoolean() ?
Clustering.serializer.deserialize(in, version.messageVersion(),
metadata.comparator.subtypes()) : null;
- return new TxnReferenceOperations(metadata, clustering,
deserializeList(tables, in, version, TxnReferenceOperation.serializer),
- deserializeList(tables, in,
version, TxnReferenceOperation.serializer));
+ return new TxnReferenceOperations(metadata, clustering,
deserializeList(tables, in, TxnReferenceOperation.serializer),
+ deserializeList(tables, in,
TxnReferenceOperation.serializer));
}
@Override
@@ -126,8 +126,8 @@ public class TxnReferenceOperations
size += TypeSizes.BOOL_SIZE;
if (operations.clustering != null)
size +=
Clustering.serializer.serializedSize(operations.clustering,
version.messageVersion(), operations.metadata.comparator.subtypes());
- size += serializedListSize(operations.regulars, tables, version,
TxnReferenceOperation.serializer);
- size += serializedListSize(operations.statics, tables, version,
TxnReferenceOperation.serializer);
+ size += serializedListSize(operations.regulars, tables,
TxnReferenceOperation.serializer);
+ size += serializedListSize(operations.statics, tables,
TxnReferenceOperation.serializer);
return size;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
index 7dbcea1c93..9ff76c026c 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceValue.java
@@ -22,22 +22,23 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import javax.annotation.Nullable;
+
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ParameterisedVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class TxnReferenceValue
{
private interface Serializer<T extends TxnReferenceValue>
{
- void serialize(T t, TableMetadatas tables, DataOutputPlus out, Version
version) throws IOException;
- T deserialize(TableMetadatas tables, DataInputPlus in, Version
version, Kind kind) throws IOException;
- long serializedSize(T t, TableMetadatas tables, Version version);
+ void serialize(T t, TableMetadatas tables, DataOutputPlus out) throws
IOException;
+ T deserialize(TableMetadatas tables, DataInputPlus in, Kind kind)
throws IOException;
+ long serializedSize(T t, TableMetadatas tables);
}
enum Kind
@@ -60,9 +61,10 @@ public abstract class TxnReferenceValue
public static class Constant extends TxnReferenceValue
{
+ @Nullable
private final ByteBuffer value;
- public Constant(ByteBuffer value)
+ public Constant(@Nullable ByteBuffer value)
{
this.value = value;
}
@@ -78,7 +80,7 @@ public abstract class TxnReferenceValue
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Constant constant = (Constant) o;
- return value.equals(constant.value);
+ return Objects.equals(value, constant.value);
}
@Override
@@ -90,7 +92,7 @@ public abstract class TxnReferenceValue
@Override
public String toString()
{
- return "Constant=" + ByteBufferUtil.bytesToHex(value);
+ return "Constant=" + value == null ? "null" :
ByteBufferUtil.bytesToHex(value);
}
@Override
@@ -113,30 +115,35 @@ public abstract class TxnReferenceValue
private static final Serializer<Constant> serializer = new
Serializer<Constant>()
{
@Override
- public void serialize(Constant constant, TableMetadatas tables,
DataOutputPlus out, Version version) throws IOException
+ public void serialize(Constant constant, TableMetadatas tables,
DataOutputPlus out) throws IOException
{
- ByteBufferUtil.writeWithVIntLength(constant.value, out);
+ out.writeBoolean(constant.value != null);
+ if (constant.value != null)
+ ByteBufferUtil.writeWithVIntLength(constant.value, out);
}
@Override
- public Constant deserialize(TableMetadatas tables, DataInputPlus
in, Version version, Kind kind) throws IOException
+ public Constant deserialize(TableMetadatas tables, DataInputPlus
in, Kind kind) throws IOException
{
- return new Constant(ByteBufferUtil.readWithVIntLength(in));
+ return new Constant(in.readBoolean() ?
ByteBufferUtil.readWithVIntLength(in) : null);
}
@Override
- public long serializedSize(Constant constant, TableMetadatas
tables, Version version)
+ public long serializedSize(Constant constant, TableMetadatas
tables)
{
- return
ByteBufferUtil.serializedSizeWithVIntLength(constant.value);
+ long size = TypeSizes.sizeof(constant.value != null);
+ if (constant.value != null)
+ size +=
ByteBufferUtil.serializedSizeWithVIntLength(constant.value);
+ return size;
}
};
}
public static class Substitution extends TxnReferenceValue
{
- private final TxnReference reference;
+ private final TxnReference.ColumnReference reference;
- public Substitution(TxnReference reference)
+ public Substitution(TxnReference.ColumnReference reference)
{
this.reference = reference;
}
@@ -183,47 +190,47 @@ public abstract class TxnReferenceValue
private static final Serializer<Substitution> serializer = new
Serializer<>()
{
@Override
- public void serialize(Substitution substitution, TableMetadatas
tables, DataOutputPlus out, Version version) throws IOException
+ public void serialize(Substitution substitution, TableMetadatas
tables, DataOutputPlus out) throws IOException
{
- TxnReference.serializer.serialize(substitution.reference,
tables, out, version);
+
TxnReference.columnSerializer.serialize(substitution.reference, tables, out);
}
@Override
- public Substitution deserialize(TableMetadatas tables,
DataInputPlus in, Version version, Kind kind) throws IOException
+ public Substitution deserialize(TableMetadatas tables,
DataInputPlus in, Kind kind) throws IOException
{
- return new
Substitution(TxnReference.serializer.deserialize(tables, in, version));
+ return new
Substitution(TxnReference.columnSerializer.deserialize(tables, in));
}
@Override
- public long serializedSize(Substitution substitution,
TableMetadatas tables, Version version)
+ public long serializedSize(Substitution substitution,
TableMetadatas tables)
{
- return
TxnReference.serializer.serializedSize(substitution.reference, tables, version);
+ return
TxnReference.columnSerializer.serializedSize(substitution.reference, tables);
}
};
}
- static final ParameterisedVersionedSerializer<TxnReferenceValue,
TableMetadatas, Version> serializer = new ParameterisedVersionedSerializer<>()
+ static final ParameterisedUnversionedSerializer<TxnReferenceValue,
TableMetadatas> serializer = new ParameterisedUnversionedSerializer<>()
{
@SuppressWarnings("unchecked")
@Override
- public void serialize(TxnReferenceValue value, TableMetadatas tables,
DataOutputPlus out, Version version) throws IOException
+ public void serialize(TxnReferenceValue value, TableMetadatas tables,
DataOutputPlus out) throws IOException
{
out.writeUnsignedVInt32(value.kind().ordinal());
- value.kind().serializer.serialize(value, tables, out, version);
+ value.kind().serializer.serialize(value, tables, out);
}
@Override
- public TxnReferenceValue deserialize(TableMetadatas tables,
DataInputPlus in, Version version) throws IOException
+ public TxnReferenceValue deserialize(TableMetadatas tables,
DataInputPlus in) throws IOException
{
Kind kind = Kind.values()[in.readUnsignedVInt32()];
- return kind.serializer.deserialize(tables, in, version, kind);
+ return kind.serializer.deserialize(tables, in, kind);
}
@SuppressWarnings("unchecked")
@Override
- public long serializedSize(TxnReferenceValue value, TableMetadatas
tables, Version version)
+ public long serializedSize(TxnReferenceValue value, TableMetadatas
tables)
{
- return TypeSizes.sizeofUnsignedVInt(value.kind().ordinal()) +
value.kind().serializer.serializedSize(value, tables, version);
+ return TypeSizes.sizeofUnsignedVInt(value.kind().ordinal()) +
value.kind().serializer.serializedSize(value, tables);
}
};
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 1fdc0e54f9..25e85fd0bc 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -77,7 +77,7 @@ public class TxnUpdate extends AccordUpdate
final TableMetadatas tables;
private final Keys keys;
private final ByteBuffer[] fragments;
- private final AbstractSerialized<TxnCondition, TableMetadatas> condition;
+ private final SerializedTxnCondition condition;
@Nullable
private final ConsistencyLevel cassandraCommitCL;
@@ -107,7 +107,7 @@ public class TxnUpdate extends AccordUpdate
this.preserveTimestamps = preserveTimestamps;
}
- private TxnUpdate(TableMetadatas tables, Keys keys, ByteBuffer[]
fragments, AbstractSerialized<TxnCondition, TableMetadatas> condition,
ConsistencyLevel cassandraCommitCL, boolean preserveTimestamps)
+ private TxnUpdate(TableMetadatas tables, Keys keys, ByteBuffer[]
fragments, SerializedTxnCondition condition, ConsistencyLevel
cassandraCommitCL, boolean preserveTimestamps)
{
this.tables = tables;
this.keys = keys;
@@ -261,14 +261,14 @@ public class TxnUpdate extends AccordUpdate
return updates;
}
- public static final AccordUpdateSerializer<TxnUpdate> serializer = new
AccordUpdateSerializer<TxnUpdate>()
+ public static final AccordUpdateSerializer<TxnUpdate> serializer = new
AccordUpdateSerializer<>()
{
@Override
public void serialize(TxnUpdate update, TableMetadatasAndKeys
tablesAndKeys, DataOutputPlus out, Version version) throws IOException
{
out.writeByte(update.preserveTimestamps ? FLAG_PRESERVE_TIMESTAMPS
: 0);
tablesAndKeys.serializeKeys(update.keys, out);
- writeWithVIntLength(update.condition.bytes(tablesAndKeys.tables,
version), out);
+ writeWithVIntLength(update.condition.bytes(), out);
serializeArray(update.fragments, out,
ByteBufferUtil.byteBufferSerializer);
serializeNullable(update.cassandraCommitCL, out,
consistencyLevelSerializer);
}
@@ -290,7 +290,7 @@ public class TxnUpdate extends AccordUpdate
{
long size = 1; // flags
size += tablesAndKeys.serializedKeysSize(update.keys);
- size +=
serializedSizeWithVIntLength(update.condition.bytes(tablesAndKeys.tables,
version));
+ size += serializedSizeWithVIntLength(update.condition.bytes());
size += serializedArraySize(update.fragments,
ByteBufferUtil.byteBufferSerializer);
size += serializedNullableSize(update.cassandraCommitCL,
consistencyLevelSerializer);
return size;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index abd27ae588..1df846ed2c 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -85,7 +85,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
private static final long EMPTY_SIZE =
ObjectSizes.measure(EMPTY_CONDITION_FAILED);
- public static class Update extends AbstractSerialized<PartitionUpdate,
TableMetadatas>
+ public static class Update extends
AbstractParameterisedVersionedSerialized<PartitionUpdate, TableMetadatas>
{
private static final long EMPTY_SIZE = ObjectSizes.measure(new
Update(null, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
public final PartitionKey key;
diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
index 81173bdf1e..d4718096a3 100644
--- a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
+++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
@@ -435,6 +435,15 @@ public class CollectionSerializers
return size;
}
+ public static <V, P, L extends List<V>> long serializedListSize(L values,
P p, AsymmetricParameterisedUnversionedSerializer<V, P, ?> valueSerializer)
+ {
+ int items = values.size();
+ long size = sizeofUnsignedVInt(items);
+ for (int i = 0 ; i < items ; ++i)
+ size += valueSerializer.serializedSize(values.get(i), p);
+ return size;
+ }
+
public static <V, P, L extends List<V>, Version> long serializedListSize(L
values, P p, Version version, AsymmetricParameterisedVersionedSerializer<V, P,
?, Version> valueSerializer)
{
int items = values.size();
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
index f382675955..03da925f98 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertTest.java
@@ -19,6 +19,14 @@
package org.apache.cassandra.cql3.validation.operations;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
@@ -28,7 +36,12 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.Duration;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.UntypedResultSet.Row;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
public class InsertTest extends CQLTester.Fuzzed
{
@@ -296,4 +309,43 @@ public class InsertTest extends CQLTester.Fuzzed
assertInvalidThrow(InvalidRequestException.class,
"INSERT INTO %s (a, b) VALUES ('foo', ?)", new
String(TOO_BIG.array()));
}
+
+ @Test
+ public void testMapEmptyValueMeaningless()
+ {
+ createTable("CREATE TABLE %s(pk int primary key, v1 map<int, int>, v2
frozen<map<int, int>>)");
+ ByteBuffer value = MapType.getInstance(Int32Type.instance,
Int32Type.instance, true).pack(Arrays.asList(ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ execute("INSERT INTO %s(pk, v1, v2) VALUES (?, ?, ?)", 0, value,
value);
+
+ Map<Integer, Integer> expected = new HashMap<>();
+ expected.put(null, null);
+ assertRows(execute("SELECT * FROM %s"),
+ row(0, expected, expected));
+ }
+
+ @Test
+ public void testListEmptyValueMeaningless()
+ {
+ createTable("CREATE TABLE %s(pk int primary key, v1 list<int>, v2
frozen<list<int>>)");
+ ByteBuffer value = ListType.getInstance(Int32Type.instance,
true).pack(Collections.singletonList(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ execute("INSERT INTO %s(pk, v1, v2) VALUES (?, ?, ?)", 0, value,
value);
+
+ List<Integer> expected = new ArrayList<>();
+ expected.add(null);
+ assertRows(execute("SELECT * FROM %s"),
+ row(0, expected, expected));
+ }
+
+ @Test
+ public void testSetEmptyValueMeaningless()
+ {
+ createTable("CREATE TABLE %s(pk int primary key, v1 set<int>, v2
frozen<set<int>>)");
+ ByteBuffer value = SetType.getInstance(Int32Type.instance,
true).pack(Collections.singletonList(ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ execute("INSERT INTO %s(pk, v1, v2) VALUES (?, ?, ?)", 0, value,
value);
+
+ Set<Integer> expected = new HashSet<>();
+ expected.add(null);
+ assertRows(execute("SELECT * FROM %s"),
+ row(0, expected, expected));
+ }
}
diff --git a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
index 84701a2115..fd3b9b68a8 100644
--- a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.marshal;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.nio.file.Files;
@@ -111,6 +112,7 @@ import
org.apache.cassandra.utils.asserts.SoftAssertionsWithLimit;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
+import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.description.Description;
import org.quicktheories.core.Gen;
@@ -244,6 +246,79 @@ public class AbstractTypeTest
throw new AssertionError("Uncovered types:\n" + sb);
}
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void meaninglessEmptyness()
+ {
+ // this test just makes sure that all types are covered and no new
type is left out
+ Set<Class<? extends AbstractType>> subTypes =
reflections.getSubTypesOf(AbstractType.class);
+ for (var klass : subTypes)
+ {
+ if (Modifier.isAbstract(klass.getModifiers()))
+ continue;
+ if (isTestType(klass))
+ continue;
+ if (isPrefixCompositeType(klass))
+ continue;
+ AbstractType<?> type = null;
+ for (var f : klass.getDeclaredFields())
+ {
+ if (!(Modifier.isPublic(f.getModifiers()) &&
Modifier.isStatic(f.getModifiers())))
+ continue;
+ if (AbstractType.class.isAssignableFrom(f.getType()))
+ {
+ try
+ {
+ type = (AbstractType<?>) f.get(null);
+ break;
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ if (type == null)
+ {
+ for (var c : klass.getDeclaredConstructors())
+ {
+ if (c.getParameterCount() == 0)
+ {
+ try
+ {
+ type = (AbstractType<?>) c.newInstance();
+ break;
+ }
+ catch (InstantiationException | IllegalAccessException
| InvocationTargetException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ if (type == null)
+ continue;
+ if (type.isEmptyValueMeaningless())
+ {
+ AbstractType<?> finalType = type;
+            // some types (such as TimeUUID) share the same equals, so
equality checks don't work here; reference checks are needed
+
Assertions.assertThat(AbstractTypeGenerators.MEANINGLESS_EMPTYNESS)
+ .describedAs("New type %s detected that says its
emptyness is meaningless, but it isn't allowed to be! This is a legacy concept
only!", type.getClass())
+ .anyMatch(t -> t == finalType);
+
+
Assertions.assertThat(type.isNull(ByteBufferUtil.EMPTY_BYTE_BUFFER)).isTrue();
+ }
+ }
+ }
+
+ @Test
+ public void onlyMeaninglessEmptyness()
+ {
+
qt().forAll(Generators.filter(AbstractTypeGenerators.builder().withDefaultSizeGen(1).build(),
t ->
!AbstractTypeGenerators.MEANINGLESS_EMPTYNESS.contains(t))).checkAssert(type ->
{
+ Assertions.assertThat(type.isEmptyValueMeaningless()).isFalse();
+ });
+ }
+
@SuppressWarnings("rawtypes")
private boolean isTestType(Class<? extends AbstractType> klass)
{
diff --git a/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java
b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java
new file mode 100644
index 0000000000..d139874687
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/partitions/SimplePartition.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapCloner;
+import org.apache.cassandra.utils.memory.HeapPool;
+
+public class SimplePartition extends AbstractBTreePartition
+{
+ static
+ {
+        DatabaseDescriptor.clientInitialization(false); // if the user already set up
DatabaseDescriptor, respect whatever was done
+ if (DatabaseDescriptor.getPartitioner() == null)
+
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ }
+ public static final int DEFAULT_TIMESTAMP = 42;
+ private static final HeapPool POOL = new HeapPool(Long.MAX_VALUE, 1.0f, ()
-> ImmediateFuture.success(Boolean.TRUE));
+
+ private final OpOrder writeOrder = new OpOrder();
+ private final AtomicBTreePartition delegate;
+
+ public SimplePartition(TableMetadata metadata, DecoratedKey partitionKey)
+ {
+ super(partitionKey);
+ delegate = new
AtomicBTreePartition(TableMetadataRef.forOfflineTools(metadata), partitionKey,
POOL.newAllocator(metadata.toString()));
+ }
+
+ @Override
+ protected BTreePartitionData holder()
+ {
+ return delegate.holder();
+ }
+
+ @Override
+ protected boolean canHaveShadowedData()
+ {
+ return false;
+ }
+
+ @Override
+ public TableMetadata metadata()
+ {
+ return delegate.metadata();
+ }
+
+ public SimplePartition clear()
+ {
+ delegate.unsafeSetHolder(BTreePartitionData.EMPTY);
+ return this;
+ }
+
+ public SimplePartition add(Row row)
+ {
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata(),
partitionKey, row);
+ try (OpOrder.Group group = writeOrder.start())
+ {
+ delegate.addAll(update, HeapCloner.instance, group,
UpdateTransaction.NO_OP);
+ }
+ return this;
+ }
+
+ public RowBuilder add(Clustering<?> ck)
+ {
+ return new RowBuilder(ck);
+ }
+
+ public SimplePartition addEmpty(Clustering<?> ck)
+ {
+ return add(ck).build();
+ }
+
+ public SimplePartition addEmptyAndLive(Clustering<?> ck)
+ {
+ return addEmptyAndLive(ck, DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
+ }
+
+ public SimplePartition addEmptyAndLive(Clustering<?> ck, long timestamp,
long nowInSec)
+ {
+ return add(ck).liveness(timestamp, nowInSec).build();
+ }
+
+ public RowIterator filtered()
+ {
+ return FilteredRows.filter(unfilteredIterator(), DEFAULT_TIMESTAMP);
+ }
+
+ public class RowBuilder
+ {
+ private final Row.Builder builder = BTreeRow.unsortedBuilder();
+ private long timestamp = DEFAULT_TIMESTAMP;
+
+ public RowBuilder(Clustering<?> ck)
+ {
+ builder.newRow(ck);
+ }
+
+ public RowBuilder timestamp(long timestamp)
+ {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public RowBuilder liveness(long timestamp, long nowInSec)
+ {
+ builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(timestamp,
nowInSec));
+ return this;
+ }
+
+ public RowBuilder add(ColumnMetadata column, ByteBuffer value)
+ {
+ if (column.type.unwrap().isMultiCell())
+ throw new IllegalArgumentException("Unable to add a single
value to a multi cell column " + column);
+ builder.addCell(BufferCell.live(column, timestamp, value));
+ return this;
+ }
+
+ public RowBuilder addComplex(ColumnMetadata column, List<ByteBuffer>
values)
+ {
+ if (!column.type.unwrap().isMultiCell())
+ throw new IllegalArgumentException("Unable to add multiple
values to a regular column " + column);
+ builder.addComplexDeletion(column, DeletionTime.build(timestamp -
1, timestamp - 1));
+            // maps need special handling since each entry is a key/value pair
+ if (column.type.unwrap() instanceof MapType)
+ {
+ for (int i = 0; i < values.size(); i = i + 2)
+ {
+ ByteBuffer key = values.get(i);
+ ByteBuffer value = values.get(i + 1);
+ builder.addCell(BufferCell.live(column, timestamp, value,
CellPath.create(key)));
+ }
+ }
+ else
+ {
+ for (int i = 0; i < values.size(); i++)
+ builder.addCell(complexCell(column, i, values.get(i),
timestamp));
+ }
+ return this;
+ }
+
+ private Cell<?> complexCell(ColumnMetadata column, int idx, ByteBuffer
value, long timestamp)
+ {
+ var type = column.type.unwrap();
+ if (type.isCollection())
+ {
+ CollectionType<?> ct = (CollectionType<?>) type;
+ switch (ct.kind)
+ {
+ case SET:
+ {
+ // this isn't correct... the value isn't actually
known... so only support map with key/value matching...
+ return BufferCell.live(column, timestamp, value,
CellPath.create(value));
+ }
+ case LIST:
+ {
+ // this isn't actually correct, as the cellpath is
based off time, but a counter is used to keep things deterministic
+ CellPath path =
CellPath.create(ByteBuffer.wrap(TimeUUID.Generator.atUnixMillisAsBytes(idx)));
+ return BufferCell.live(column, timestamp, value, path);
+ }
+ case MAP:
+ throw new UnsupportedOperationException("Map isn't
supported due to API being single element rather than multi-element");
+ default:
+ throw new
UnsupportedOperationException(ct.kind.name());
+ }
+ }
+ else if (type.isUDT())
+ {
+ UserType ut = (UserType) type;
+ CellPath path = ut.cellPathForField(ut.fieldName(idx));
+ return BufferCell.live(column, timestamp, value, path);
+ }
+
+ throw new UnsupportedOperationException(type.toString());
+ }
+
+ public SimplePartition build()
+ {
+ SimplePartition.this.add(builder.build());
+ return SimplePartition.this;
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
b/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
index 1cb1199f7f..01d9d50aa3 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnConditionTest.java
@@ -20,11 +20,29 @@ package org.apache.cassandra.service.accord.txn;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
+import accord.utils.DefaultRandom;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.FieldIdentifier;
+import org.apache.cassandra.db.BufferClustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.db.partitions.SimplePartition;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import accord.utils.Gen;
@@ -42,35 +60,97 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.serializers.TableMetadatas;
-import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CassandraGenerators;
import org.apache.cassandra.utils.Generators;
import static accord.utils.Property.qt;
+import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
+import static org.apache.cassandra.utils.Generators.toGen;
//TODO (maintenance): rather than copying the condition's supported kinds, maybe
reference them directly from the type?
public class TxnConditionTest
{
private static final SchemaProvider SCHEMA = new SchemaProvider();
+
static
{
// ColumnMetadata serializer only stores the ks/table/name and uses
Schema to load it
Schema.instance = SCHEMA;
}
+ private static final FieldIdentifier UDT_F1 =
FieldIdentifier.forInternalString("f1");
+ private static final UserType UDT = new UserType("ks",
ByteBufferUtil.bytes("udt"), Collections.singletonList(UDT_F1),
Collections.singletonList(Int32Type.instance), true);
+ private static final UserType FROZEN_UDT = UDT.freeze();
+
+ private static final SetType<Integer> SET =
SetType.getInstance(Int32Type.instance, true);
+ private static final SetType<Integer> FROZEN_SET = SET.freeze();
+
+ private static ColumnIdentifier name(ColumnMetadata.Kind kind, int offset)
+ {
+ StringBuilder sb = new StringBuilder(3);
+ switch (kind)
+ {
+ case PARTITION_KEY:
+ sb.append("pk");
+ break;
+ case CLUSTERING:
+ sb.append("ck");
+ break;
+ case STATIC:
+ sb.append('s');
+ break;
+ case REGULAR:
+ sb.append('v');
+ break;
+ default:
+ throw new UnsupportedOperationException(kind.name());
+ }
+ sb.append(offset);
+ return ColumnIdentifier.getInterned(sb.toString(), false);
+ }
+ private static final ColumnIdentifier COL_PK1 =
name(ColumnMetadata.Kind.PARTITION_KEY, 1);
+ private static final ColumnIdentifier COL_S1 =
name(ColumnMetadata.Kind.STATIC, 1);
+ private static final ColumnIdentifier COL_S2 =
name(ColumnMetadata.Kind.STATIC, 2);
+ private static final ColumnIdentifier COL_S3 =
name(ColumnMetadata.Kind.STATIC, 3);
+ private static final ColumnIdentifier COL_S4 =
name(ColumnMetadata.Kind.STATIC, 4);
+ private static final ColumnIdentifier COL_S5 =
name(ColumnMetadata.Kind.STATIC, 5);
+ private static final ColumnIdentifier COL_CK1 =
name(ColumnMetadata.Kind.CLUSTERING, 1);
+ private static final ColumnIdentifier COL_R1 =
name(ColumnMetadata.Kind.REGULAR, 1);
+ private static final ColumnIdentifier COL_R2 =
name(ColumnMetadata.Kind.REGULAR, 2);
+ private static final ColumnIdentifier COL_R3 =
name(ColumnMetadata.Kind.REGULAR, 3);
+ private static final ColumnIdentifier COL_R4 =
name(ColumnMetadata.Kind.REGULAR, 4);
+ private static final ColumnIdentifier COL_R5 =
name(ColumnMetadata.Kind.REGULAR, 5);
+ private static final TableMetadata tb1 = TableMetadata.builder("ks",
"tbl1")
+
.addPartitionKeyColumn(COL_PK1, Int32Type.instance)
+
.addStaticColumn(COL_S1, Int32Type.instance)
+
.addStaticColumn(COL_S2, FROZEN_UDT)
+
.addStaticColumn(COL_S3, FROZEN_SET)
+
.addStaticColumn(COL_S4, UDT)
+
.addStaticColumn(COL_S5, SET)
+
.addClusteringColumn(COL_CK1, Int32Type.instance)
+
.addRegularColumn(COL_R1, Int32Type.instance)
+
.addRegularColumn(COL_R2, FROZEN_UDT)
+
.addRegularColumn(COL_R3, FROZEN_SET)
+
.addRegularColumn(COL_R4, UDT)
+
.addRegularColumn(COL_R5, SET)
+
.partitioner(Murmur3Partitioner.instance)
+ .build();
+
+ private static final ByteBuffer INT_1 = ByteBufferUtil.bytes(1);
+
private static Gen<TxnCondition.Kind> BOOLEAN_KIND_GEN =
Gens.pick(TxnCondition.Kind.AND, TxnCondition.Kind.OR);
private static Gen<TxnCondition.Kind> EXISTS_KIND_GEN =
Gens.pick(TxnCondition.Kind.IS_NOT_NULL, TxnCondition.Kind.IS_NULL);
private static Gen<TxnCondition.Kind> VALUE_KIND_GEN =
Gens.pick(TxnCondition.Kind.EQUAL, TxnCondition.Kind.NOT_EQUAL,
TxnCondition.Kind.GREATER_THAN, TxnCondition.Kind.GREATER_THAN_OR_EQUAL,
TxnCondition.Kind.LESS_THAN, TxnCondition.Kind.LESS_THAN_OR_EQUAL);
private static Gen<ProtocolVersion> PROTOCOL_VERSION_GEN =
Gens.enums().all(ProtocolVersion.class);
- private static Gen<ColumnMetadata> COLUM_METADATA_GEN =
Generators.toGen(CassandraGenerators.columnMetadataGen()).map(cm -> {
+ private static Gen<ColumnMetadata> COLUM_METADATA_GEN =
toGen(CassandraGenerators.columnMetadataGen()).map(cm -> {
SCHEMA.add(cm);
return cm;
});
- private static Gen<ByteBuffer> BYTES_GEN =
Generators.toGen(Generators.directAndHeapBytes(0, 10));
+ private static Gen<ByteBuffer> BYTES_GEN =
toGen(Generators.directAndHeapBytes(0, 10));
private static Gen<TxnReference> TXN_REF_GEN = rs -> {
{
ColumnMetadata cm = COLUM_METADATA_GEN.next(rs);
@@ -79,12 +159,12 @@ public class TxnConditionTest
if (!cm.isPartitionKey())
builder.addPartitionKeyColumn(cm.name.toString().equals("_") ?
"__" : "_", Int32Type.instance);
TableMetadata tm = builder.build();
- cm = tm.getColumn(cm.name);
- return rs.nextBoolean() ? new TxnReference(rs.nextInt(0,
Integer.MAX_VALUE), cm, tm)
- : new TxnReference(rs.nextInt(0,
Integer.MAX_VALUE), tm, cm, CellPath.create(BYTES_GEN.next(rs)));
+ cm = tm.getExistingColumn(cm.name);
+ return rs.nextBoolean() ? TxnReference.column(rs.nextInt(0,
Integer.MAX_VALUE), tm, cm)
+ : TxnReference.column(rs.nextInt(0,
Integer.MAX_VALUE), tm, cm, CellPath.create(BYTES_GEN.next(rs)));
}
};
- private static Gen<Clustering<?>> CLUSTERING_GEN =
Generators.toGen(CassandraGenerators.CLUSTERING_GEN);
+ private static Gen<Clustering<?>> CLUSTERING_GEN =
toGen(CassandraGenerators.CLUSTERING_GEN);
private static Gen<ColumnCondition.Bound> BOUND_GEN =
ColumnConditionTest.boundGen().map(b -> {
SCHEMA.add(b.column);
return b;
@@ -98,12 +178,317 @@ public class TxnConditionTest
TableMetadatas.Collector collector = new
TableMetadatas.Collector();
condition.collect(collector);
TableMetadatas tables = collector.build();
- for (Version version : Version.V1.greaterThanOrEqual())
- Serializers.testSerde(output, TxnCondition.serializer,
condition, tables, version);
+ Serializers.testSerde(output, TxnCondition.serializer, condition,
tables);
SCHEMA.clear();
});
}
+ @Test
+ public void isNullWithEmptyRows()
+ {
+ DecoratedKey key = tb1.partitioner.decorateKey(EMPTY_BYTE_BUFFER);
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(EmptyIterators.row(tb1, key, false)));
+ TxnReference row = TxnReference.row(0);
+ TxnReference pk = TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_PK1));
+ TxnReference s = TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_CK1));
+ TxnReference ck = TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_CK1));
+ TxnReference r = TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_R1));
+
+ for (TxnReference ref : Arrays.asList(row, pk, s, ck, r))
+ assertExists(data, ref, false);
+ }
+
+ @Test
+ public void isNullWithNullColumn()
+ {
+ IsNullTest simpleTest = (partition, clustering, column, nonNullValue)
-> {
+ // now include empty row (acts as null for most types)
+ partition.clear().add(clustering)
+ .add(column, EMPTY_BYTE_BUFFER)
+ .build();
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column),
!column.type.isNull(EMPTY_BYTE_BUFFER));
+
+ // now include row
+ partition.clear().add(clustering)
+ .add(column, nonNullValue)
+ .build();
+ data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column), true);
+ };
+ IsNullFrozenFieldOrElementTest frozenFieldOrElementTest = (partition,
clustering, column, path, nonNullValue, expectedValue) -> {
+ var columnType = column.type.unwrap();
+ var fieldOrElementType = columnType.isUDT() ? ((UserType)
columnType).fieldType(path) : ((CollectionType<?>) columnType).nameComparator();
+ // now include empty row (acts as null for most types)
+ partition.clear().add(clustering)
+ .add(column, EMPTY_BYTE_BUFFER)
+ .build();
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column, path),
false);
+
+ // now include row
+ partition.clear().add(clustering)
+ .add(column, nonNullValue)
+ .build();
+ data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column, path),
!fieldOrElementType.isNull(expectedValue));
+ };
+ IsNullComplexTest complexTest = (partition, clustering, column,
values) -> {
+ // now include column without value (tombstone)
+ partition.clear().add(clustering)
+ .addComplex(column, Collections.emptyList())
+ .build();
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column), false);
+
+ if (values.isEmpty()) return; // already tested
+ // now include row
+ partition.clear().add(clustering)
+ .addComplex(column, values)
+ .build();
+ data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column), true);
+ };
+ IsNullFieldOrElementTest fieldOrElementTest = (partition, clustering,
column, path, values, expectedValue) -> {
+ var columnType = column.type.unwrap();
+ var fieldOrElementType = columnType.isUDT() ? ((UserType)
columnType).fieldType(path) : ((CollectionType<?>) columnType).nameComparator();
+ // now include empty row (acts as null for most types)
+ partition.clear().add(clustering)
+ .addComplex(column, Collections.emptyList())
+ .build();
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column, path),
false);
+
+ // now include row
+ partition.clear().add(clustering)
+ .addComplex(column, values)
+ .build();
+ data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1, column, path),
!fieldOrElementType.isNull(expectedValue));
+ };
+
+ DecoratedKey key = tb1.partitioner.decorateKey(INT_1);
+ BufferClustering ck = BufferClustering.make(INT_1);
+ SimplePartition partition = new SimplePartition(tb1, key);
+
+
+ partition.clear().addEmpty(ck);
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_PK1)), true);
+ assertExists(data, TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_CK1)), false);
+ assertExists(data, TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_S1)), false);
+ assertExists(data, TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_R1)), false);
+
+ // now run with liveness set
+ partition.clear().addEmptyAndLive(ck);
+ data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+ assertExists(data, TxnReference.column(0, tb1,
tb1.getExistingColumn(COL_CK1)), true);
+
+ for (Clustering<?> clustering :
Arrays.asList(Clustering.STATIC_CLUSTERING, ck))
+ {
+ ColumnMetadata.Kind kind = clustering ==
Clustering.STATIC_CLUSTERING ? ColumnMetadata.Kind.STATIC :
ColumnMetadata.Kind.REGULAR;
+
+ simpleTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 1)), INT_1);
+
+ simpleTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 2)), FROZEN_UDT.pack(INT_1));
+ frozenFieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 2)), FROZEN_UDT.cellPathForField(UDT_F1),
FROZEN_UDT.pack(INT_1), INT_1);
+ frozenFieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 2)), FROZEN_UDT.cellPathForField(UDT_F1),
FROZEN_UDT.pack(EMPTY_BYTE_BUFFER), EMPTY_BYTE_BUFFER);
+
+ //TODO (coverage): test list type, which supports by-offset and
by-value
+ //TODO (coverage): test map type, which has key/value which allows
empty
+ simpleTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 3)),
FROZEN_SET.pack(Collections.singletonList(INT_1)));
+ frozenFieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 3)), CellPath.create(INT_1),
FROZEN_SET.pack(Collections.singletonList(INT_1)), INT_1);
+ frozenFieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 3)), CellPath.create(EMPTY_BYTE_BUFFER),
FROZEN_SET.pack(Collections.singletonList(EMPTY_BYTE_BUFFER)),
EMPTY_BYTE_BUFFER);
+
+ complexTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 4)), Collections.singletonList(INT_1));
+ fieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 4)), UDT.cellPathForField(UDT_F1),
Collections.singletonList(INT_1), INT_1);
+ fieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 4)), UDT.cellPathForField(UDT_F1),
Collections.singletonList(EMPTY_BYTE_BUFFER), EMPTY_BYTE_BUFFER);
+
+ complexTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 5)), Collections.singletonList(INT_1));
+ fieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 5)), CellPath.create(INT_1),
Collections.singletonList(INT_1), INT_1);
+ fieldOrElementTest.test(partition, clustering,
tb1.getExistingColumn(name(kind, 5)), CellPath.create(EMPTY_BYTE_BUFFER),
Collections.singletonList(EMPTY_BYTE_BUFFER), EMPTY_BYTE_BUFFER);
+ }
+ }
+
+ @Test
+ public void harryPotterAndTheMeaninglessEmptyness()
+ {
+ for (var type : AbstractTypeGenerators.MEANINGLESS_EMPTYNESS)
+ {
+ if (type == CounterColumnType.instance) continue;
+ TableMetadata metadata = TableMetadata.builder("ks", "tbl")
+ .addPartitionKeyColumn("pk",
type)
+ .addClusteringColumn("ck",
type)
+ .addRegularColumn("r", type)
+ .addStaticColumn("s", type)
+
.partitioner(Murmur3Partitioner.instance)
+ .build();
+ ByteBuffer nonEmpty =
toGen(AbstractTypeGenerators.getTypeSupport(type).bytesGen()).next(new
DefaultRandom(42));
+ Assertions.assertThat(nonEmpty).isNotEqualTo(EMPTY_BYTE_BUFFER);
// double check...
+
+ SimplePartition partition = new SimplePartition(metadata,
metadata.partitioner.decorateKey(nonEmpty));
+ Clustering<?> clustering = BufferClustering.make(nonEmpty);
+ for (var c : metadata.regularAndStaticColumns())
+ partition.add(c.isStatic() ? Clustering.STATIC_CLUSTERING :
clustering)
+ .add(c, nonEmpty)
+ .build();
+ TxnData data = TxnData.of(0, new
TxnDataKeyValue(partition.filtered()));
+ // the test is against empty, so can not ever apply
+ for (var column : metadata.columns())
+ {
+ for (TxnCondition.Kind kind : TxnCondition.Value.supported())
+ {
+ for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
+ {
+ TxnReference ref = TxnReference.column(0, metadata,
column);
+ // empty logic reuses this which doesn't make the most
sense... flesh it out
+ TxnCondition.Value condition = new
TxnCondition.Value(ref.asColumn(), kind, EMPTY_BYTE_BUFFER, version);
+ Assertions.assertThat(condition.applies(data))
+ .describedAs("column=%s, type=%s, kind=%s",
column.name, type.asCQL3Type(), kind.name())
+ .isFalse();
+ }
+ }
+ }
+
+ // partition values have empty, so can not ever apply
+ partition = new SimplePartition(metadata,
metadata.partitioner.decorateKey(EMPTY_BYTE_BUFFER));
+ clustering = BufferClustering.make(EMPTY_BYTE_BUFFER);
+ for (var c : metadata.regularAndStaticColumns())
+ partition.add(c.isStatic() ? Clustering.STATIC_CLUSTERING :
clustering)
+ .add(c, EMPTY_BYTE_BUFFER)
+ .build();
+ data = TxnData.of(0, new TxnDataKeyValue(partition.filtered()));
+ for (var column : metadata.columns())
+ {
+ for (TxnCondition.Kind kind : TxnCondition.Value.supported())
+ {
+ for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
+ {
+ TxnReference ref = TxnReference.column(0, metadata,
column);
+ // empty logic reuses this which doesn't make the most
sense... flesh it out
+ TxnCondition.Value condition = new
TxnCondition.Value(ref.asColumn(), kind, nonEmpty, version);
+ Assertions.assertThat(condition.applies(data))
+ .describedAs("column=%s, type=%s, kind=%s",
column.name, type.asCQL3Type(), kind.name())
+ .isFalse();
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void value()
+ {
+ Gen<AbstractType<?>> typeGen = toGen(new
AbstractTypeGenerators.TypeGenBuilder()
+ .withoutUnsafeEquality()
+ .build());
+ qt().check(rs -> {
+ AbstractType<?> type = typeGen.next(rs);
+ TableMetadata metadata = TableMetadata.builder("ks", "tbl")
+ .addPartitionKeyColumn("pk",
type.freeze())
+ .addClusteringColumn("ck",
type.freeze())
+ .addRegularColumn("r", type)
+ .addStaticColumn("s", type)
+
.partitioner(Murmur3Partitioner.instance)
+ .build();
+ ByteBuffer value =
toGen(AbstractTypeGenerators.getTypeSupport(type).bytesGen()).next(rs);
+ List<ByteBuffer> complexValue = type.isMultiCell() ? split(type,
value) : null;
+ Clustering<?> clustering = BufferClustering.make(value);
+ SimplePartition partition = new SimplePartition(metadata,
metadata.partitioner.decorateKey(value));
+ for (TxnCondition.Kind kind : TxnCondition.Value.supported())
+ {
+ for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
+ {
+ for (ColumnMetadata column : metadata.columns())
+ {
+ TxnReference ref = TxnReference.column(0, metadata,
column);
+ // empty logic reuses this which doesn't make the most
sense... flesh it out
+ TxnCondition.Value condition = new
TxnCondition.Value(ref.asColumn(), kind, value, version);
+
+ partition.clear().addEmptyAndLive(clustering);
+ // empty partition
+ boolean expected;
+ switch (kind)
+ {
+ case EQUAL:
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN_OR_EQUAL:
+ expected = column.isPrimaryKeyColumn();
+ break;
+ case NOT_EQUAL:
+ case LESS_THAN:
+ case GREATER_THAN:
+ expected = false;
+ break;
+ default:
+ throw new
UnsupportedOperationException(kind.name());
+ }
+ Assertions.assertThat(condition.applies(TxnData.of(0,
new TxnDataKeyValue(partition.filtered()))))
+ .describedAs("column=%s, type=%s, kind=%s",
column.name, type.asCQL3Type(), kind.name())
+ .isEqualTo(expected);
+
+
+ if (column.isPrimaryKeyColumn()) continue;
+
+ // with value
+ if (type.isMultiCell())
+ {
+ partition.clear()
+ .add(column.isStatic() ?
Clustering.STATIC_CLUSTERING : clustering)
+ .addComplex(column, complexValue)
+ .build();
+ }
+ else
+ {
+ partition.clear()
+ .add(column.isStatic() ?
Clustering.STATIC_CLUSTERING : clustering)
+ .add(column, value)
+ .build();
+ }
+ switch (kind)
+ {
+ case EQUAL:
+ case LESS_THAN_OR_EQUAL:
+ case GREATER_THAN_OR_EQUAL:
+ expected = true;
+ break;
+ case NOT_EQUAL:
+ case LESS_THAN:
+ case GREATER_THAN:
+ expected = false;
+ break;
+ default:
+ throw new
UnsupportedOperationException(kind.name());
+ }
+ Assertions.assertThat(condition.applies(TxnData.of(0,
new TxnDataKeyValue(partition.filtered()))))
+ .describedAs("column=%s, type=%s, kind=%s",
column.name, type.asCQL3Type(), kind.name())
+ .isEqualTo(expected);
+ }
+ }
+ }
+ });
+ }
+
+ private static List<ByteBuffer> split(AbstractType<?> type, ByteBuffer
value)
+ {
+ type = type.unwrap();
+ if (type.isUDT())
+ {
+ return ((UserType) type).unpack(value);
+ }
+ else if (type.isCollection())
+ {
+ return ((CollectionType<?>) type).unpack(value);
+ }
+ throw new UnsupportedOperationException(type.asCQL3Type().toString());
+ }
+
+ private static void assertExists(TxnData data, TxnReference ref, boolean
exists)
+ {
+ Assertions.assertThat(new TxnCondition.Exists(ref,
TxnCondition.Kind.IS_NULL).applies(data)).describedAs("ref=%s %s", ref, exists
? "exists but shouldn't have applied" : "does not exist but should have
applied").isEqualTo(!exists);
+ Assertions.assertThat(new TxnCondition.Exists(ref,
TxnCondition.Kind.IS_NOT_NULL).applies(data)).describedAs("ref=%s %s", ref,
exists ? "exists but should have applied" : "does not exist but shouldn't have
applied").isEqualTo(exists);
+ }
+
private Gen<TxnCondition> txnConditionGen()
{
return rs -> {
@@ -111,7 +496,7 @@ public class TxnConditionTest
{
case 0: return TxnCondition.none();
case 1: return new TxnCondition.Exists(TXN_REF_GEN.next(rs),
EXISTS_KIND_GEN.next(rs));
- case 2: return new TxnCondition.Value(TXN_REF_GEN.next(rs),
VALUE_KIND_GEN.next(rs), BYTES_GEN.next(rs), PROTOCOL_VERSION_GEN.next(rs));
+ case 2: return new
TxnCondition.Value(TXN_REF_GEN.next(rs).asColumn(), VALUE_KIND_GEN.next(rs),
BYTES_GEN.next(rs), PROTOCOL_VERSION_GEN.next(rs));
case 3: return new
TxnCondition.ColumnConditionsAdapter(CLUSTERING_GEN.next(rs),
Gens.lists(BOUND_GEN).ofSizeBetween(0, 3).next(rs));
case 4: return new
TxnCondition.BooleanGroup(BOOLEAN_KIND_GEN.next(rs),
Gens.lists(txnConditionGen()).ofSizeBetween(0, 3).next(rs));
default: throw new AssertionError();
@@ -119,6 +504,26 @@ public class TxnConditionTest
};
}
+ private interface IsNullTest // jdk16+ lets this be in-lined with the test
method rather than be here
+ {
+ void test(SimplePartition partition, Clustering<?> clustering,
ColumnMetadata column, ByteBuffer nonNullValue);
+ }
+
+ private interface IsNullFrozenFieldOrElementTest // jdk16+ lets this be
in-lined with the test method rather than be here
+ {
+ void test(SimplePartition partition, Clustering<?> clustering,
ColumnMetadata column, CellPath path, ByteBuffer nonNullValue, ByteBuffer
expectedValue);
+ }
+
+ private interface IsNullFieldOrElementTest // jdk16+ lets this be in-lined
with the test method rather than be here
+ {
+ void test(SimplePartition partition, Clustering<?> clustering,
ColumnMetadata column, CellPath path, List<ByteBuffer> values, ByteBuffer
expectedValue);
+ }
+
+ private interface IsNullComplexTest // jdk16+ lets this be in-lined with
the test method rather than be here
+ {
+ void test(SimplePartition partition, Clustering<?> clustering,
ColumnMetadata column, List<ByteBuffer> values);
+ }
+
private static class SchemaProvider extends MockSchema.MockSchemaProvider
{
private final class Key
diff --git a/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
b/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
index d44d32d043..90b3f26dca 100644
--- a/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AbstractTypeGenerators.java
@@ -184,6 +184,30 @@ public final class AbstractTypeGenerators
.add(DynamicCompositeType.class)
.add(CounterColumnType.class)
.build();
+ // NEVER EVER EVER UPDATE THIS LIST!
+ // meaningless emptiness is a legacy Thrift concept, so only types from
back then apply; no newer type will ever be added
+ public static final ImmutableUniqueList<AbstractType<?>>
MEANINGLESS_EMPTYNESS = ImmutableUniqueList.of(
+ CounterColumnType.instance,
+
+ BooleanType.instance,
+
+ DateType.instance,
+ TimestampType.instance,
+
+ InetAddressType.instance,
+
+ TimeUUIDType.instance,
+ LegacyTimeUUIDType.instance,
+ LexicalUUIDType.instance,
+ UUIDType.instance,
+
+ DecimalType.instance,
+ DoubleType.instance,
+ FloatType.instance,
+ Int32Type.instance,
+ IntegerType.instance,
+ LongType.instance
+ );
private AbstractTypeGenerators()
{
@@ -415,6 +439,11 @@ public final class AbstractTypeGenerators
return this;
}
+ public TypeGenBuilder withoutUnsafeEquality()
+ {
+ return AbstractTypeGenerators.withoutUnsafeEquality(this);
+ }
+
@SuppressWarnings("unused")
public TypeGenBuilder withPrimitives(AbstractType<?> first,
AbstractType<?>... remaining)
{
@@ -1091,6 +1120,16 @@ public final class AbstractTypeGenerators
{
support = (TypeSupport<T>)
TypeSupport.of(CounterColumnType.instance, SourceDSL.longs().all());
}
+ else if (type == DateType.instance)
+ {
+ // this type isn't supported in most places, but can still be
supported here
+ support = (TypeSupport<T>) TypeSupport.of(DateType.instance,
Generators.DATE_GEN);
+ }
+ else if (type == LegacyTimeUUIDType.instance)
+ {
+ // this type isn't supported in most places, but can still be
supported here
+ support = (TypeSupport<T>)
TypeSupport.of(LegacyTimeUUIDType.instance,
Generators.UUID_TIME_GEN.mix(Generators.UUID_RANDOM_GEN));
+ }
else
{
throw new UnsupportedOperationException("No TypeSupport for: " +
type);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]