This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b56edf2a5d Add support in CAS for -= on numeric types, and fixed
improper handling of empty bytes which lead to NPE
b56edf2a5d is described below
commit b56edf2a5df8c320b33e38116f2742ab69f7b4fd
Author: David Capwell <[email protected]>
AuthorDate: Fri Mar 28 11:48:51 2025 -0700
Add support in CAS for -= on numeric types, and fixed improper handling of
empty bytes which lead to NPE
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20477
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/cql3/Operation.java | 12 +-
.../cassandra/cql3/statements/BatchStatement.java | 12 +-
.../cassandra/cql3/statements/CQL3CasRequest.java | 12 +-
.../cql3/statements/ModificationStatement.java | 12 +-
.../org/apache/cassandra/cql3/terms/Constants.java | 64 ++--
.../cassandra/db/RegularAndStaticColumns.java | 7 +
.../apache/cassandra/db/marshal/AbstractType.java | 26 +-
.../org/apache/cassandra/service/CASRequest.java | 3 +
.../org/apache/cassandra/service/paxos/Paxos.java | 9 +-
.../org/apache/cassandra/transport/Dispatcher.java | 2 +
.../test/cql3/CasMultiNodeTableWalkBase.java | 129 ++++++++
.../test/cql3/MultiNodeTableWalkBase.java | 21 +-
.../cql3/MultiNodeTableWalkWithReadRepairTest.java | 4 +-
.../MultiNodeTableWalkWithoutReadRepairTest.java | 4 +-
...est.java => PaxosV1MultiNodeTableWalkTest.java} | 12 +-
...est.java => PaxosV2MultiNodeTableWalkTest.java} | 12 +-
.../test/cql3/SingleNodeTableWalkTest.java | 26 +-
.../distributed/test/cql3/StatefulASTBase.java | 79 ++++-
.../cassandra/harry/model/ASTSingleTableModel.java | 358 +++++++++++++++++++--
.../cassandra/harry/model/BytesPartitionState.java | 17 +-
.../unit/org/apache/cassandra/cql3/KnownIssue.java | 4 +-
.../cassandra/cql3/ast/AssignmentOperator.java | 7 +-
test/unit/org/apache/cassandra/cql3/ast/Bind.java | 1 +
.../apache/cassandra/cql3/ast/CasCondition.java | 20 +-
.../org/apache/cassandra/cql3/ast/Conditional.java | 15 +
.../cassandra/cql3/ast/ExpressionEvaluator.java | 5 +
.../org/apache/cassandra/cql3/ast/Literal.java | 6 +
.../org/apache/cassandra/cql3/ast/Mutation.java | 53 ++-
.../org/apache/cassandra/cql3/ast/Reference.java | 5 +
.../cassandra/cql3/ast/ReferenceExpression.java | 2 +-
test/unit/org/apache/cassandra/cql3/ast/Value.java | 4 +
.../org/apache/cassandra/cql3/ast/Visitor.java | 5 +
.../org/apache/cassandra/utils/ASTGenerators.java | 17 +-
34 files changed, 864 insertions(+), 102 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 80eabfd705..361fb3cd07 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add support in CAS for -= on numeric types, and fixed improper handling of
empty bytes which lead to NPE (CASSANDRA-20477)
* Do not fail to start a node with materialized views after they are turned
off in config (CASSANDRA-20452)
* Fix nodetool gcstats output, support human-readable units and more output
formats (CASSANDRA-19022)
* Various gossip to TCM upgrade fixes (CASSANDRA-20483)
diff --git a/src/java/org/apache/cassandra/cql3/Operation.java
b/src/java/org/apache/cassandra/cql3/Operation.java
index 7a7c0e8420..7c5e02eb63 100644
--- a/src/java/org/apache/cassandra/cql3/Operation.java
+++ b/src/java/org/apache/cassandra/cql3/Operation.java
@@ -374,8 +374,16 @@ public abstract class Operation
{
if (!(receiver.type instanceof CollectionType))
{
- if (!(receiver.type instanceof CounterColumnType))
- throw new InvalidRequestException(String.format("Invalid
operation (%s) for non counter column %s", toString(receiver), receiver.name));
+ if (canReadExistingState)
+ {
+ if (!(receiver.type instanceof NumberType<?>))
+ throw new
InvalidRequestException(String.format("Invalid operation (%s) for non-numeric
type %s", toString(receiver), receiver.name));
+ }
+ else
+ {
+ if (!(receiver.type instanceof CounterColumnType))
+ throw new
InvalidRequestException(String.format("Invalid operation (%s) for non counter
column %s", toString(receiver), receiver.name));
+ }
return new Constants.Substracter(receiver,
value.prepare(metadata.keyspace, receiver));
}
else if (!(receiver.type.isMultiCell()))
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index bfec675464..6c5b121992 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -468,7 +468,7 @@ public class BatchStatement implements CQLStatement
private ResultMessage executeWithConditions(BatchQueryOptions options,
QueryState state, Dispatcher.RequestTime requestTime)
{
- Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options,
state);
+ Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options,
state, requestTime);
CQL3CasRequest casRequest = p.left;
Set<ColumnMetadata> columnsWithConditions = p.right;
@@ -495,7 +495,7 @@ public class BatchStatement implements CQLStatement
}
}
- private Pair<CQL3CasRequest,Set<ColumnMetadata>>
makeCasRequest(BatchQueryOptions options, QueryState state)
+ private Pair<CQL3CasRequest,Set<ColumnMetadata>>
makeCasRequest(BatchQueryOptions options, QueryState state,
Dispatcher.RequestTime requestTime)
{
long batchTimestamp = options.getTimestamp(state);
long nowInSeconds = options.getNowInSeconds(state);
@@ -514,7 +514,7 @@ public class BatchStatement implements CQLStatement
if (key == null)
{
key = statement.metadata().partitioner.decorateKey(pks.get(0));
- casRequest = new CQL3CasRequest(statement.metadata(), key,
conditionColumns, updatesRegularRows, updatesStaticRow);
+ casRequest = new CQL3CasRequest(statement.metadata(), key,
conditionColumns, updatesRegularRows, updatesStaticRow, requestTime);
}
else if (!key.getKey().equals(pks.get(0)))
{
@@ -570,7 +570,7 @@ public class BatchStatement implements CQLStatement
BatchQueryOptions batchOptions =
BatchQueryOptions.withoutPerStatementVariables(options);
if (hasConditions)
- return executeInternalWithConditions(batchOptions, queryState);
+ return executeInternalWithConditions(batchOptions, queryState,
Dispatcher.RequestTime.forImmediateExecution());
executeInternalWithoutCondition(queryState, batchOptions,
Dispatcher.RequestTime.forImmediateExecution());
return new ResultMessage.Void();
@@ -586,9 +586,9 @@ public class BatchStatement implements CQLStatement
return null;
}
- private ResultMessage executeInternalWithConditions(BatchQueryOptions
options, QueryState state)
+ private ResultMessage executeInternalWithConditions(BatchQueryOptions
options, QueryState state, Dispatcher.RequestTime requestTime)
{
- Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options,
state);
+ Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options,
state, requestTime);
CQL3CasRequest request = p.left;
Set<ColumnMetadata> columnsWithConditions = p.right;
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 0d322691c6..4db98459ec 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -34,6 +34,7 @@ import
org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.CASRequest;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -49,6 +50,7 @@ public class CQL3CasRequest implements CASRequest
private final RegularAndStaticColumns conditionColumns;
private final boolean updatesRegularRows;
private final boolean updatesStaticRow;
+ private final Dispatcher.RequestTime requestTime;
private boolean hasExists; // whether we have an exist or if not exist
condition
// Conditions on the static row. We keep it separate from 'conditions' as
most things related to the static row are
@@ -66,7 +68,8 @@ public class CQL3CasRequest implements CASRequest
DecoratedKey key,
RegularAndStaticColumns conditionColumns,
boolean updatesRegularRows,
- boolean updatesStaticRow)
+ boolean updatesStaticRow,
+ Dispatcher.RequestTime requestTime)
{
this.metadata = metadata;
this.key = key;
@@ -74,6 +77,13 @@ public class CQL3CasRequest implements CASRequest
this.conditionColumns = conditionColumns;
this.updatesRegularRows = updatesRegularRows;
this.updatesStaticRow = updatesStaticRow;
+ this.requestTime = requestTime;
+ }
+
+ @Override
+ public Dispatcher.RequestTime requestTime()
+ {
+ return requestTime;
}
void addRowUpdate(Clustering<?> clustering, ModificationStatement stmt,
QueryOptions options, long timestamp, long nowInSeconds)
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ce9da9a538..21da99c14a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -576,7 +576,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
private ResultMessage executeWithCondition(QueryState queryState,
QueryOptions options, Dispatcher.RequestTime requestTime)
{
- CQL3CasRequest request = makeCasRequest(queryState, options);
+ CQL3CasRequest request = makeCasRequest(queryState, options,
requestTime);
try (RowIterator result = StorageProxy.cas(keyspace(),
table(),
@@ -592,7 +592,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
}
}
- private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions
options)
+ private CQL3CasRequest makeCasRequest(QueryState queryState, QueryOptions
options, Dispatcher.RequestTime requestTime)
{
ClientState clientState = queryState.getClientState();
List<ByteBuffer> keys = buildPartitionKeyNames(options, clientState);
@@ -610,7 +610,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
type.isUpdate()? "updates" : "deletions");
Clustering<?> clustering =
Iterables.getOnlyElement(createClustering(options, clientState));
- CQL3CasRequest request = new CQL3CasRequest(metadata(), key,
conditionColumns(), updatesRegularRows(), updatesStaticRow());
+ CQL3CasRequest request = new CQL3CasRequest(metadata(), key,
conditionColumns(), updatesRegularRows(), updatesStaticRow(), requestTime);
addConditions(clustering, request, options);
request.addRowUpdate(clustering, this, options, timestamp,
nowInSeconds);
@@ -718,7 +718,7 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
public ResultMessage executeLocally(QueryState queryState, QueryOptions
options) throws RequestValidationException, RequestExecutionException
{
return hasConditions()
- ? executeInternalWithCondition(queryState, options)
+ ? executeInternalWithCondition(queryState, options,
Dispatcher.RequestTime.forImmediateExecution())
: executeInternalWithoutCondition(queryState, options,
Dispatcher.RequestTime.forImmediateExecution());
}
@@ -732,9 +732,9 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
return null;
}
- public ResultMessage executeInternalWithCondition(QueryState state,
QueryOptions options)
+ public ResultMessage executeInternalWithCondition(QueryState state,
QueryOptions options, Dispatcher.RequestTime requestTime)
{
- CQL3CasRequest request = makeCasRequest(state, options);
+ CQL3CasRequest request = makeCasRequest(state, options, requestTime);
try (RowIterator result = casInternal(state.getClientState(), request,
options.getTimestamp(state), options.getNowInSeconds(state)))
{
diff --git a/src/java/org/apache/cassandra/cql3/terms/Constants.java
b/src/java/org/apache/cassandra/cql3/terms/Constants.java
index f63d7ce28b..3b9ae6b4c9 100644
--- a/src/java/org/apache/cassandra/cql3/terms/Constants.java
+++ b/src/java/org/apache/cassandra/cql3/terms/Constants.java
@@ -44,6 +44,14 @@ import org.apache.cassandra.utils.FastByteOperations;
*/
public abstract class Constants
{
+
+ private static ByteBuffer getCurrentCellBuffer(ColumnMetadata column,
DecoratedKey key, UpdateParameters params)
+ {
+ Row currentRow = params.getPrefetchedRow(key, column.isStatic() ?
Clustering.STATIC_CLUSTERING : params.currentClustering());
+ Cell<?> currentCell = currentRow == null ? null :
currentRow.getCell(column);
+ return currentCell == null ? null : currentCell.buffer();
+ }
+
public enum Type
{
STRING
@@ -489,8 +497,10 @@ public abstract class Constants
else if (column.type instanceof NumberType<?>)
{
@SuppressWarnings("unchecked") NumberType<Number> type =
(NumberType<Number>) column.type;
- ByteBuffer increment = t.bindAndGet(params.options);
- ByteBuffer current = getCurrentCellBuffer(partitionKey,
params);
+ ByteBuffer increment =
type.sanitize(t.bindAndGet(params.options));
+ if (increment == null)
+ return;
+ ByteBuffer current =
type.sanitize(getCurrentCellBuffer(column, partitionKey, params));
if (current == null)
return;
ByteBuffer newValue = type.add(type.compose(current),
type.compose(increment));
@@ -499,7 +509,9 @@ public abstract class Constants
else if (column.type instanceof StringType)
{
ByteBuffer append = t.bindAndGet(params.options);
- ByteBuffer current = getCurrentCellBuffer(partitionKey,
params);
+ if (append == null)
+ return;
+ ByteBuffer current = getCurrentCellBuffer(column,
partitionKey, params);
if (current == null)
return;
ByteBuffer newValue = ByteBuffer.allocate(current.remaining()
+ append.remaining());
@@ -508,13 +520,6 @@ public abstract class Constants
params.addCell(column, newValue);
}
}
-
- private ByteBuffer getCurrentCellBuffer(DecoratedKey key,
UpdateParameters params)
- {
- Row currentRow = params.getPrefetchedRow(key, column.isStatic() ?
Clustering.STATIC_CLUSTERING : params.currentClustering());
- Cell<?> currentCell = currentRow == null ? null :
currentRow.getCell(column);
- return currentCell == null ? null : currentCell.buffer();
- }
}
public static class Substracter extends Operation
@@ -524,19 +529,40 @@ public abstract class Constants
super(column, t);
}
+ @Override
+ public boolean requiresRead()
+ {
+ return !column.type.isCounter();
+ }
+
public void execute(DecoratedKey partitionKey, UpdateParameters
params) throws InvalidRequestException
{
- ByteBuffer bytes = t.bindAndGet(params.options);
- if (bytes == null)
- throw new InvalidRequestException("Invalid null value for
counter increment");
- if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
- return;
+ if (column.type instanceof CounterColumnType)
+ {
+ ByteBuffer bytes = t.bindAndGet(params.options);
+ if (bytes == null)
+ throw new InvalidRequestException("Invalid null value for
counter increment");
+ if (bytes == ByteBufferUtil.UNSET_BYTE_BUFFER)
+ return;
- long increment = ByteBufferUtil.toLong(bytes);
- if (increment == Long.MIN_VALUE)
- throw new InvalidRequestException("The negation of " +
increment + " overflows supported counter precision (signed 8 bytes integer)");
+ long increment = ByteBufferUtil.toLong(bytes);
+ if (increment == Long.MIN_VALUE)
+ throw new InvalidRequestException("The negation of " +
increment + " overflows supported counter precision (signed 8 bytes integer)");
- params.addCounter(column, -increment);
+ params.addCounter(column, -increment);
+ }
+ else if (column.type instanceof NumberType<?>)
+ {
+ @SuppressWarnings("unchecked") NumberType<Number> type =
(NumberType<Number>) column.type;
+ ByteBuffer increment =
type.sanitize(t.bindAndGet(params.options));
+ if (increment == null)
+ return;
+ ByteBuffer current =
type.sanitize(getCurrentCellBuffer(column, partitionKey, params));
+ if (current == null)
+ return;
+ ByteBuffer newValue = type.substract(type.compose(current),
type.compose(increment));
+ params.addCell(column, newValue);
+ }
}
}
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
index 4daea2f4f7..2032fe6fe9 100644
--- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.db;
import java.util.*;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import com.google.common.collect.Iterators;
@@ -58,6 +60,11 @@ public class RegularAndStaticColumns implements
Iterable<ColumnMetadata>
column.isStatic() ? regulars :
regulars.without(column));
}
+ public Stream<ColumnMetadata> stream()
+ {
+ return StreamSupport.stream(spliterator(), false);
+ }
+
public RegularAndStaticColumns mergeTo(RegularAndStaticColumns that)
{
if (this == that)
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index c624ce505b..5378a4cd3f 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
+import javax.annotation.Nullable;
+
import org.apache.cassandra.cql3.AssignmentTestable;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.ColumnSpecification;
@@ -474,14 +476,6 @@ public abstract class AbstractType<T> implements
Comparator<ByteBuffer>, Assignm
return this;
}
- /**
- * Returns {@code true} for types where empty should be handled like
{@code null} like {@link Int32Type}.
- */
- public boolean isEmptyValueMeaningless()
- {
- return false;
- }
-
/**
* @param ignoreFreezing if true, the type string will not be wrapped with
FrozenType(...), even if this type is frozen.
*/
@@ -537,6 +531,22 @@ public abstract class AbstractType<T> implements
Comparator<ByteBuffer>, Assignm
return false;
}
+ /**
+ * Returns {@code true} for types where empty should be handled like
{@code null} like {@link Int32Type}.
+ */
+ public boolean isEmptyValueMeaningless()
+ {
+ return false;
+ }
+
+ @Nullable
+ public ByteBuffer sanitize(@Nullable ByteBuffer bb)
+ {
+ if (bb == null) return null;
+ // not checking allowsEmpty as this method assumes that the bb has
already passed validation for the type
+ return bb.remaining() == 0 && isEmptyValueMeaningless() ? null : bb;
+ }
+
public boolean isNull(ByteBuffer bb)
{
return isNull(bb, ByteBufferAccessor.instance);
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java
b/src/java/org/apache/cassandra/service/CASRequest.java
index 6fb5eea20c..50ea5852a6 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -22,12 +22,15 @@ import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.transport.Dispatcher;
/**
* Abstract the conditions and updates for a CAS operation.
*/
public interface CASRequest
{
+ Dispatcher.RequestTime requestTime();
+
/**
* The command to use to fetch the value to compare for the CAS.
*/
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 3412add1a2..06f90907d5 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -737,7 +737,7 @@ public class Paxos
Tracing.trace("Reading existing values for CAS precondition");
BeginResult begin = begin(proposeDeadline, readCommand,
consistencyForConsensus,
- true, minimumBallot, failedAttemptsDueToContention);
+ true, minimumBallot, failedAttemptsDueToContention,
request.requestTime());
Ballot ballot = begin.ballot;
Participants participants = begin.participants;
failedAttemptsDueToContention =
begin.failedAttemptsDueToContention;
@@ -914,7 +914,7 @@ public class Paxos
while (true)
{
// does the work of applying in-progress writes; throws UAE or
timeout if it can't
- final BeginResult begin = begin(deadline, read,
consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention);
+ final BeginResult begin = begin(deadline, read,
consistencyForConsensus, false, minimumBallot, failedAttemptsDueToContention,
requestTime);
failedAttemptsDueToContention =
begin.failedAttemptsDueToContention;
switch (PAXOS_VARIANT)
@@ -1034,7 +1034,8 @@ public class Paxos
ConsistencyLevel consistencyForConsensus,
final boolean isWrite,
Ballot minimumBallot,
- int failedAttemptsDueToContention)
+ int failedAttemptsDueToContention,
+ Dispatcher.RequestTime requestTime)
throws WriteTimeoutException, WriteFailureException,
ReadTimeoutException, ReadFailureException
{
boolean acceptEarlyReadPermission = !isWrite; // if we're reading,
begin by assuming a read permission is sufficient
@@ -1111,7 +1112,7 @@ public class Paxos
PaxosPrepare.Success success = prepare.success();
Supplier<Participants> plan = () -> success.participants;
- DataResolver<?, ?> resolver = new DataResolver<>(query,
plan, NoopReadRepair.instance, new
Dispatcher.RequestTime(query.creationTimeNanos()));
+ DataResolver<?, ?> resolver = new DataResolver<>(query,
plan, NoopReadRepair.instance, requestTime);
for (int i = 0 ; i < success.responses.size() ; ++i)
resolver.preprocess(success.responses.get(i));
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java
b/src/java/org/apache/cassandra/transport/Dispatcher.java
index d6cb5e822f..f701434d00 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,6 +141,7 @@ public class Dispatcher implements
CQLMessageHandler.MessageConsumer<Message.Req
public RequestTime(long enqueuedAtNanos, long startedAtNanos)
{
+ Preconditions.checkArgument(enqueuedAtNanos != -1);
this.enqueuedAtNanos = enqueuedAtNanos;
this.startedAtNanos = startedAtNanos;
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
new file mode 100644
index 0000000000..bf8a44dcb9
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.cql3;
+
+import accord.utils.Gen;
+import accord.utils.RandomSource;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.KnownIssue;
+import org.apache.cassandra.cql3.ast.CasCondition;
+import org.apache.cassandra.cql3.ast.Conditional;
+import org.apache.cassandra.cql3.ast.Mutation;
+import org.apache.cassandra.cql3.ast.Value;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
+import org.apache.cassandra.utils.ASTGenerators;
+
+import static org.apache.cassandra.utils.Generators.toGen;
+
+public abstract class CasMultiNodeTableWalkBase extends MultiNodeTableWalkBase
+{
+ protected final Config.PaxosVariant paxos_variant;
+
+ protected CasMultiNodeTableWalkBase(Config.PaxosVariant paxos_variant)
+ {
+ super(ReadRepairStrategy.NONE);
+ this.paxos_variant = paxos_variant;
+ }
+
+ @Override
+ protected void clusterConfig(IInstanceConfig c)
+ {
+ super.clusterConfig(c);
+ c.set("paxos_variant", paxos_variant);
+ c.set("cas_contention_timeout", "180s");
+ //TODO (now): should these be included? They are in the benchmark
clusters
+// c.set("paxos_contention_min_wait", 0);
+// c.set("paxos_contention_max_wait", "100ms");
+// c.set("paxos_contention_min_delta", 0);
+ }
+
+ @Override
+ protected SingleNodeTableWalkTest.State createState(RandomSource rs,
Cluster cluster)
+ {
+ return new State(rs, cluster);
+ }
+
+ private static boolean isValueUDTSafe(Value value)
+ {
+ var bb = value.valueEncoded();
+ return bb == null ? true : bb.hasRemaining();
+ }
+
+ protected class State extends MultiNodeState
+ {
+ private State(RandomSource rs, Cluster cluster)
+ {
+ super(rs, cluster);
+ }
+
+ @Override
+ protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder
mutationGenBuilder)
+ {
+ mutationGenBuilder.withCasGen(i -> true);
+ // generator might not always generate a cas statement... should
fix generator!
+ Gen<Mutation> gen =
toGen(mutationGenBuilder.build()).filter(Mutation::isCas);
+ if (metadata.regularAndStaticColumns().stream().anyMatch(c ->
c.type.isUDT())
+ &&
IGNORED_ISSUES.contains(KnownIssue.CAS_CONDITION_ON_UDT_W_EMPTY_BYTES))
+ {
+ gen = gen.filter(m -> {
+ CasCondition condition;
+ switch (m.kind)
+ {
+ case INSERT:
+ return true;
+ case DELETE:
+ condition = ((Mutation.Delete)
m).casCondition.get();
+ break;
+ case UPDATE:
+ condition = ((Mutation.Update)
m).casCondition.get();
+ break;
+ default:
+ throw new
UnsupportedOperationException(m.kind.name());
+ }
+ return !condition.streamRecursive(true).anyMatch(e -> {
+ if (!(e instanceof Conditional.Where)) return false;
+ var where = (Conditional.Where) e;
+ if (!where.lhs.type().isUDT()) return false;
+ if (where.lhs instanceof Value &&
!isValueUDTSafe((Value) where.lhs))
+ return true;
+ if (where.rhs instanceof Value &&
!isValueUDTSafe((Value) where.rhs))
+ return true;
+ return false;
+ });
+ });
+ }
+ return gen;
+ }
+
+ @Override
+ protected ConsistencyLevel selectCl()
+ {
+ return ConsistencyLevel.SERIAL;
+ }
+
+ @Override
+ protected ConsistencyLevel mutationCl()
+ {
+ return ConsistencyLevel.SERIAL;
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
index f10d0edcf4..3e9c1195f6 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import accord.utils.RandomSource;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
@@ -52,13 +53,17 @@ public abstract class MultiNodeTableWalkBase extends
SingleNodeTableWalkTest
@Override
protected Cluster createCluster() throws IOException
{
- return createCluster(mockMultiNode ? 1 : 3, c -> {
- c.set("range_request_timeout", "180s")
- .set("read_request_timeout", "180s")
- .set("write_request_timeout", "180s")
- .set("native_transport_timeout", "180s")
- .set("slow_query_log_timeout", "180s");
- });
+ return createCluster(mockMultiNode ? 1 : 3, this::clusterConfig);
+ }
+
+ @Override
+ protected void clusterConfig(IInstanceConfig c)
+ {
+ c.set("range_request_timeout", "180s")
+ .set("read_request_timeout", "180s")
+ .set("write_request_timeout", "180s")
+ .set("native_transport_timeout", "180s")
+ .set("slow_query_log_timeout", "180s");
}
@Override
@@ -67,7 +72,7 @@ public abstract class MultiNodeTableWalkBase extends
SingleNodeTableWalkTest
return new MultiNodeState(rs, cluster);
}
- private class MultiNodeState extends State
+ protected class MultiNodeState extends State
{
public MultiNodeState(RandomSource rs, Cluster cluster)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
index e8b01f8c71..7727e3a76a 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithReadRepairTest.java
@@ -38,7 +38,9 @@ public class MultiNodeTableWalkWithReadRepairTest extends
MultiNodeTableWalkBase
// if a failing seed is detected, populate here
// Example: builder.withSeed(42L);
// CQL operations may have opertors such as +, -, and / (example 4 +
4), to "apply" them to get a constant value
-// CQL_DEBUG_APPLY_OPERATOR = true;
+ // CQL_DEBUG_APPLY_OPERATOR = true;
+ // When mutations look to be lost as seen by more complex SELECTs, it
can be useful to just SELECT the partition/row right after to write to see if
it was safe at the time.
+ // READ_AFTER_WRITE = true;
// When an issue is found, it's a good idea to also run the same seed
against MultiNodeTableWalkWithoutReadRepairTest; if Read Repair is given bad
input, you should expect bad output!
// This test needs to make sure it shares the same random history as
MultiNodeTableWalkWithoutReadRepairTest to always allow the ability to maintain
this property.
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
index a18b80d68a..5a0ce66ccc 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
@@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends
MultiNodeTableWalkB
// if a failing seed is detected, populate here
// Example: builder.withSeed(42L);
// CQL operations may have opertors such as +, -, and / (example 4 +
4), to "apply" them to get a constant value
-// CQL_DEBUG_APPLY_OPERATOR = true;
+ // CQL_DEBUG_APPLY_OPERATOR = true;
+ // When mutations look to be lost as seen by more complex SELECTs, it
can be useful to just SELECT the partition/row right after to write to see if
it was safe at the time.
+ // READ_AFTER_WRITE = true;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java
similarity index 73%
copy from
test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
copy to
test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java
index a18b80d68a..0cf333d2ab 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV1MultiNodeTableWalkTest.java
@@ -19,14 +19,14 @@
package org.apache.cassandra.distributed.test.cql3;
import accord.utils.Property;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
-public class MultiNodeTableWalkWithoutReadRepairTest extends
MultiNodeTableWalkBase
+public class PaxosV1MultiNodeTableWalkTest extends CasMultiNodeTableWalkBase
{
- public MultiNodeTableWalkWithoutReadRepairTest()
+ public PaxosV1MultiNodeTableWalkTest()
{
- super(ReadRepairStrategy.NONE);
+ super(Config.PaxosVariant.v1);
}
@Override
@@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends
MultiNodeTableWalkB
// if a failing seed is detected, populate here
// Example: builder.withSeed(42L);
// CQL operations may have opertors such as +, -, and / (example 4 +
4), to "apply" them to get a constant value
-// CQL_DEBUG_APPLY_OPERATOR = true;
+ // CQL_DEBUG_APPLY_OPERATOR = true;
+ // When mutations look to be lost as seen by more complex SELECTs, it
can be useful to just SELECT the partition/row right after to write to see if
it was safe at the time.
+ // READ_AFTER_WRITE = true;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java
similarity index 73%
copy from
test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
copy to
test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java
index a18b80d68a..fa098edaac 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithoutReadRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/PaxosV2MultiNodeTableWalkTest.java
@@ -19,14 +19,14 @@
package org.apache.cassandra.distributed.test.cql3;
import accord.utils.Property;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
-public class MultiNodeTableWalkWithoutReadRepairTest extends
MultiNodeTableWalkBase
+public class PaxosV2MultiNodeTableWalkTest extends CasMultiNodeTableWalkBase
{
- public MultiNodeTableWalkWithoutReadRepairTest()
+ public PaxosV2MultiNodeTableWalkTest()
{
- super(ReadRepairStrategy.NONE);
+ super(Config.PaxosVariant.v2);
}
@Override
@@ -35,6 +35,8 @@ public class MultiNodeTableWalkWithoutReadRepairTest extends
MultiNodeTableWalkB
// if a failing seed is detected, populate here
// Example: builder.withSeed(42L);
// CQL operations may have opertors such as +, -, and / (example 4 +
4), to "apply" them to get a constant value
-// CQL_DEBUG_APPLY_OPERATOR = true;
+ // CQL_DEBUG_APPLY_OPERATOR = true;
+ // When mutations look to be lost as seen by more complex SELECTs, it
can be useful to just SELECT the partition/row right after to write to see if
it was safe at the time.
+ // READ_AFTER_WRITE = true;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
index 755d479e92..1feb9c5f86 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.db.marshal.InetAddressType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.test.sai.SAIUtil;
import org.apache.cassandra.harry.model.BytesPartitionState;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -92,12 +93,16 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
.collect(Collectors.toList()));
private static final Logger logger =
LoggerFactory.getLogger(SingleNodeTableWalkTest.class);
+ protected static boolean READ_AFTER_WRITE = false;
+
protected void preCheck(Cluster cluster, Property.StatefulBuilder builder)
{
// if a failing seed is detected, populate here
// Example: builder.withSeed(42L);
// CQL operations may have opertors such as +, -, and / (example 4 +
4), to "apply" them to get a constant value
// CQL_DEBUG_APPLY_OPERATOR = true;
+ // When mutations look to be lost as seen by more complex SELECTs, it
can be useful to just SELECT the partition/row right after to write to see if
it was safe at the time.
+ // READ_AFTER_WRITE = true;
}
protected TypeGenBuilder supportedTypes(RandomSource rs)
@@ -345,7 +350,12 @@ public class SingleNodeTableWalkTest extends
StatefulASTBase
protected Cluster createCluster() throws IOException
{
- return createCluster(1, i -> {});
+ return createCluster(1, this::clusterConfig);
+ }
+
+ protected void clusterConfig(IInstanceConfig config)
+ {
+
}
@Test
@@ -460,7 +470,8 @@ public class SingleNodeTableWalkTest extends StatefulASTBase
{
model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowEmpty);
}
- this.mutationGen = toGen(mutationGenBuilder.build());
+
model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowNull);
+ this.mutationGen = toMutationGen(mutationGenBuilder);
var nonPartitionColumns = ImmutableList.<Symbol>builder()
.addAll(model.factory.clusteringColumns)
@@ -480,6 +491,17 @@ public class SingleNodeTableWalkTest extends
StatefulASTBase
.collect(Collectors.toList());
}
+ @Override
+ protected boolean readAfterWrite()
+ {
+ return READ_AFTER_WRITE;
+ }
+
+ protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder
mutationGenBuilder)
+ {
+ return toGen(mutationGenBuilder.build());
+ }
+
private boolean isSearchable(Symbol symbol)
{
// See org.apache.cassandra.cql3.Operator.validateFor
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index ec530b97fd..a90b467979 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -81,6 +81,7 @@ import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.Generators;
import org.quicktheories.generators.SourceDSL;
+import static accord.utils.Property.multistep;
import static org.apache.cassandra.distributed.test.JavaDriverUtils.toDriverCL;
import static
org.apache.cassandra.utils.AbstractTypeGenerators.overridePrimitiveTypeSupport;
import static
org.apache.cassandra.utils.AbstractTypeGenerators.stringComparator;
@@ -179,7 +180,35 @@ public class StatefulASTBase extends TestBaseImpl
protected static <S extends CommonState> Property.Command<S, Void, ?>
insert(RandomSource rs, S state)
{
int timestamp = ++state.operations;
- return state.command(rs,
state.mutationGen().next(rs).withTimestamp(timestamp));
+ Mutation mutation =
state.mutationGen().next(rs).withTimestamp(timestamp);
+
+ if (!state.readAfterWrite())
+ return state.command(rs, mutation);
+
+ return multistep(state.command(rs, mutation),
+
state.commandSafeRandomHistory(selectForMutation(state, mutation), "Select for
Mutation Validation"));
+ }
+
+ private static <S extends CommonState> Select selectForMutation(S state,
Mutation mutation)
+ {
+ var select = Select.builder(state.metadata).allowFiltering();
+ switch (mutation.kind)
+ {
+ case INSERT:
+ {
+ var insert = (Mutation.Insert) mutation;
+ for (var c : state.model.factory.partitionColumns)
+ select.value(c, insert.values.get(c));
+ }
+ break;
+ default:
+ {
+ select.where(mutation.kind == Mutation.Kind.UPDATE
+ ? ((Mutation.Update) mutation).where
+ : ((Mutation.Delete) mutation).where);
+ }
+ }
+ return select.build();
}
protected static <S extends BaseState> Property.Command<S, Void, ?>
fullTableScan(RandomSource rs, S state)
@@ -244,6 +273,11 @@ public class StatefulASTBase extends TestBaseImpl
createTable(metadata);
}
+ protected boolean readAfterWrite()
+ {
+ return false;
+ }
+
protected boolean isMultiNode()
{
return cluster.size() > 1;
@@ -314,6 +348,17 @@ public class StatefulASTBase extends TestBaseImpl
});
}
+ protected <S extends BaseState> Property.Command<S, Void, ?>
commandSafeRandomHistory(Select select, @Nullable String annotate)
+ {
+ var inst = cluster.firstAlive();
+ String postfix = "on " + inst;
+ if (annotate == null) annotate = postfix;
+ else annotate += ", " + postfix;
+ return new Property.SimpleCommand<>(humanReadable(select,
annotate), s -> {
+ s.model.validate(s.executeQuery(inst, Integer.MAX_VALUE,
s.selectCl(), select), select);
+ });
+ }
+
protected ConsistencyLevel selectCl()
{
return ConsistencyLevel.LOCAL_QUORUM;
@@ -333,11 +378,18 @@ public class StatefulASTBase extends TestBaseImpl
{
var inst = selectInstance(rs);
String postfix = "on " + inst;
+ if (mutation.isCas())
+ {
+ postfix += ", would apply " + model.shouldApply(mutation);
+ // CAS doesn't allow timestamps
+ mutation = mutation.withoutTimestamp();
+ }
if (annotate == null) annotate = postfix;
else annotate += ", " + postfix;
+ Mutation finalMutation = mutation;
return new Property.SimpleCommand<>(humanReadable(mutation,
annotate), s -> {
- s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(),
mutation);
- s.model.update(mutation);
+ s.executeQuery(inst, Integer.MAX_VALUE, s.mutationCl(),
finalMutation);
+ s.model.update(finalMutation);
s.mutation();
});
}
@@ -399,7 +451,26 @@ public class StatefulASTBase extends TestBaseImpl
SimpleStatement ss = new SimpleStatement(stmt.toCQL(),
(Object[]) stmt.bindsEncoded());
if (fetchSize != Integer.MAX_VALUE)
ss.setFetchSize(fetchSize);
- ss.setConsistencyLevel(toDriverCL(cl));
+ if (stmt instanceof Mutation)
+ {
+ switch (cl)
+ {
+ case SERIAL:
+ ss.setSerialConsistencyLevel(toDriverCL(cl));
+
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM);
+ break;
+ case LOCAL_SERIAL:
+ ss.setSerialConsistencyLevel(toDriverCL(cl));
+
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM);
+ break;
+ default:
+ ss.setConsistencyLevel(toDriverCL(cl));
+ }
+ }
+ else
+ {
+ ss.setConsistencyLevel(toDriverCL(cl));
+ }
InetSocketAddress broadcastAddress =
instance.config().broadcastAddress();
var host = client.getMetadata().getAllHosts().stream()
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index d2fbb6edcc..f6a03bdd17 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -45,6 +45,7 @@ import com.google.common.collect.Sets;
import accord.utils.Invariants;
import org.apache.cassandra.cql3.ast.AssignmentOperator;
+import org.apache.cassandra.cql3.ast.CasCondition;
import org.apache.cassandra.cql3.ast.Conditional;
import org.apache.cassandra.cql3.ast.Conditional.Where.Inequality;
import org.apache.cassandra.cql3.ast.Element;
@@ -54,10 +55,13 @@ import org.apache.cassandra.cql3.ast.FunctionCall;
import org.apache.cassandra.cql3.ast.Literal;
import org.apache.cassandra.cql3.ast.Mutation;
import org.apache.cassandra.cql3.ast.Operator;
+import org.apache.cassandra.cql3.ast.Reference;
+import org.apache.cassandra.cql3.ast.ReferenceExpression;
import org.apache.cassandra.cql3.ast.Select;
import org.apache.cassandra.cql3.ast.StandardVisitors;
import org.apache.cassandra.cql3.ast.Symbol;
import org.apache.cassandra.cql3.ast.Value;
+import org.apache.cassandra.cql3.ast.Visitor;
import org.apache.cassandra.db.BufferClustering;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -75,6 +79,8 @@ import static
org.apache.cassandra.harry.model.BytesPartitionState.asCQL;
public class ASTSingleTableModel
{
+ private static final ByteBuffer[][] NO_ROWS = new ByteBuffer[0][];
+
public final BytesPartitionState.Factory factory;
private final TreeMap<BytesPartitionState.Ref, BytesPartitionState>
partitions = new TreeMap<>();
@@ -184,6 +190,7 @@ public class ASTSingleTableModel
public void update(Mutation mutation)
{
+ if (!shouldApply(mutation)) return;
switch (mutation.kind)
{
case INSERT:
@@ -200,7 +207,7 @@ public class ASTSingleTableModel
}
}
- public void update(Mutation.Insert insert)
+ private void update(Mutation.Insert insert)
{
Clustering<ByteBuffer> pd = pd(insert);
BytesPartitionState partition = partitions.get(factory.createRef(pd));
@@ -229,7 +236,7 @@ public class ASTSingleTableModel
true);
}
- public void update(Mutation.Update update)
+ private void update(Mutation.Update update)
{
var split = splitOnPartition(update.where.simplify());
List<Clustering<ByteBuffer>> pks = split.left;
@@ -250,9 +257,12 @@ public class ASTSingleTableModel
for (Symbol col :
Sets.intersection(factory.staticColumns.asSet(), set.keySet()))
{
ByteBuffer current = partition.staticRow().get(col);
- write.put(col, eval(col, current, set.get(col)));
+ EvalResult result = eval(col, current, set.get(col));
+ if (result.kind == EvalResult.Kind.SKIP) continue;
+ write.put(col, result.value);
}
- partition.setStaticColumns(write);
+ if (!write.isEmpty())
+ partition.setStaticColumns(write);
}
// table has clustering but non are in the write, so only
pk/static can be updated
if (!factory.clusteringColumns.isEmpty() && remaining.isEmpty())
@@ -263,10 +273,13 @@ public class ASTSingleTableModel
for (Symbol col :
Sets.intersection(factory.regularColumns.asSet(), set.keySet()))
{
ByteBuffer current = partition.get(cd, col);
- write.put(col, eval(col, current, set.get(col)));
+ EvalResult result = eval(col, current, set.get(col));
+ if (result.kind == EvalResult.Kind.SKIP) continue;
+ write.put(col, result.value);
}
- partition.setColumns(cd, write, false);
+ if (!write.isEmpty())
+ partition.setColumns(cd, write, false);
}
}
}
@@ -274,7 +287,7 @@ public class ASTSingleTableModel
private enum DeleteKind
{PARTITION, ROW, COLUMN}
- public void update(Mutation.Delete delete)
+ private void update(Mutation.Delete delete)
{
//TODO (coverage): range deletes
var split = splitOnPartition(delete.where.simplify());
@@ -328,6 +341,168 @@ public class ASTSingleTableModel
}
}
+ public boolean shouldApply(Mutation mutation)
+ {
+ if (!mutation.isCas()) return true;
+ return shouldApply(mutation, selectPartitionForCAS(mutation));
+ }
+
+ private SelectResult selectPartitionForCAS(Mutation mutation)
+ {
+ var partition = partitions.get(factory.createRef(pd(mutation)));
+ if (partition == null) return
SelectResult.ordered(factory.selectionOrder, NO_ROWS);
+
+ var cd = cdOrNull(mutation);
+ var row = cd == null ? null : partition.get(cd);
+ ImmutableUniqueList<Symbol> columns = cd != null ?
factory.selectionOrder : factory.partitionAndStaticColumns;
+ return SelectResult.ordered(columns, new ByteBuffer[][] {
getRowAsByteBuffer(columns, partition, row)});
+ }
+
+ private boolean shouldApply(Mutation mutation, SelectResult current)
+ {
+ Preconditions.checkArgument(mutation.isCas());
+ // process condition
+ CasCondition condition;
+ switch (mutation.kind)
+ {
+ case INSERT:
+ condition = CasCondition.Simple.NotExists;
+ break;
+ case UPDATE:
+ condition = ((Mutation.Update) mutation).casCondition.get();
+ break;
+ case DELETE:
+ condition = ((Mutation.Delete) mutation).casCondition.get();
+ break;
+ default:
+ throw new UnsupportedOperationException(mutation.kind.name());
+ }
+ if (condition instanceof CasCondition.Simple)
+ {
+ boolean hasPartition = current.rows.length > 0;
+ boolean partitionOrRow =
current.columns.equals(factory.partitionAndStaticColumns);
+ boolean hasRow = partitionOrRow ? hasPartition :
current.isAllDefined(factory.clusteringColumns);
+ var simple = (CasCondition.Simple) condition;
+ switch (simple)
+ {
+ case Exists:
+ return hasRow;
+ case NotExists:
+ return !hasRow;
+ default:
+ throw new UnsupportedOperationException(simple.name());
+ }
+ }
+ var ifCondition = (CasCondition.IfCondition) condition;
+ String letRow = "row";
+ Symbol rowSymbol = Symbol.unknownType(letRow);
+ Map<String, SelectResult> lets = Map.of(letRow, current);
+ // point the columns to be row.column that way it matches LET clause
in BEGIN TRANSACTION, allowing better reuse
+ var updatedCondition = ifCondition.conditional.visit(new Visitor()
+ {
+ @Override
+ public ReferenceExpression visit(ReferenceExpression r)
+ {
+ Preconditions.checkArgument(!(r instanceof Reference),
"Unexpected reference detected: %s", r);
+ return Reference.of(rowSymbol, r);
+ }
+ });
+ return process(updatedCondition, lets);
+ }
+
+ private boolean process(Conditional condition, Map<String, SelectResult>
lets)
+ {
+ if (condition.getClass() == Conditional.Is.class)
+ {
+ var is = (Conditional.Is) condition;
+ Object result = extract(is.reference, lets);
+ return result == null
+ ? is.kind == Conditional.Is.Kind.Null
+ : is.kind == Conditional.Is.Kind.NotNull;
+ }
+ else if (condition.getClass() == Conditional.Where.class)
+ {
+ var where = (Conditional.Where) (condition);
+ if (!where.lhs.type().equals(where.rhs.type()))
+ throw new UnsupportedOperationException("For now where clause
must always have matching types: given " + where.lhs.type() + ' ' +
where.rhs.type());
+ ByteBuffer lhs = where.lhs instanceof ReferenceExpression
+ ? (ByteBuffer) extract((ReferenceExpression)
where.lhs, lets)
+ : eval(where.lhs);
+ ByteBuffer rhs = where.rhs instanceof ReferenceExpression
+ ? (ByteBuffer) extract((ReferenceExpression)
where.rhs, lets)
+ : eval(where.rhs);
+ // If anything is null avoid doing the test, but there is a
special case where this returns true... both sides are null!
+ // This logic isn't consistent with other parts of the database
and is local to CAS IF clause
+ // see ML@Inconsistent null handling between WHERE and IF clauses
+ if (lhs == null || rhs == null)
+ return lhs == rhs;
+ return where.kind.test(where.lhs.type(), lhs, rhs);
+ }
+ else if (condition.getClass() == Conditional.And.class)
+ {
+ var conditions = condition.simplify();
+ for (var c : conditions)
+ {
+ if (!process(c, lets))
+ return false;
+ }
+ return true;
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unsupported condition
type: " + condition.getClass() + "; " + condition.toCQL());
+ }
+ }
+
+ // Either ByteBuffer (cell) or ByteBuffer[] (row)
+ private static Object extract(ReferenceExpression expr, Map<String,
SelectResult> lets)
+ {
+ Object result = extract0(expr, lets);
+ if (result instanceof SelectResult)
+ {
+ var rows = ((SelectResult) result).rows;
+ result = rows.length == 0 ? null : rows[0];
+ }
+ return result;
+ }
+
+ // o can be Map<String, SelectResult> (lets), SelectResult (row),
ByteBuffer (cell)
+ private static Object extract0(ReferenceExpression expr, @Nullable Object
o)
+ {
+ if (o == null) return null;
+ if (expr instanceof Reference)
+ {
+ Reference ref = (Reference) expr;
+ for (var symbol : ref.path)
+ o = extract0(symbol, o);
+ return o;
+ }
+ else if (expr instanceof Symbol)
+ {
+ var symbol = (Symbol) expr;
+ if (o instanceof Map)
+ {
+ Map<String, SelectResult> lets = (Map<String, SelectResult>) o;
+ return lets.get(symbol.symbol);
+ }
+ else if (o instanceof SelectResult)
+ {
+ SelectResult result = (SelectResult) o;
+ if (result.rows.length == 0)
+ return null;
+ return result.rows[0][result.columns.indexOf(symbol)];
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unexpected object
type: " + o.getClass());
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unsupported ref type: " +
expr.getClass() + "; " + expr.toCQL());
+ }
+ }
+
private List<Clustering<ByteBuffer>> clustering(List<Conditional>
conditionals)
{
if (conditionals.isEmpty())
@@ -422,11 +597,74 @@ public class ASTSingleTableModel
return Collections.singletonList(BufferClustering.make(bbs));
}
+ private Clustering<ByteBuffer> pd(Mutation mutation)
+ {
+ switch (mutation.kind)
+ {
+ case INSERT:
+ return pd((Mutation.Insert) mutation);
+ case UPDATE:
+ return pd((Mutation.Update) mutation);
+ case DELETE:
+ return pd((Mutation.Delete) mutation);
+ default:
+ throw new UnsupportedOperationException(mutation.kind.name());
+ }
+ }
+
private Clustering<ByteBuffer> pd(Mutation.Insert mutation)
{
return key(mutation.values, factory.partitionColumns);
}
+ private Clustering<ByteBuffer> pd(Mutation.Update mutation)
+ {
+ return pd("Update", mutation.where.simplify());
+ }
+
+ private Clustering<ByteBuffer> pd(Mutation.Delete mutation)
+ {
+ return pd("Delete", mutation.where.simplify());
+ }
+
+ private Clustering<ByteBuffer> pd(String type, List<Conditional>
conditionals)
+ {
+ var split = splitOnPartition(conditionals);
+ List<Clustering<ByteBuffer>> pks = split.left;
+ Preconditions.checkArgument(pks.size() == 1, "%s had more than 1
partition key! expected 1 but was %s", type, pks.size());
+ return pks.get(0);
+ }
+
+ @Nullable
+ private Clustering<ByteBuffer> cdOrNull(Mutation mutation)
+ {
+ if (factory.clusteringColumns.isEmpty()) return Clustering.EMPTY;
+ if (mutation.kind == Mutation.Kind.INSERT)
+ {
+ var insert = (Mutation.Insert) mutation;
+ return
!insert.values.keySet().containsAll(factory.clusteringColumns)
+ ? null
+ : key(insert.values, factory.clusteringColumns);
+ }
+ Conditional where;
+ switch (mutation.kind)
+ {
+ case UPDATE:
+ where = ((Mutation.Update) mutation).where;
+ break;
+ case DELETE:
+ where = ((Mutation.Delete) mutation).where;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unexpected mutation:
" + mutation.kind);
+ }
+ var partitions = splitOnPartition(where.simplify());
+ if (partitions.right.isEmpty()) return null;
+ var matches = clustering(partitions.right);
+ Preconditions.checkArgument(matches.size() == 1);
+ return matches.get(0);
+ }
+
public BytesPartitionState get(BytesPartitionState.Ref ref)
{
return partitions.get(ref);
@@ -611,20 +849,53 @@ public class ASTSingleTableModel
private static class SelectResult
{
+ private final ImmutableUniqueList<Symbol> columns;
private final ByteBuffer[][] rows;
private final boolean unordered;
- private SelectResult(ByteBuffer[][] rows, boolean unordered)
+ private SelectResult(ImmutableUniqueList<Symbol> columns,
ByteBuffer[][] rows, boolean unordered)
{
+ this.columns = columns;
this.rows = rows;
this.unordered = unordered;
}
+
+ private static SelectResult ordered(ImmutableUniqueList<Symbol>
columns, ByteBuffer[][] rows)
+ {
+ return new SelectResult(columns, rows, false);
+ }
+
+ private static SelectResult unordered(ImmutableUniqueList<Symbol>
columns, ByteBuffer[][] rows)
+ {
+ return new SelectResult(columns, rows, true);
+ }
+
+ public boolean isAllDefined(ImmutableUniqueList<Symbol> selectColumns)
+ {
+ if (rows.length == 0) return false;
+ for (var row : rows)
+ {
+ for (var col : selectColumns)
+ {
+ if (row[columns.indexOf(col)] == null)
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ public ImmutableUniqueList<Symbol> columns(Select select)
+ {
+ if (select.selections.isEmpty()) return factory.selectionOrder;
+ throw new UnsupportedOperationException("Getting columns from select
other than SELECT * is currently not supported");
}
private SelectResult getRowsAsByteBuffer(Select select)
{
+ ImmutableUniqueList<Symbol> columns = columns(select);
if (select.where.isEmpty())
- return new SelectResult(getRowsAsByteBuffer(applyLimits(all(),
select.perPartitionLimit, select.limit)), false);
+ return SelectResult.ordered(columns,
getRowsAsByteBuffer(applyLimits(all(), select.perPartitionLimit,
select.limit)));
LookupContext ctx = context(select);
List<PrimaryKey> primaryKeys;
if (ctx.unmatchable)
@@ -652,7 +923,7 @@ public class ASTSingleTableModel
}
primaryKeys = applyLimits(primaryKeys, select.perPartitionLimit,
select.limit);
//TODO (correctness): now that we have the rows we need to handle the
selections/aggregation/limit/group-by/etc.
- return new SelectResult(getRowsAsByteBuffer(primaryKeys),
ctx.unordered);
+ return new SelectResult(columns, getRowsAsByteBuffer(primaryKeys),
ctx.unordered);
}
private List<PrimaryKey> applyLimits(List<PrimaryKey> primaryKeys,
Optional<Value> perPartitionLimitOpt, Optional<Value> limitOpt)
@@ -709,20 +980,37 @@ public class ASTSingleTableModel
}
private ByteBuffer[] getRowAsByteBuffer(BytesPartitionState partition,
@Nullable BytesPartitionState.Row row)
+ {
+ return getRowAsByteBuffer(factory.selectionOrder, partition, row);
+ }
+
+ private ByteBuffer[] getRowAsByteBuffer(ImmutableUniqueList<Symbol>
columns, BytesPartitionState partition, @Nullable BytesPartitionState.Row row)
{
Clustering<ByteBuffer> pd = partition.key;
BytesPartitionState.Row staticRow = partition.staticRow();
- ByteBuffer[] bbs = new ByteBuffer[factory.selectionOrder.size()];
+ ByteBuffer[] bbs = new ByteBuffer[columns.size()];
for (Symbol col : factory.partitionColumns)
- bbs[factory.selectionOrder.indexOf(col)] =
pd.bufferAt(factory.partitionColumns.indexOf(col));
+ {
+ if (!columns.contains(col)) continue;
+ bbs[columns.indexOf(col)] =
pd.bufferAt(factory.partitionColumns.indexOf(col));
+ }
for (Symbol col : factory.staticColumns)
- bbs[factory.selectionOrder.indexOf(col)] = staticRow.get(col);
+ {
+ if (!columns.contains(col)) continue;
+ bbs[columns.indexOf(col)] = staticRow.get(col);
+ }
if (row != null)
{
for (Symbol col : factory.clusteringColumns)
- bbs[factory.selectionOrder.indexOf(col)] =
row.clustering.bufferAt(factory.clusteringColumns.indexOf(col));
+ {
+ if (!columns.contains(col)) continue;
+ bbs[columns.indexOf(col)] =
row.clustering.bufferAt(factory.clusteringColumns.indexOf(col));
+ }
for (Symbol col : factory.regularColumns)
- bbs[factory.selectionOrder.indexOf(col)] = row.get(col);
+ {
+ if (!columns.contains(col)) continue;
+ bbs[columns.indexOf(col)] = row.get(col);
+ }
}
return bbs;
}
@@ -942,26 +1230,52 @@ public class ASTSingleTableModel
return
current.stream().map(BufferClustering::new).collect(Collectors.toList());
}
- private static ByteBuffer eval(Symbol col, @Nullable ByteBuffer current,
Expression e)
+ private static class EvalResult
+ {
+ private static final EvalResult SKIP = new EvalResult(Kind.SKIP, null);
+
+ private enum Kind { SKIP, ACCEPT }
+
+ private final Kind kind;
+ private final @Nullable ByteBuffer value;
+
+ private EvalResult(Kind kind, @Nullable ByteBuffer value)
+ {
+ this.kind = kind;
+ this.value = value;
+ }
+
+ private static EvalResult accept(@Nullable ByteBuffer bb)
+ {
+ return new EvalResult(Kind.ACCEPT, bb);
+ }
+ }
+
+ private static EvalResult eval(Symbol col, @Nullable ByteBuffer current,
Expression e)
{
- if (!(e instanceof AssignmentOperator)) return eval(e);
+ if (!(e instanceof AssignmentOperator)) return
EvalResult.accept(eval(e));
+ current = col.type().sanitize(current);
// multi cell collections have the property that they do update even
if the current value is null
boolean isFancy = col.type().isCollection() &&
col.type().isMultiCell();
- if (current == null && !isFancy) return null; // null + ? == null
+ if (current == null && !isFancy) return EvalResult.SKIP; // null + ?
== null
var assignment = (AssignmentOperator) e;
if (isFancy && current == null)
{
return assignment.kind == AssignmentOperator.Kind.SUBTRACT
// if it doesn't exist, then there is nothing to subtract
- ? null
- : eval(assignment.right);
+ ? EvalResult.SKIP
+ : EvalResult.accept(eval(assignment.right));
}
+ // validate your inputs...
+ ByteBuffer rhs = col.type().sanitize(eval(assignment.right));
+ if (rhs == null)
+ return EvalResult.SKIP;
switch (assignment.kind)
{
case ADD:
- return eval(new Operator(Operator.Kind.ADD, new
Literal(current, e.type()), assignment.right));
+ return EvalResult.accept(eval(new Operator(Operator.Kind.ADD,
new Literal(current, e.type()), assignment.right)));
case SUBTRACT:
- return eval(new Operator(Operator.Kind.SUBTRACT, new
Literal(current, e.type()), assignment.right));
+ return EvalResult.accept(eval(new
Operator(Operator.Kind.SUBTRACT, new Literal(current, e.type()),
assignment.right)));
default:
throw new UnsupportedOperationException(assignment.kind + ": "
+ assignment.toCQL());
}
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
index a10524968a..c2d18e573d 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/BytesPartitionState.java
@@ -449,9 +449,10 @@ public class BytesPartitionState
public final TableMetadata metadata;
public final ImmutableUniqueList<Symbol> partitionColumns;
public final ImmutableUniqueList<Symbol> clusteringColumns;
+ public final ImmutableUniqueList<Symbol> primaryColumns;
public final ImmutableUniqueList<Symbol> staticColumns;
public final ImmutableUniqueList<Symbol> regularColumns;
- public final ImmutableUniqueList<Symbol> selectionOrder,
regularAndStaticColumns;
+ public final ImmutableUniqueList<Symbol> selectionOrder,
partitionAndStaticColumns, regularAndStaticColumns;
public final ClusteringComparator clusteringComparator;
@@ -471,9 +472,23 @@ public class BytesPartitionState
for (ColumnMetadata pk : metadata.clusteringColumns())
symbolListBuilder.add(Symbol.from(pk));
clusteringColumns = symbolListBuilder.buildAndClear();
+ if (clusteringColumns.isEmpty()) primaryColumns = partitionColumns;
+ else
+ {
+ symbolListBuilder.addAll(partitionColumns);
+ symbolListBuilder.addAll(clusteringColumns);
+ primaryColumns = symbolListBuilder.buildAndClear();
+ }
for (ColumnMetadata pk : metadata.staticColumns())
symbolListBuilder.add(Symbol.from(pk));
staticColumns = symbolListBuilder.buildAndClear();
+ if (staticColumns.isEmpty()) partitionAndStaticColumns =
partitionColumns;
+ else
+ {
+ symbolListBuilder.addAll(partitionColumns);
+ symbolListBuilder.addAll(staticColumns);
+ partitionAndStaticColumns = symbolListBuilder.buildAndClear();
+ }
for (ColumnMetadata pk : metadata.regularColumns())
symbolListBuilder.add(Symbol.from(pk));
regularColumns = symbolListBuilder.buildAndClear();
diff --git a/test/unit/org/apache/cassandra/cql3/KnownIssue.java
b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
index b1c2c09d66..be2dfe7524 100644
--- a/test/unit/org/apache/cassandra/cql3/KnownIssue.java
+++ b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
@@ -40,7 +40,9 @@ public enum KnownIssue
AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-19007",
"When doing multi
node/multi column queries, AF can miss data when the nodes are not in-sync"),
SAI_AND_VECTOR_COLUMNS("https://issues.apache.org/jira/browse/CASSANDRA-20464",
- "When doing an SAI query, if the where clause also
contains a vector column bad results can be produced")
+ "When doing an SAI query, if the where clause also
contains a vector column bad results can be produced"),
+
CAS_CONDITION_ON_UDT_W_EMPTY_BYTES("https://issues.apache.org/jira/browse/CASSANDRA-20479",
+ "WHERE clause blocks operations on UDTs
but CAS allows in IF clause. During this path empty can be confused with null
which allows non-existing rows to match empty bytes"),
;
KnownIssue(String url, String description)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
index 0ffb65411b..e3918f70da 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/AssignmentOperator.java
@@ -59,10 +59,15 @@ public class AssignmentOperator implements Expression
EnumSet<Kind> result = EnumSet.noneOf(Kind.class);
if (type instanceof CollectionType && type.isMultiCell())
{
- if (type instanceof SetType || type instanceof ListType)
+ if (type instanceof SetType)
return EnumSet.of(Kind.ADD, Kind.SUBTRACT);
+ if (type instanceof ListType)
+ return isTransaction
+ ? EnumSet.of(Kind.ADD, Kind.SUBTRACT)
+ : EnumSet.of(Kind.ADD);
if (type instanceof MapType)
{
+ //TODO (coverage): include SUBTRACT support
// map supports subtract, but not map - map; only map - set!
// since this is annoying to support, for now dropping -
return EnumSet.of(Kind.ADD);
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Bind.java
b/test/unit/org/apache/cassandra/cql3/ast/Bind.java
index ac58dfc7df..788d301dbf 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Bind.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Bind.java
@@ -54,6 +54,7 @@ public class Bind implements Value
@Override
public ByteBuffer valueEncoded()
{
+ if (value == null) return null;
return value instanceof ByteBuffer ? (ByteBuffer) value :
((AbstractType) type).decompose(value);
}
diff --git a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
index 9f8d5867e0..d0d4d0e35b 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/CasCondition.java
@@ -22,6 +22,8 @@ import java.util.stream.Stream;
public interface CasCondition extends Element
{
+ CasCondition visit(Visitor v);
+
enum Simple implements CasCondition
{
NotExists("IF NOT EXISTS"),
@@ -39,11 +41,17 @@ public interface CasCondition extends Element
{
sb.append(cql);
}
+
+ @Override
+ public CasCondition visit(Visitor v)
+ {
+ return v.visit(this);
+ }
}
class IfCondition implements CasCondition
{
- private final Conditional conditional;
+ public final Conditional conditional;
public IfCondition(Conditional conditional)
{
@@ -62,5 +70,15 @@ public interface CasCondition extends Element
{
return Stream.of(conditional);
}
+
+ @Override
+ public CasCondition visit(Visitor v)
+ {
+ var u = v.visit(this);
+ if (u != this) return u;
+ var c = conditional.visit(v);
+ if (c == conditional) return this;
+ return new IfCondition(c);
+ }
}
}
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
index 52f79bb1dc..66012c060c 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Conditional.java
@@ -72,6 +72,21 @@ public interface Conditional extends Expression
{
this.value = value;
}
+
+ public boolean test(AbstractType<?> type, ByteBuffer a, ByteBuffer
b)
+ {
+ int rc = type.compare(a, b);
+ switch (this)
+ {
+ case EQUAL: return rc == 0;
+ case NOT_EQUAL: return rc != 0;
+ case GREATER_THAN: return rc > 0;
+ case GREATER_THAN_EQ: return rc >=0;
+ case LESS_THAN: return rc < 0;
+ case LESS_THAN_EQ: return rc <=0;
+ default: throw new
UnsupportedOperationException(this.name());
+ }
+ }
}
public final Inequality kind;
diff --git a/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
b/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
index 34acb843c7..a626dcd6b0 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/ExpressionEvaluator.java
@@ -55,6 +55,11 @@ public class ExpressionEvaluator
Object rhs = eval(e.right);
if (rhs instanceof ByteBuffer)
rhs = e.right.type().compose((ByteBuffer) rhs);
+ // null + 42 = null
+ // 42 + null = null
+ // if anything is null, everything is null!
+ if (lhs == null || rhs == null)
+ return null;
switch (e.kind)
{
case ADD:
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
index 3886fe91b4..4bd2f9b631 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
@@ -56,6 +56,7 @@ public class Literal implements Value
@Override
public ByteBuffer valueEncoded()
{
+ if (value == null) return null;
return value instanceof ByteBuffer ? (ByteBuffer) value :
((AbstractType) type).decompose(value);
}
@@ -69,6 +70,11 @@ public class Literal implements Value
public void toCQL(StringBuilder sb, CQLFormatter formatter)
{
ByteBuffer bytes = valueEncoded();
+ if (bytes == null)
+ {
+ sb.append("null");
+ return;
+ }
if (bytes.remaining() == 0 && !actuallySupportsEmpty(type))
{
sb.append("<empty bytes>");
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
index 9126d8cbda..95987dc657 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Mutation.java
@@ -52,6 +52,8 @@ public abstract class Mutation implements Statement
public abstract boolean isCas();
+ public abstract Mutation withoutTimestamp();
+
public Mutation withTimestamp(long timestamp)
{
return withTimestamp(new Timestamp(new Literal(timestamp,
LongType.instance)));
@@ -172,6 +174,11 @@ public abstract class Mutation implements Statement
this.timestamp = timestamp;
}
+ public Using withoutTimestamp()
+ {
+ return new Using(ttl, Optional.empty());
+ }
+
public Using withTimestamp(Timestamp timestamp)
{
return new Using(ttl, Optional.of(timestamp));
@@ -299,6 +306,14 @@ public abstract class Mutation implements Statement
return ifNotExists;
}
+ @Override
+ public Mutation withoutTimestamp()
+ {
+ return new Insert(table, values, ifNotExists, using.isEmpty()
+ ? using
+ : using.map(u ->
u.withoutTimestamp()));
+ }
+
@Override
public Insert withTimestamp(Timestamp timestamp)
{
@@ -404,9 +419,20 @@ public abstract class Mutation implements Statement
Conditional copiedWhere = where.visit(v);
if (where != copiedWhere)
updated = true;
+ Optional<? extends CasCondition> updatedCasCondition =
casCondition;
+ if (casCondition.isPresent())
+ {
+ CasCondition original = casCondition.get();
+ var casCopy = original.visit(v);
+ if (casCopy != original)
+ {
+ updatedCasCondition = Optional.ofNullable(casCopy);
+ updated = true;
+ }
+ }
if (!updated) return this;
- return new Update(table, using, copied, copiedWhere, casCondition);
+ return new Update(table, using, copied, copiedWhere,
updatedCasCondition);
}
@Override
@@ -415,6 +441,12 @@ public abstract class Mutation implements Statement
return casCondition.isPresent();
}
+ @Override
+ public Mutation withoutTimestamp()
+ {
+ return new Update(table, using.isEmpty() ? using : using.map(u ->
u.withoutTimestamp()), set, where, casCondition);
+ }
+
@Override
public Update withTimestamp(Timestamp timestamp)
{
@@ -520,9 +552,20 @@ WHERE PK_column_conditions
var copiedWhere = where.visit(v);
if (copiedWhere != where)
updated = true;
+ Optional<? extends CasCondition> updatedCasCondition =
casCondition;
+ if (casCondition.isPresent())
+ {
+ CasCondition original = casCondition.get();
+ var casCopy = original.visit(v);
+ if (casCopy != original)
+ {
+ updatedCasCondition = Optional.ofNullable(casCopy);
+ updated = true;
+ }
+ }
if (!updated) return this;
- return new Delete(copiedColumns, table, timestamp, copiedWhere,
casCondition);
+ return new Delete(copiedColumns, table, timestamp, copiedWhere,
updatedCasCondition);
}
@Override
@@ -531,6 +574,12 @@ WHERE PK_column_conditions
return casCondition.isPresent();
}
+ @Override
+ public Mutation withoutTimestamp()
+ {
+ return new Delete(columns, table, Optional.empty(), where,
casCondition);
+ }
+
@Override
public Delete withTimestamp(Timestamp timestamp)
{
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Reference.java
b/test/unit/org/apache/cassandra/cql3/ast/Reference.java
index fe836a05dd..ee3b73d6fd 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Reference.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Reference.java
@@ -48,6 +48,11 @@ public class Reference implements ReferenceExpression
return new
Reference(Collections.singletonList(Objects.requireNonNull(top)));
}
+ public static Reference of(ReferenceExpression top, ReferenceExpression
next)
+ {
+ return new Reference(List.of(top, next));
+ }
+
@Override
public AbstractType<?> type()
{
diff --git a/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
b/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
index 7a99e030a4..d1727fccdb 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/ReferenceExpression.java
@@ -26,6 +26,6 @@ public interface ReferenceExpression extends Expression
@Override
default ReferenceExpression visit(Visitor v)
{
- return this;
+ return v.visit(this);
}
}
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Value.java
b/test/unit/org/apache/cassandra/cql3/ast/Value.java
index f04b5fcd36..92ef010183 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Value.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Value.java
@@ -20,11 +20,15 @@ package org.apache.cassandra.cql3.ast;
import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+
import org.apache.cassandra.db.marshal.AbstractType;
public interface Value extends Expression
{
+ @Nullable
Object value();
+ @Nullable
ByteBuffer valueEncoded();
Value with(Object value, AbstractType<?> type);
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
b/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
index 71e4797c25..c87415e981 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Visitor.java
@@ -71,6 +71,11 @@ public interface Visitor
default Value visit(Value v) { return v; }
+ default CasCondition visit(CasCondition s)
+ {
+ return s;
+ }
+
class CompositeVisitor implements Visitor
{
private final List<Visitor> visitors;
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 47b2267de3..cdf533cb42 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -166,6 +166,7 @@ public class ASTGenerators
private Gen<?> valueGen;
private Gen<Boolean> useOperator = SourceDSL.booleans().all();
private Gen<Boolean> useEmpty = SourceDSL.arbitrary().constant(false);
+ private Gen<Boolean> useNull = SourceDSL.arbitrary().constant(false);
private BiFunction<Object, AbstractType<?>, Gen<Value>>
literalOrBindGen = ASTGenerators::valueGen;
public ExpressionBuilder(AbstractType<?> type)
@@ -182,6 +183,12 @@ public class ASTGenerators
return this;
}
+ public ExpressionBuilder allowNull()
+ {
+ useNull = SourceDSL.integers().between(1, 100).map(i -> i < 10);
+ return this;
+ }
+
public ExpressionBuilder withOperators()
{
useOperator = i -> true;
@@ -217,6 +224,8 @@ public class ASTGenerators
//TODO (coverage): rather than single level operators, allow
nested (a + b + c + d)
Gen<Value> leaf = rs ->
literalOrBindGen.apply(valueGen.generate(rs), type).generate(rs);
return rs -> {
+ if (useNull.generate(rs))
+ return new Bind(null, type);
if (useEmpty.generate(rs))
return new Bind(ByteBufferUtil.EMPTY_BYTE_BUFFER, type);
Expression e = leaf.generate(rs);
@@ -395,6 +404,12 @@ public class ASTGenerators
return this;
}
+ public MutationGenBuilder allowNull(Symbol symbol)
+ {
+ columnExpressions.get(symbol).allowNull();
+ return this;
+ }
+
public MutationGenBuilder withDeletionKind(Gen<DeleteKind>
deleteKindGen)
{
this.deleteKindGen = deleteKindGen;
@@ -432,7 +447,7 @@ public class ASTGenerators
public MutationGenBuilder withCasGen(Gen<Boolean> withCasGen)
{
- withCasGen = Objects.requireNonNull(withCasGen);
+ this.withCasGen = Objects.requireNonNull(withCasGen);
return this;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]