This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new acec78abf4 BEGIN TRANSACTION crashes if a mutation touches multiple
rows
acec78abf4 is described below
commit acec78abf40230243693e2e13ded131356773ada
Author: David Capwell <[email protected]>
AuthorDate: Tue Aug 19 13:23:04 2025 -0700
BEGIN TRANSACTION crashes if a mutation touches multiple rows
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20844
---
CHANGES.txt | 1 +
.../cql3/restrictions/SimpleRestriction.java | 2 +-
.../cql3/statements/ModificationStatement.java | 18 +++-
.../org/apache/cassandra/service/StorageProxy.java | 5 +-
.../service/accord/txn/AccordUpdateParameters.java | 4 +-
.../service/accord/txn/TxnReferenceOperations.java | 37 +++----
.../cassandra/service/accord/txn/TxnWrite.java | 25 +++--
.../service/consensus/TransactionalMode.java | 6 +-
.../UnsupportedTransactionConsistencyLevel.java | 43 ++++++++
.../ConsensusMigrationMutationHelper.java | 6 +-
.../org/apache/cassandra/utils/ByteBufferUtil.java | 1 +
.../distributed/test/accord/AccordCQLTestBase.java | 115 ++++++++++++++++++---
.../test/accord/AccordInteroperabilityTest.java | 10 +-
.../distributed/test/accord/AccordTestBase.java | 1 +
.../test/cql3/CasMultiNodeTableWalkBase.java | 2 +-
.../test/cql3/SingleNodeTokenConflictTest.java | 4 +-
.../fuzz/topology/AccordTopologyMixupTest.java | 2 +-
.../cassandra/harry/model/ASTSingleTableModel.java | 16 ++-
.../test/SingleNodeSingleTableASTTest.java | 3 -
.../simulator/test/SingleTableASTSimulation.java | 13 +--
.../config/DatabaseDescriptorRefTest.java | 1 +
.../apache/cassandra/cql3/ast/ExpressionTest.java | 14 +++
.../org/apache/cassandra/cql3/ast/Literal.java | 6 ++
.../unit/org/apache/cassandra/cql3/ast/Symbol.java | 8 +-
.../org/apache/cassandra/utils/ASTGenerators.java | 49 +++++++--
25 files changed, 302 insertions(+), 90 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index eb07e60cb6..aeef6ebc6c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * BEGIN TRANSACTION crashes if a mutation touches multiple rows
(CASSANDRA-20844)
* Fix version range check in MessagingService.getVersionOrdinal
(CASSANDRA-20842)
* Allow custom constraints to be loaded via SPI (CASSANDRA-20824)
* Optimize DataPlacement lookup by ReplicationParams (CASSANDRA-20804)
diff --git
a/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
b/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
index 13faf3a27c..85d75fb07f 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SimpleRestriction.java
@@ -225,7 +225,7 @@ public final class SimpleRestriction implements
SingleRestriction
{
assert operator == Operator.EQ ||
operator == Operator.IN ||
- operator == Operator.ANN;
+ operator == Operator.ANN : String.format("Unexpected operator:
%s", operator);
return bindAndGetClusteringElements(options);
}
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 5d8d5d5b3f..fa7aaaebaa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -912,8 +912,16 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
{
List<TxnReferenceOperation> regularOps =
getTxnReferenceOps(operations.regularSubstitutions(), options);
List<TxnReferenceOperation> staticOps =
getTxnReferenceOps(operations.staticSubstitutions(), options);
- Clustering<?> clustering = !regularOps.isEmpty() ?
Iterables.getOnlyElement(createClustering(options, state)) : null;
- return new TxnReferenceOperations(metadata, clustering, regularOps,
staticOps);
+ List<Clustering<?>> clusterings = txnClusterings(options, state);
+ return new TxnReferenceOperations(metadata, clusterings, regularOps,
staticOps);
+ }
+
+ private List<Clustering<?>> txnClusterings(QueryOptions options,
ClientState state)
+ {
+ if (restrictions.hasAllPrimaryKeyColumnsRestrictedByEqualities())
+ return new ArrayList<>(restrictions.getClusteringColumns(options,
state));
+ // Range/Partition delete, static only
+ return Collections.emptyList();
}
public ModificationStatement forTxn()
@@ -944,14 +952,16 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
{
PartitionUpdate baseUpdate = getTxnUpdate(state, options);
TxnReferenceOperations referenceOps = getTxnReferenceOps(options,
state);
- return new TxnWrite.Fragment(partitionKey, index, baseUpdate,
referenceOps);
+ long timestamp = attrs.isTimestampSet() ?
attrs.getTimestamp(TxnWrite.NO_TIMESTAMP, options) : TxnWrite.NO_TIMESTAMP;
+ return new TxnWrite.Fragment(partitionKey, index, baseUpdate,
referenceOps, timestamp);
}
public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state,
QueryOptions options, KeyCollector keyCollector)
{
PartitionUpdate baseUpdate = getTxnUpdate(state, options);
TxnReferenceOperations referenceOps = getTxnReferenceOps(options,
state);
- return new
TxnWrite.Fragment(keyCollector.collect(baseUpdate.metadata(),
baseUpdate.partitionKey()), index, baseUpdate, referenceOps);
+ long timestamp = attrs.isTimestampSet() ?
attrs.getTimestamp(TxnWrite.NO_TIMESTAMP, options) : TxnWrite.NO_TIMESTAMP;
+ return new
TxnWrite.Fragment(keyCollector.collect(baseUpdate.metadata(),
baseUpdate.partitionKey()), index, baseUpdate, referenceOps, timestamp);
}
final void addUpdates(UpdatesCollector collector,
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 72692af133..40b3d1fe82 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -149,6 +149,7 @@ import
org.apache.cassandra.service.accord.txn.TxnRangeReadResult;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.consensus.TransactionalMode;
+import
org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitConsumer;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationMutationHelper.SplitMutations;
import org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter;
@@ -2243,7 +2244,7 @@ public class StorageProxy implements StorageProxyMBean
public static IAccordResult<TxnResult> readWithAccord(ClusterMetadata cm,
PartitionRangeReadCommand command, AbstractBounds<PartitionPosition> range,
ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
{
if (consistencyLevel != null &&
!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
- throw new InvalidRequestException(consistencyLevel + " is not
supported by Accord");
+ throw
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
TableMetadata tableMetadata = getTableMetadata(cm,
command.metadata().id);
TableParams tableParams = tableMetadata.params;
@@ -2260,7 +2261,7 @@ public class StorageProxy implements StorageProxyMBean
private static IAccordResult<TxnResult>
readWithAccordAsync(ClusterMetadata cm, SinglePartitionReadCommand.Group group,
ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
{
if (consistencyLevel != null &&
!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
- throw new InvalidRequestException(consistencyLevel + " is not
supported by Accord");
+ throw
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
// If the non-SERIAL write strategy is sending all writes through
Accord there is no need to use the supplied consistency
// level since Accord will manage reading safely
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
index 2e5baf296b..db369440d5 100644
---
a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdateParameters.java
@@ -71,7 +71,7 @@ public class AccordUpdateParameters
return data;
}
- public UpdateParameters updateParameters(TableMetadata metadata,
DecoratedKey dk, int rowIndex)
+ public UpdateParameters updateParameters(TableMetadata metadata,
DecoratedKey dk, int rowIndex, long overrideTimestamp)
{
// This is currently only used by Guardrails, but this logically have
issues with Accord as drifts in config
// values could cause unexpected issues in Accord. (ex. some nodes
reject writes while others accept)
@@ -82,7 +82,7 @@ public class AccordUpdateParameters
return new RowUpdateParameters(metadata,
disabledGuardrails,
options,
- timestamp,
+ overrideTimestamp ==
TxnWrite.NO_TIMESTAMP ? timestamp : overrideTimestamp,
MICROSECONDS.toSeconds(timestamp),
ttl,
prefetchRow(dk, rowIndex));
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
index 7170d14503..8063e0eed3 100644
---
a/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/TxnReferenceOperations.java
@@ -19,12 +19,11 @@
package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import com.google.common.base.Preconditions;
-
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ParameterisedVersionedSerializer;
@@ -40,18 +39,17 @@ import static
org.apache.cassandra.utils.CollectionSerializers.serializedListSiz
public class TxnReferenceOperations
{
- private static final TxnReferenceOperations EMPTY = new
TxnReferenceOperations(null, null, Collections.emptyList(),
Collections.emptyList());
+ private static final TxnReferenceOperations EMPTY = new
TxnReferenceOperations(null, Collections.emptyList(), Collections.emptyList(),
Collections.emptyList());
private final TableMetadata metadata;
- final Clustering<?> clustering;
+ final List<Clustering<?>> clusterings;
final List<TxnReferenceOperation> regulars;
final List<TxnReferenceOperation> statics;
- public TxnReferenceOperations(TableMetadata metadata, Clustering<?>
clustering, List<TxnReferenceOperation> regulars, List<TxnReferenceOperation>
statics)
+ public TxnReferenceOperations(TableMetadata metadata, List<Clustering<?>>
clusterings, List<TxnReferenceOperation> regulars, List<TxnReferenceOperation>
statics)
{
this.metadata = metadata;
- Preconditions.checkArgument(clustering != null || regulars.isEmpty());
- this.clustering = clustering;
+ this.clusterings = clusterings;
this.regulars = regulars;
this.statics = statics;
}
@@ -62,19 +60,19 @@ public class TxnReferenceOperations
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TxnReferenceOperations that = (TxnReferenceOperations) o;
- return metadata.equals(that.metadata) && Objects.equals(clustering,
that.clustering) && regulars.equals(that.regulars) &&
statics.equals(that.statics);
+ return metadata.equals(that.metadata) &&
clusterings.equals(that.clusterings) && regulars.equals(that.regulars) &&
statics.equals(that.statics);
}
@Override
public int hashCode()
{
- return Objects.hash(metadata, clustering, regulars, statics);
+ return Objects.hash(metadata, clusterings, regulars, statics);
}
@Override
public String toString()
{
- return "TxnReferenceOperations{metadata=" + metadata + ", clustering="
+ clustering + ", regulars=" + regulars + ", statics=" + statics + '}';
+ return "TxnReferenceOperations{metadata=" + metadata + ",
clusterings=" + clusterings + ", regulars=" + regulars + ", statics=" + statics
+ '}';
}
public static TxnReferenceOperations empty()
@@ -97,9 +95,9 @@ public class TxnReferenceOperations
return;
tables.serialize(operations.metadata, out);
- out.writeBoolean(operations.clustering != null);
- if (operations.clustering != null)
- Clustering.serializer.serialize(operations.clustering, out,
version.messageVersion(), operations.metadata.comparator.subtypes());
+ out.writeVInt32(operations.clusterings.size());
+ for (Clustering<?> clustering : operations.clusterings)
+ Clustering.serializer.serialize(clustering, out,
version.messageVersion(), operations.metadata.comparator.subtypes());
serializeList(operations.regulars, tables, out,
TxnReferenceOperation.serializer);
serializeList(operations.statics, tables, out,
TxnReferenceOperation.serializer);
}
@@ -111,8 +109,11 @@ public class TxnReferenceOperations
return TxnReferenceOperations.empty();
TableMetadata metadata = tables.deserialize(in);
- Clustering<?> clustering = in.readBoolean() ?
Clustering.serializer.deserialize(in, version.messageVersion(),
metadata.comparator.subtypes()) : null;
- return new TxnReferenceOperations(metadata, clustering,
deserializeList(tables, in, TxnReferenceOperation.serializer),
+ int clusteringCount = in.readVInt32();
+ List<Clustering<?>> clusterings = new ArrayList<>(clusteringCount);
+ for (int i = 0; i < clusteringCount; i++)
+ clusterings.add(Clustering.serializer.deserialize(in,
version.messageVersion(), metadata.comparator.subtypes()));
+ return new TxnReferenceOperations(metadata, clusterings,
deserializeList(tables, in, TxnReferenceOperation.serializer),
deserializeList(tables, in,
TxnReferenceOperation.serializer));
}
@@ -123,9 +124,9 @@ public class TxnReferenceOperations
if (operations.isEmpty())
return size;
size += tables.serializedSize(operations.metadata);
- size += TypeSizes.BOOL_SIZE;
- if (operations.clustering != null)
- size +=
Clustering.serializer.serializedSize(operations.clustering,
version.messageVersion(), operations.metadata.comparator.subtypes());
+ size += TypeSizes.sizeofVInt(operations.clusterings.size());
+ for (Clustering<?> clustering : operations.clusterings)
+ size += Clustering.serializer.serializedSize(clustering,
version.messageVersion(), operations.metadata.comparator.subtypes());
size += serializedListSize(operations.regulars, tables,
TxnReferenceOperation.serializer);
size += serializedListSize(operations.statics, tables,
TxnReferenceOperation.serializer);
return size;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 31efe97026..c5890b1cd0 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -29,7 +29,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
-import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +80,8 @@ import static
org.apache.cassandra.utils.ArraySerializers.skipArray;
public class TxnWrite extends AbstractKeySorted<TxnWrite.Update> implements
Write
{
+ public static final long NO_TIMESTAMP = 0;
+
@SuppressWarnings("unused")
private static final Logger logger =
LoggerFactory.getLogger(TxnWrite.class);
@@ -245,13 +246,15 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
public final int index;
public final PartitionUpdate baseUpdate;
public final TxnReferenceOperations referenceOps;
+ public final long timestamp;
- public Fragment(PartitionKey key, int index, PartitionUpdate
baseUpdate, TxnReferenceOperations referenceOps)
+ public Fragment(PartitionKey key, int index, PartitionUpdate
baseUpdate, TxnReferenceOperations referenceOps, long timestamp)
{
this.key = key;
this.index = index;
this.baseUpdate = baseUpdate;
this.referenceOps = referenceOps;
+ this.timestamp = timestamp;
}
public static int compareKeys(Fragment left, Fragment right)
@@ -302,17 +305,20 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
baseUpdate.rowCount(),
baseUpdate.canHaveShadowedData());
- UpdateParameters up =
parameters.updateParameters(baseUpdate.metadata(), key, index);
+ UpdateParameters up =
parameters.updateParameters(baseUpdate.metadata(), key, index, timestamp);
TxnData data = parameters.getData();
Row staticRow = applyUpdates(baseUpdate.staticRow(),
referenceOps.statics, key, Clustering.STATIC_CLUSTERING, up, data);
if (!staticRow.isEmpty())
updateBuilder.add(staticRow);
- Row existing = baseUpdate.hasRows() ?
Iterables.getOnlyElement(baseUpdate) : null;
- Row row = applyUpdates(existing, referenceOps.regulars, key,
referenceOps.clustering, up, data);
- if (row != null)
- updateBuilder.add(row);
+ for (Clustering<?> clustering : referenceOps.clusterings)
+ {
+ Row existing = baseUpdate.hasRows() ?
baseUpdate.getRow(clustering) : null;
+ Row row = applyUpdates(existing, referenceOps.regulars, key,
clustering, up, data);
+ if (row != null)
+ updateBuilder.add(row);
+ }
return new Update(this.key, index, updateBuilder.build(), tables);
}
@@ -371,6 +377,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
out.writeUnsignedVInt32(fragment.index);
PartitionUpdate.serializer.serializeWithoutKey(fragment.baseUpdate, tables,
out, version.messageVersion());
TxnReferenceOperations.serializer.serialize(fragment.referenceOps, tables, out,
version);
+ out.writeUnsignedVInt(fragment.timestamp);
}
public Fragment deserialize(PartitionKey key, TableMetadatas
tables, DataInputPlus in, Version version) throws IOException
@@ -379,7 +386,8 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
// TODO (required): why FROM_REMOTE?
PartitionUpdate baseUpdate =
PartitionUpdate.serializer.deserialize(key, tables, in,
version.messageVersion(), FROM_REMOTE);
TxnReferenceOperations referenceOps =
TxnReferenceOperations.serializer.deserialize(tables, in, version);
- return new Fragment(key, idx, baseUpdate, referenceOps);
+ long timestamp = in.readUnsignedVInt();
+ return new Fragment(key, idx, baseUpdate, referenceOps,
timestamp);
}
public long serializedSize(Fragment fragment, TableMetadatas
tables, Version version)
@@ -388,6 +396,7 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
size += TypeSizes.sizeofUnsignedVInt(fragment.index);
size +=
PartitionUpdate.serializer.serializedSizeWithoutKey(fragment.baseUpdate,
tables, version.messageVersion());
size +=
TxnReferenceOperations.serializer.serializedSize(fragment.referenceOps, tables,
version);
+ size += TypeSizes.sizeofUnsignedVInt(fragment.timestamp);
return size;
}
}
diff --git
a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
index ceac2d89dc..5355d33d10 100644
--- a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
+++ b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
@@ -142,7 +142,7 @@ public enum TransactionalMode
public ConsistencyLevel commitCLForMode(TransactionalMigrationFromMode
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId
tableId, Token token)
{
if
(!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
- throw new UnsupportedOperationException("Consistency level " +
consistencyLevel + " is unsupported with Accord for write/commit, supported are
ANY, ONE, QUORUM, and ALL");
+ throw
UnsupportedTransactionConsistencyLevel.commit(consistencyLevel);
if (ignoresSuppliedCommitCL())
{
@@ -172,7 +172,7 @@ public enum TransactionalMode
public ConsistencyLevel readCLForMode(TransactionalMigrationFromMode
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId
tableId, Token token)
{
if
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
- throw new UnsupportedOperationException("Consistency level " +
consistencyLevel + " is unsupported with Accord for read, supported are ONE,
QUORUM, and SERIAL");
+ throw
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
if (ignoresSuppliedReadCL())
{
@@ -215,7 +215,7 @@ public enum TransactionalMode
public ConsistencyLevel readCLForMode(TransactionalMigrationFromMode
fromMode, ConsistencyLevel consistencyLevel, ClusterMetadata cm, TableId
tableId, AbstractBounds<PartitionPosition> range)
{
if
(!IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS.contains(consistencyLevel))
- throw new UnsupportedOperationException("Consistency level " +
consistencyLevel + " is unsupported with Accord for read, supported are ONE,
QUORUM, and SERIAL");
+ throw
UnsupportedTransactionConsistencyLevel.read(consistencyLevel);
checkState(range.unwrap().size() == 1);
if (ignoresSuppliedReadCL())
diff --git
a/src/java/org/apache/cassandra/service/consensus/UnsupportedTransactionConsistencyLevel.java
b/src/java/org/apache/cassandra/service/consensus/UnsupportedTransactionConsistencyLevel.java
new file mode 100644
index 0000000000..5b7ecf7b3a
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/consensus/UnsupportedTransactionConsistencyLevel.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.consensus;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.service.accord.IAccordService;
+
+public class UnsupportedTransactionConsistencyLevel extends
InvalidRequestException
+{
+ public enum Kind { Read, Commit }
+
+ private UnsupportedTransactionConsistencyLevel(Kind kind, ConsistencyLevel
consistencyLevel)
+ {
+ super("ConsistencyLevel " + consistencyLevel + " is unsupported with
Accord for " + (kind == Kind.Commit ? "write/commit" : "read") + ", supported
are " + (kind == Kind.Commit ?
IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS :
IAccordService.SUPPORTED_READ_CONSISTENCY_LEVELS));
+ }
+
+ public static UnsupportedTransactionConsistencyLevel
commit(ConsistencyLevel consistencyLevel)
+ {
+ return new UnsupportedTransactionConsistencyLevel(Kind.Commit,
consistencyLevel);
+ }
+
+ public static UnsupportedTransactionConsistencyLevel read(ConsistencyLevel
consistencyLevel)
+ {
+ return new UnsupportedTransactionConsistencyLevel(Kind.Read,
consistencyLevel);
+ }
+}
diff --git
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
index c319daee9b..d8cc96a3a1 100644
---
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
+++
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationMutationHelper.java
@@ -41,7 +41,6 @@ import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RetryOnDifferentSystemException;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
@@ -63,6 +62,7 @@ import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.service.accord.txn.TxnUpdate;
import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.apache.cassandra.service.consensus.TransactionalMode;
+import
org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tracing.Tracing;
@@ -247,7 +247,7 @@ public class ConsensusMigrationMutationHelper
PreserveTimestamp preserveTimestamps)
{
if (consistencyLevel != null &&
!IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(consistencyLevel))
- throw new InvalidRequestException(consistencyLevel + " is not
supported by Accord");
+ throw
UnsupportedTransactionConsistencyLevel.commit(consistencyLevel);
TableMetadatas tables;
{
@@ -271,7 +271,7 @@ public class ConsensusMigrationMutationHelper
{
PartitionKey pk = keyCollector.collect(update.metadata(),
update.partitionKey());
minEpoch = Math.max(minEpoch,
update.metadata().epoch.getEpoch());
- fragments.add(new TxnWrite.Fragment(pk, fragmentIndex++,
update, TxnReferenceOperations.empty()));
+ fragments.add(new TxnWrite.Fragment(pk, fragmentIndex++,
update, TxnReferenceOperations.empty(), TxnWrite.NO_TIMESTAMP));
}
}
// Potentially ignore commit consistency level if the
TransactionalMode specifies full
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 57d824885a..9e61fd2f26 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -570,6 +570,7 @@ public class ByteBufferUtil
public static ByteBuffer objectToBytes(Object obj)
{
+ if (obj == null) return null;
if (obj instanceof Integer)
return ByteBufferUtil.bytes((int) obj);
else if (obj instanceof Byte)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index 27a07343f3..f89fb1a986 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -50,6 +50,13 @@ import accord.topology.Topologies;
import org.apache.cassandra.config.Config.PaxosVariant;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.ast.Symbol;
+import org.apache.cassandra.cql3.ast.AssignmentOperator;
+import org.apache.cassandra.cql3.ast.Literal;
+import org.apache.cassandra.cql3.ast.Mutation;
+import org.apache.cassandra.cql3.ast.Reference;
+import org.apache.cassandra.cql3.ast.Select;
+import org.apache.cassandra.cql3.ast.Statement;
+import org.apache.cassandra.cql3.ast.Txn;
import org.apache.cassandra.cql3.functions.types.utils.Bytes;
import org.apache.cassandra.cql3.statements.TransactionStatement;
import org.apache.cassandra.distributed.util.QueryResultUtil;
@@ -85,6 +92,7 @@ import static
org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -3239,19 +3247,16 @@ public abstract class AccordCQLTestBase extends
AccordTestBase
ICoordinator coordinator = cluster.coordinator(1);
coordinator.execute("INSERT INTO " + qualifiedAccordTableName
+ "(k, c, v0, v1) VALUES (0, 0, {0}, 1)", QUORUM);
- String cql = "BEGIN TRANSACTION\n" +
- " LET row = (SELECT *\n" +
- " FROM " + qualifiedAccordTableName
+ '\n' +
- " WHERE k = ? AND c = ?);\n" +
- " UPDATE " + qualifiedAccordTableName + '\n' +
- " SET\n" +
- " v0={1},\n" +
- " v1 += row.v1\n" +
- " WHERE \n" +
- " k = ? AND \n" +
- " c = ?;\n" +
- "COMMIT TRANSACTION";
- coordinator.execute(cql, QUORUM, 0, 0, 0, 0);
+ Statement stmt = Txn.builder()
+ .addLet("row",
Select.builder().table(KEYSPACE, accordTableName).value("k", 0).value("c",
0).build())
+ .addUpdate(Mutation.update(KEYSPACE,
accordTableName)
+ .set(v0, new
Literal(Collections.singleton(1), v0.type()))
+ .set(v1, new
AssignmentOperator(AssignmentOperator.Kind.ADD, Reference.of(row, v1)))
+ .value("k", 0)
+ .value("c", 0)
+ .build())
+ .build();
+ coordinator.execute(stmt.toCQL(), QUORUM,
stmt.bindsEncoded());
// is the data correct?
var result = coordinator.executeWithResult("SELECT * FROM " +
qualifiedAccordTableName, QUORUM);
@@ -3260,4 +3265,88 @@ public abstract class AccordCQLTestBase extends
AccordTestBase
.build());
});
}
+
+ @Test
+ public void updateMultipleRows()
+ {
+ SHARED_CLUSTER.schemaChange("CREATE TABLE " + qualifiedAccordTableName
+ " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH " +
transactionalMode.asCqlParam());
+
+ ICoordinator node = SHARED_CLUSTER.coordinator(1);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v)
VALUES (1, 10, 100)", ConsistencyLevel.QUORUM);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v)
VALUES (1, 20, 200)", ConsistencyLevel.QUORUM);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v)
VALUES (1, 30, 300)", ConsistencyLevel.QUORUM);
+
+ node.execute("BEGIN TRANSACTION\n" +
+ " UPDATE " + qualifiedAccordTableName + '\n' +
+ " SET v = 999\n" +
+ " WHERE pk = 1\n" +
+ " AND ck IN (10, 20);\n" +
+ "COMMIT TRANSACTION", ConsistencyLevel.QUORUM);
+
+ assertRows(node.execute("SELECT * FROM " + qualifiedAccordTableName +
" WHERE pk = 1", ConsistencyLevel.QUORUM),
+ AssertUtils.row(1, 10, 999),
+ AssertUtils.row(1, 20, 999),
+ AssertUtils.row(1, 30, 300));
+ }
+
+ @Test
+ public void deleteMultipleRows()
+ {
+ SHARED_CLUSTER.schemaChange("CREATE TABLE " + qualifiedAccordTableName
+ " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH " +
transactionalMode.asCqlParam());
+
+ ICoordinator node = SHARED_CLUSTER.coordinator(1);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v)
VALUES (1, 10, 100)", ConsistencyLevel.QUORUM);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v)
VALUES (1, 20, 200)", ConsistencyLevel.QUORUM);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (pk, ck, v)
VALUES (1, 30, 300)", ConsistencyLevel.QUORUM);
+
+ node.execute("BEGIN TRANSACTION\n" +
+ " DELETE FROM " + qualifiedAccordTableName + '\n' +
+ " WHERE pk = 1\n" +
+ " AND ck IN (10, 20);\n" +
+ "COMMIT TRANSACTION", ConsistencyLevel.QUORUM);
+
+ // Verify the deletes
+ assertRows(node.execute("SELECT * FROM " + qualifiedAccordTableName +
" WHERE pk = 1", ConsistencyLevel.QUORUM),
+ AssertUtils.row(1, 30, 300));
+ }
+
+ @Test
+ public void usingTimestamp() throws Exception
+ {
+ // This test was discovered by
org.apache.cassandra.distributed.test.cql3.MixedReadsAccordInteropMultiNodeTableWalkTest
+ // It could be simplified but was boiled down to the minimum steps to
reproduce the timestamp issue: USING TIMESTAMP
+ // was not respected operations that happen post read
+ test("CREATE TABLE "+qualifiedAccordTableName+" (\n" +
+ "\t\t pk0 float,\n" +
+ "\t\t pk1 varint,\n" +
+ "\t\t ck0 smallint,\n" +
+ "\t\t ck1 tinyint,\n" +
+ "\t\t v3 uuid,\n" +
+ "\t\t v1 list<uuid>,\n" +
+ "\t\t v4 map<vector<uuid, 1>, frozen<map<smallint, text>>>,\n"
+
+ "\t\t PRIMARY KEY ((pk0, pk1), ck0, ck1)\n" +
+ "\t\t) WITH CLUSTERING ORDER BY (ck0 DESC, ck1 ASC)\n" +
+ "\t\t AND " + transactionalMode.asCqlParam(), cluster -> {
+ ICoordinator node = cluster.coordinator(2);
+ node.execute("UPDATE " + qualifiedAccordTableName + " USING
TIMESTAMP 6 " +
+ "SET " +
+ " v1=[00000000-0000-4e00-8c00-000000000000], " +
+ " v4={[00000000-0000-4800-b300-000000000000]:
{-30955: 'ϵ', -10479: '虑퐕㐧', 7904: '䁋'}}, " +
+ " v3=00000000-0000-4600-b300-000000000000 " +
+ "WHERE pk0 = -1.1763917E35 AND pk1 = -466454 " +
+ " AND ck0 IN (-26786, 10038, 4991) AND ck1
IN (124 - 36, -100)", ConsistencyLevel.QUORUM);
+ AssertUtils.assertRows(node.executeWithResult("SELECT
writetime(v1), writetime(v3), writetime(v4) FROM " + qualifiedAccordTableName +
" WHERE pk0 = -1.1763917E35 AND pk1 = -466454", ConsistencyLevel.QUORUM),
+ QueryResults.builder()
+ .row(List.of(6L), 6L,
List.of(6L))
+ .row(List.of(6L), 6L,
List.of(6L))
+ .row(List.of(6L), 6L,
List.of(6L))
+ .row(List.of(6L), 6L,
List.of(6L))
+ .row(List.of(6L), 6L,
List.of(6L))
+ .row(List.of(6L), 6L,
List.of(6L))
+ .build());
+ node.execute("DELETE FROM " + qualifiedAccordTableName + " USING
TIMESTAMP 8 WHERE pk0 = -1.1763917E35 AND pk1 = -466454",
ConsistencyLevel.QUORUM);
+ QueryResultUtil.assertThat(node.executeWithResult("SELECT * FROM "
+ qualifiedAccordTableName + " WHERE pk0 = -1.1763917E35 AND pk1 = -466454",
ConsistencyLevel.QUORUM))
+ .isEmpty();
+ });
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
index 8f072a9782..d38f07ef9b 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordInteroperabilityTest.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.utils.AssertionUtils;
+import org.assertj.core.api.Assertions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -108,8 +110,8 @@ public class AccordInteroperabilityTest extends
AccordTestBase
}
catch (Throwable t)
{
- assertEquals(InvalidRequestException.class.getName(),
t.getClass().getName());
- assertEquals(cl + " is not supported by Accord",
t.getMessage());
+
Assertions.assertThat(t).is(AssertionUtils.isInstanceof(InvalidRequestException.class));
+ assertEquals("ConsistencyLevel " + cl + " is
unsupported with Accord for read, supported are [ONE, QUORUM, ALL, SERIAL]",
t.getMessage());
}
}
});
@@ -130,11 +132,11 @@ public class AccordInteroperabilityTest extends
AccordTestBase
}
catch (Throwable t)
{
- assertEquals(InvalidRequestException.class.getName(),
t.getClass().getName());
+
Assertions.assertThat(t).is(AssertionUtils.isInstanceof(InvalidRequestException.class));
if (cl == ConsistencyLevel.SERIAL || cl ==
ConsistencyLevel.LOCAL_SERIAL)
assertEquals("You must use conditional updates
for serializable writes", t.getMessage());
else
- assertEquals(cl + " is not supported by Accord",
t.getMessage());
+ assertEquals("ConsistencyLevel " + cl + " is
unsupported with Accord for write/commit, supported are [ANY, ONE, QUORUM, ALL,
SERIAL]", t.getMessage());
}
}
});
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 6334ff6957..b4c53b6159 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -221,6 +221,7 @@ public abstract class AccordTestBase extends TestBaseImpl
{
for (String ddl : ddls)
SHARED_CLUSTER.schemaChange(ddl);
+ ClusterUtils.waitForCMSToQuiesce(SHARED_CLUSTER,
ClusterUtils.maxEpoch(SHARED_CLUSTER));
// Evict commands from the cache immediately to expose problems
loading from disk.
// SHARED_CLUSTER.forEach(node -> node.runOnInstance(() ->
AccordService.instance().setCacheSize(0)));
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
index 31d1aab31f..9a466b9493 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/CasMultiNodeTableWalkBase.java
@@ -79,7 +79,7 @@ public abstract class CasMultiNodeTableWalkBase extends
MultiNodeTableWalkBase
protected Gen<Mutation> toMutationGen(ASTGenerators.MutationGenBuilder
mutationGenBuilder)
{
mutationGenBuilder.withCasGen(i -> true)
- .withAllowUpdateMultipleClusteringKeys(false);
// paxos supports but the model doesn't yet
+ .disallowUpdateMultipleRows(); // paxos supports
but the model doesn't yet
// generator might not always generate a cas statement... should
fix generator!
Gen<Mutation> gen =
toGen(mutationGenBuilder.build()).filter(Mutation::isCas);
if (metadata.regularAndStaticColumns().stream().anyMatch(c ->
c.type.isUDT())
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
index 216b1e79c6..703c4e3c93 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTokenConflictTest.java
@@ -384,9 +384,7 @@ public class SingleNodeTokenConflictTest extends
StatefulASTBase
this.mutationGen = toGen(new
ASTGenerators.MutationGenBuilder(metadata)
- .withoutTransaction()
- .withoutTtl()
- .withoutTimestamp()
+ .withTxnSafe()
.withPartitions(SourceDSL.arbitrary().pick(uniquePartitions))
.withIgnoreIssues(IGNORED_ISSUES)
.build());
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index 8d6bc02773..8895a7271b 100644
---
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -148,7 +148,7 @@ public class AccordTopologyMixupTest extends
TopologyMixupTestBase<AccordTopolog
private static CommandGen<Spec> cqlOperations(Spec spec)
{
Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.SelectGenBuilder(spec.metadata).withLimit1().build());
- Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.MutationGenBuilder(spec.metadata).withoutTimestamp().withoutTtl().withAllowUpdateMultipleClusteringKeys(false).build());
+ Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.MutationGenBuilder(spec.metadata).withTxnSafe().disallowUpdateMultiplePartitionKeys().build());
Gen<Statement> txn = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.TxnGenBuilder(spec.metadata).build());
Map<Gen<Statement>, Integer> operations = new LinkedHashMap<>();
operations.put(select, 1);
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index 3d2bf9a74d..c914350328 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -857,7 +857,10 @@ public class ASTSingleTableModel
SelectResult result = (SelectResult) o;
if (result.rows.length == 0)
return null;
- return result.rows[0][result.columns.indexOf(symbol)];
+ ByteBuffer bb = result.rows[0][result.columns.indexOf(symbol)];
+ if (bb != null && symbol.type().isNull(bb))
+ bb = null;
+ return bb;
}
else
{
@@ -1194,7 +1197,9 @@ public class ASTSingleTableModel
sb.append("\n\tDiff (expected over actual):\n");
Row eSmall = e.select(smallestDiff);
Row aSmall = smallest.select(smallestDiff);
- sb.append(table(eSmall.columns, Arrays.asList(eSmall,
aSmall)));
+ // Symbol.toString is just the column name, it is easier to
understand what is going on if the type is included,
+ // which is done in Symbol.detailedName: name type (reversed)?
+
sb.append(table(eSmall.columns.stream().map(Symbol::detailedName).collect(Collectors.toList()),
Arrays.asList(eSmall, aSmall)));
}
}
else
@@ -1272,7 +1277,12 @@ public class ASTSingleTableModel
private static String table(ImmutableUniqueList<Symbol> columns,
Collection<Row> rows)
{
- return
TableBuilder.toStringPiped(columns.stream().map(Symbol::toCQL).collect(Collectors.toList()),
+ return
table(columns.stream().map(Symbol::toCQL).collect(Collectors.toList()), rows);
+ }
+
+ private static String table(List<String> columns, Collection<Row> rows)
+ {
+ return TableBuilder.toStringPiped(columns,
// intellij or junit can be tripped
up by utf control or invisible chars, so this logic tries to normalize to make
things more safe
() -> rows.stream()
.map(r ->
r.asCQL().stream().map(StringUtils::escapeControlChars).collect(Collectors.toList()))
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
index 8f9635641e..7924343dfd 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleNodeSingleTableASTTest.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.junit.Ignore;
import org.junit.Test;
import accord.utils.DefaultRandom;
@@ -122,7 +121,6 @@ public class SingleNodeSingleTableASTTest extends
SimulationTestBase
}
@Test
- @Ignore
public void normal() throws IOException
{
// testOne(SimulationRunner.parseHex("0x2fd91c2a2be59d7d"),
SingleTableASTSimulation::new);
@@ -137,7 +135,6 @@ public class SingleNodeSingleTableASTTest extends
SimulationTestBase
}
@Test
- @Ignore
public void accordMixedReads() throws IOException
{
// testOne(SimulationRunner.parseHex("0x2fd91c2a2be59d7d"),
SingleTableASTSimulation.MixedReadsAccordSingleTableASTSimulation::new);
diff --git
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
index bae0740295..ef1f60b470 100644
---
a/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
+++
b/test/simulator/test/org/apache/cassandra/simulator/test/SingleTableASTSimulation.java
@@ -70,6 +70,7 @@ import
org.apache.cassandra.simulator.systems.SimulatedActionCallable;
import org.apache.cassandra.simulator.systems.SimulatedSystems;
import org.apache.cassandra.utils.ASTGenerators;
import org.apache.cassandra.utils.AbstractTypeGenerators;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CassandraGenerators;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.Generators;
@@ -241,7 +242,7 @@ public class SingleTableASTSimulation extends
SimulationTestBase.SimpleSimulatio
.uniqueBestEffort()
.ofSize(rs.nextInt(1, 20))
.next(rs);
- Gen<Action> mutationGen = toGen(ASTGenerators.mutationBuilder(rs,
model, uniquePartitions, i -> null).disallowEmpty().build())
+ Gen<Action> mutationGen = toGen(ASTGenerators.mutationBuilder(rs,
model, uniquePartitions, i -> null).build())
.map(mutation -> query(mutation));
Gen<Action> selectPartitionGen = Gens.pick(uniquePartitions)
@@ -393,12 +394,12 @@ public class SingleTableASTSimulation extends
SimulationTestBase.SimpleSimulatio
private IIsolatedExecutor.SerializableCallable<Object[][]>
query(Statement statement, ConsistencyLevel cl)
{
- // Simulator acts differently than jvm-dtest, so ByteBuffer isn't
safe!
- // java.lang.RuntimeException: java.io.NotSerializableException:
java.nio.HeapByteBuffer
- // So switch to literals for now
- statement = statement.visit(StandardVisitors.BIND_TO_LITERAL);
String cql = statement.toCQL();
- Object[] binds = statement.binds();
+ ByteBuffer[] encoded = statement.bindsEncoded();
+ // Simulator doesn't support ByteBuffer (fails to transfer), so
need to convert to byte[], which will get converted to ByteBuffer within
Cassandra.
+ Object[] binds = new Object[encoded.length];
+ for (int i = 0; i < encoded.length; i++)
+ binds[i] = encoded[i] == null ? null :
ByteBufferUtil.getArray(encoded[i]);
return () -> {
Query q = new Query(cql, Long.MIN_VALUE, false, cl, null,
binds);
return q.call().toObjectArrays();
diff --git
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index b16cdc9734..ffd0522ee8 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -115,6 +115,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
"org.apache.cassandra.config.Config$BatchlogEndpointStrategy",
"org.apache.cassandra.config.Config$TombstonesMetricGranularity",
+
"org.apache.cassandra.service.consensus.UnsupportedTransactionConsistencyLevel",
"org.apache.cassandra.repair.autorepair.AutoRepairConfig",
"org.apache.cassandra.repair.autorepair.AutoRepairConfig$Options",
"org.apache.cassandra.repair.autorepair.AutoRepairConfig$RepairType",
diff --git a/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
b/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
index a2805e8569..12174e0a26 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/ExpressionTest.java
@@ -18,6 +18,10 @@
package org.apache.cassandra.cql3.ast;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
import org.junit.Test;
import accord.utils.Gen;
@@ -55,8 +59,18 @@ public class ExpressionTest
Txn txn = Txn.builder()
.addLet("a", selectWithBind())
.addReturn(selectWithBind())
+ .addIf(new Conditional.Is(Symbol.unknownType("a"),
Conditional.Is.Kind.NotNull),
+ new Mutation.Insert(new
TableReference(Optional.empty(), "tbl"), new
LinkedHashMap<>(Map.of(Symbol.unknownType("a"), Bind.of(0))), false,
Optional.empty()))
.build();
assertNoBind(txn.visit(StandardVisitors.BIND_TO_LITERAL));
+
+ txn = Txn.builder()
+ .addLet("a", selectWithBind())
+ .addReturn(selectWithBind())
+ .addIf(Where.create(Where.Inequality.EQUAL, Bind.of(0),
Bind.of(42)),
+ new Mutation.Insert(new
TableReference(Optional.empty(), "tbl"), new
LinkedHashMap<>(Map.of(Symbol.unknownType("a"), Bind.of(0))), false,
Optional.empty()))
+ .build();
+ assertNoBind(txn.visit(StandardVisitors.BIND_TO_LITERAL));
}
private static Select selectWithBind()
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
index eb6d83df41..09973bd0aa 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Literal.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Literal.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.StringType;
+import org.apache.cassandra.db.marshal.UTF8Type;
public class Literal implements Value
{
@@ -47,6 +48,11 @@ public class Literal implements Value
return new Literal(value, LongType.instance);
}
+ public static Literal of(String value)
+ {
+ return new Literal(value, UTF8Type.instance);
+ }
+
@Override
public AbstractType<?> type()
{
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
index f905218c45..e492293691 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
@@ -121,7 +121,7 @@ public class Symbol implements ReferenceExpression,
Comparable<Symbol>
public String detailedName()
{
- return symbol + " " + type.asCQL3Type() + (reversed ? " (reversed)" :
"");
+ return symbol + ' ' + type.asCQL3Type() + (reversed ? " (reversed)" :
"");
}
@Override
@@ -172,11 +172,5 @@ public class Symbol implements ReferenceExpression,
Comparable<Symbol>
UnquotedSymbol symbol1 = (UnquotedSymbol) o;
return Objects.equals(symbol, symbol1.symbol);
}
-
- @Override
- public int hashCode()
- {
- return Objects.hash(symbol);
- }
}
}
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 4f502b19bb..860995b28f 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -411,6 +411,7 @@ public class ASTGenerators
private Map<Symbol, ExpressionBuilder> columnExpressions = new
LinkedHashMap<>();
private boolean allowPartitionOnlyUpdate = true;
private boolean allowPartitionOnlyInsert = true;
+ private boolean allowUpdateMultiplePartitionKeys = true;
private boolean allowUpdateMultipleClusteringKeys = true;
private EnumSet<KnownIssue> ignoreIssues = IGNORED_ISSUES;
@@ -449,12 +450,46 @@ public class ASTGenerators
return this;
}
- public MutationGenBuilder
withAllowUpdateMultipleClusteringKeys(boolean allowUpdateMultipleClusteringKeys)
+ public MutationGenBuilder allowUpdateMultiplePartitionKeys()
+ {
+ return withAllowUpdateMultiplePartitionKeys(true);
+ }
+
+ public MutationGenBuilder disallowUpdateMultiplePartitionKeys()
+ {
+ return withAllowUpdateMultiplePartitionKeys(false);
+ }
+
+ private MutationGenBuilder
withAllowUpdateMultiplePartitionKeys(boolean allowUpdateMultiplePartitionKeys)
+ {
+ this.allowUpdateMultiplePartitionKeys =
allowUpdateMultiplePartitionKeys;
+ return this;
+ }
+
+ public MutationGenBuilder allowUpdateMultipleClusteringKeys()
+ {
+ return withAllowUpdateMultipleClusteringKeys(true);
+ }
+
+ public MutationGenBuilder disallowUpdateMultipleClusteringKeys()
+ {
+ return withAllowUpdateMultipleClusteringKeys(false);
+ }
+
+ private MutationGenBuilder
withAllowUpdateMultipleClusteringKeys(boolean allowUpdateMultipleClusteringKeys)
{
this.allowUpdateMultipleClusteringKeys =
allowUpdateMultipleClusteringKeys;
return this;
}
+ /**
+ * Tells the generator to avoid producing updates that do multiple
partition and clustering keys.
+ */
+ public MutationGenBuilder disallowUpdateMultipleRows()
+ {
+ return
disallowUpdateMultiplePartitionKeys().disallowUpdateMultipleClusteringKeys();
+ }
+
public MutationGenBuilder
withColumnExpressions(Consumer<ExpressionBuilder> fn)
{
for (Symbol symbol : allColumns)
@@ -719,7 +754,7 @@ public class ASTGenerators
var timestamp = timestampGen.generate(rnd);
if (timestamp.isPresent())
builder.timestamp(valueGen(timestamp.getAsLong(),
LongType.instance).generate(rnd));
- if (allowUpdateMultipleClusteringKeys)
+ if (allowUpdateMultiplePartitionKeys)
where(rnd, columnExpressions, builder,
partitionColumns, partitionValueGen);
else
values(rnd, columnExpressions, builder,
partitionColumns, partitionValueGen);
@@ -1056,7 +1091,7 @@ public class ASTGenerators
}
MutationGenBuilder mutationBuilder = new
MutationGenBuilder(metadata)
.withTxnSafe()
-
.withAllowUpdateMultipleClusteringKeys(false)
+
.disallowUpdateMultiplePartitionKeys()
.withReferences(new
ArrayList<>(builder.allowedReferences()));
if (!allowReferences)
mutationBuilder.withReferences(Collections.emptyList());
@@ -1183,11 +1218,9 @@ public class ASTGenerators
private Gen<Mutation> mutationGen(RandomSource rs,
LinkedHashMap<Symbol, Object> pk)
{
- MutationGenBuilder builder = mutationBuilder(IGNORED_ISSUES, rs,
model, List.of(pk), indexes);
- builder.withTxnSafe()
- //TODO (now, coverage): remove this as we should support
- // working around the bug to make progress
- .withAllowUpdateMultipleClusteringKeys(false);
+ MutationGenBuilder builder = mutationBuilder(IGNORED_ISSUES, rs,
model, List.of(pk), indexes)
+ .withTxnSafe()
+
.disallowUpdateMultiplePartitionKeys();
if (!allowEmpty)
builder.disallowEmpty();
return builder.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]