This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61b3d5a54b Add support for BEGIN TRANSACTION to allow mutations that
touch multiple partitions
61b3d5a54b is described below
commit 61b3d5a54befe440044ad86159a62fee487229eb
Author: David Capwell <[email protected]>
AuthorDate: Mon Aug 25 14:08:15 2025 -0700
Add support for BEGIN TRANSACTION to allow mutations that touch multiple
partitions
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20857
---
CHANGES.txt | 1 +
.../cassandra/cql3/statements/CQL3CasRequest.java | 6 +-
.../cql3/statements/ModificationStatement.java | 42 ++++---
.../cql3/statements/TransactionStatement.java | 50 +++++++-
.../serializers/AbstractSortedCollector.java | 8 ++
.../service/consensus/TransactionalMode.java | 8 ++
.../distributed/test/accord/AccordCQLTestBase.java | 63 ++++++++++
.../cql3/AccordInteropMultiNodeTableWalkBase.java | 15 +++
.../test/cql3/SingleNodeTableWalkTest.java | 41 ++++---
.../distributed/test/cql3/StatefulASTBase.java | 114 +++++-------------
.../cassandra/distributed/util/DriverUtils.java | 129 +++++++++++++++++++++
.../fuzz/topology/AccordTopologyMixupTest.java | 2 +-
.../cassandra/harry/model/ASTSingleTableModel.java | 4 +-
.../harry/model/ASTSingleTableModelTest.java | 35 ++++++
.../org/apache/cassandra/utils/ASTGenerators.java | 5 +-
15 files changed, 391 insertions(+), 132 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index b61cd947e0..08ae2c6162 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add support for BEGIN TRANSACTION to allow mutations that touch multiple
partitions (CASSANDRA-20857)
* AutoRepair: Safeguard Full repair against disk protection(CASSANDRA-20045)
* BEGIN TRANSACTION crashes if a mutation touches multiple rows
(CASSANDRA-20844)
* Fix version range check in MessagingService.getVersionOrdinal
(CASSANDRA-20842)
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index cc1680b19a..080a705ffa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -558,15 +558,13 @@ public class CQL3CasRequest implements CASRequest
// see CASSANDRA-18337
ModificationStatement modification = update.stmt.forTxn();
QueryOptions options = update.options;
- TxnWrite.Fragment fragment =
modification.getTxnWriteFragment(idx++, state, options, partitionKey);
- fragments.add(fragment);
+ fragments.addAll(modification.getTxnWriteFragment(idx++, state,
options, partitionKey));
}
for (RangeDeletion rangeDeletion : rangeDeletions)
{
ModificationStatement modification = rangeDeletion.stmt;
QueryOptions options = rangeDeletion.options;
- TxnWrite.Fragment fragment =
modification.getTxnWriteFragment(idx++, state, options, partitionKey);
- fragments.add(fragment);
+ fragments.addAll(modification.getTxnWriteFragment(idx++, state,
options, partitionKey));
}
return fragments;
}
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index fa7aaaebaa..4544926fb9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.utils.Invariants;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.cql3.CQLStatement;
@@ -886,15 +887,15 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
}
}
- public PartitionUpdate getTxnUpdate(ClientState state, QueryOptions
options)
+ public List<PartitionUpdate> getTxnUpdate(ClientState state, QueryOptions
options)
{
List<? extends IMutation> mutations = getMutations(state, options,
false, 0, 0, new Dispatcher.RequestTime(0, 0));
- // TODO: Temporary fix for CASSANDRA-20079
if (mutations.isEmpty())
- return PartitionUpdate.emptyUpdate(metadata,
metadata.partitioner.decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER));
- if (mutations.size() != 1)
- throw new IllegalArgumentException("When running withing a
transaction, modification statements may only mutate a single partition");
- return
Iterables.getOnlyElement(mutations.get(0).getPartitionUpdates());
+ return Collections.emptyList();
+ List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
+ for (IMutation m : mutations)
+ updates.addAll(m.getPartitionUpdates());
+ return updates;
}
private static List<TxnReferenceOperation>
getTxnReferenceOps(List<ReferenceOperation> operations, QueryOptions options)
@@ -948,20 +949,33 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
return operations.allSubstitutions();
}
- public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state,
QueryOptions options, PartitionKey partitionKey)
+ public List<TxnWrite.Fragment> getTxnWriteFragment(int index, ClientState
state, QueryOptions options, PartitionKey partitionKey)
{
- PartitionUpdate baseUpdate = getTxnUpdate(state, options);
- TxnReferenceOperations referenceOps = getTxnReferenceOps(options,
state);
- long timestamp = attrs.isTimestampSet() ?
attrs.getTimestamp(TxnWrite.NO_TIMESTAMP, options) : TxnWrite.NO_TIMESTAMP;
- return new TxnWrite.Fragment(partitionKey, index, baseUpdate,
referenceOps, timestamp);
+ return getTxnWriteFragment(index, state, options, baseUpdate -> {
+
Invariants.require(baseUpdate.partitionKey().equals(partitionKey.partitionKey()),
"PartitionUpdate generated a partition key different than the one expected");
+ return partitionKey;
+ });
+ }
+
+ public List<TxnWrite.Fragment> getTxnWriteFragment(int index, ClientState
state, QueryOptions options, KeyCollector keyCollector)
+ {
+ return getTxnWriteFragment(index, state, options, baseUpdate ->
keyCollector.collect(baseUpdate.metadata(), baseUpdate.partitionKey()));
}
- public TxnWrite.Fragment getTxnWriteFragment(int index, ClientState state,
QueryOptions options, KeyCollector keyCollector)
+ private List<TxnWrite.Fragment> getTxnWriteFragment(int index, ClientState
state, QueryOptions options, java.util.function.Function<PartitionUpdate,
PartitionKey> keyCollector)
{
- PartitionUpdate baseUpdate = getTxnUpdate(state, options);
+ List<PartitionUpdate> baseUpdates = getTxnUpdate(state, options);
TxnReferenceOperations referenceOps = getTxnReferenceOps(options,
state);
long timestamp = attrs.isTimestampSet() ?
attrs.getTimestamp(TxnWrite.NO_TIMESTAMP, options) : TxnWrite.NO_TIMESTAMP;
- return new
TxnWrite.Fragment(keyCollector.collect(baseUpdate.metadata(),
baseUpdate.partitionKey()), index, baseUpdate, referenceOps, timestamp);
+ if (baseUpdates.size() == 1)
+ {
+ PartitionUpdate baseUpdate = baseUpdates.get(0);
+ return Collections.singletonList(new
TxnWrite.Fragment(keyCollector.apply(baseUpdate), index, baseUpdate,
referenceOps, timestamp));
+ }
+ List<TxnWrite.Fragment> fragments = new
ArrayList<>(baseUpdates.size());
+ for (PartitionUpdate baseUpdate : baseUpdates)
+ fragments.add(new
TxnWrite.Fragment(keyCollector.apply(baseUpdate), index, baseUpdate,
referenceOps, timestamp));
+ return fragments;
}
final void addUpdates(UpdatesCollector collector,
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 6ca18b9f02..2a96bcc16d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -36,6 +37,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import org.slf4j.LoggerFactory;
+
import accord.api.Key;
import accord.primitives.Keys;
import accord.primitives.Routable.Domain;
@@ -90,6 +93,7 @@ import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NoSpamLogger;
import static accord.primitives.Txn.Kind.Read;
import static com.google.common.base.Preconditions.checkArgument;
@@ -125,6 +129,10 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
public static final String ILLEGAL_RANGE_QUERY_MESSAGE = "Range queries
are not allowed for reads within a transaction; %s %s";
public static final String UNSUPPORTED_MIGRATION = "Transaction Statement
is unsupported when migrating away from Accord or before migration to Accord is
complete for a range";
public static final String NO_PARTITION_IN_CLAUSE_WITH_LIMIT = "Partition
key is present in IN clause and there is a LIMIT... this is currently not
supported; %s statement %s";
+ public static final String WRITE_TXN_EMPTY_WITH_IGNORED_READS = "Write txn
produced no mutation, and its reads do not return to the caller; ignoring...";
+ public static final String WRITE_TXN_EMPTY_WITH_NO_READS = "Write txn
produced no mutation, and had no reads; ignoring...";
+
+ private static NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(LoggerFactory.getLogger(TransactionStatement.class), 1,
TimeUnit.MINUTES);
static class NamedSelect
{
@@ -350,9 +358,8 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
int idx = 0;
for (ModificationStatement modification : updates)
{
- TxnWrite.Fragment fragment = modification.getTxnWriteFragment(idx,
state, options, keyCollector);
- minEpoch = Math.max(minEpoch,
fragment.baseUpdate.metadata().epoch.getEpoch());
- fragments.add(fragment);
+ minEpoch = Math.max(minEpoch,
modification.metadata().epoch.getEpoch());
+ fragments.addAll(modification.getTxnWriteFragment(idx, state,
options, keyCollector));
if
(modification.allReferenceOperations().stream().anyMatch(ReferenceOperation::requiresRead))
{
@@ -447,6 +454,7 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
}
@VisibleForTesting
+ @Nullable
public Txn createTxn(ClientState state, QueryOptions options)
{
ClusterMetadata cm = ClusterMetadata.current();
@@ -467,8 +475,15 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
{
Int2ObjectHashMap<NamedSelect> autoReads = new
Int2ObjectHashMap<>();
List<TxnWrite.Fragment> writeFragments =
createWriteFragments(state, options, autoReads, keyCollector);
- ConsistencyLevel commitCL = consistencyLevelForAccordCommit(cm,
tables, keyCollector, options.getConsistency());
List<TxnNamedRead> reads = createNamedReads(options, autoReads,
keyCollector);
+ if (writeFragments.isEmpty()) // ModificationStatement yield no
Mutation (DELETE WHERE pk=0 AND c < 0 AND c > 0 -- matches no keys; so has no
mutation)
+ {
+ // cleanup memory
+ keyCollector.clear();
+ autoReads.clear();
+ return maybeCreateTxnFromEmptyWrites(cm, options, tables);
+ }
+ ConsistencyLevel commitCL = consistencyLevelForAccordCommit(cm,
tables, keyCollector, options.getConsistency());
Keys keys = keyCollector.build();
AccordUpdate update = new TxnUpdate(tables, writeFragments,
createCondition(options), commitCL, PreserveTimestamp.no);
TxnRead read = createTxnRead(tables, reads, null, Domain.Key);
@@ -476,6 +491,31 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
}
}
+ @Nullable
+ private Txn.InMemory maybeCreateTxnFromEmptyWrites(ClusterMetadata cm,
QueryOptions options, TableMetadatas.Complete tables)
+ {
+ TableMetadatasAndKeys.KeyCollector keyCollector = new
TableMetadatasAndKeys.KeyCollector(tables);
+ List<TxnNamedRead> reads = createNamedReads(options, null,
keyCollector);
+ if (reads.isEmpty())
+ {
+ // no reads, this is a no-op
+ noSpamLogger.info(WRITE_TXN_EMPTY_WITH_NO_READS);
+ return null;
+ }
+ if (returningSelect == null && returningReferences == null)
+ {
+ // the reads were for the mutation, and since the mutation doesn't
exist the reads are not needed
+ noSpamLogger.info(WRITE_TXN_EMPTY_WITH_IGNORED_READS);
+ return null;
+ }
+
+ // Return a read only txn
+ Keys keys = keyCollector.build();
+ TxnRead read = createTxnRead(tables, reads,
consistencyLevelForAccordRead(cm, tables, keys,
options.getSerialConsistency()), Domain.Key);
+ Txn.Kind kind = shouldReadEphemerally(keys,
tables.getMetadata((TableId)keys.get(0).prefix()).params, Read);
+ return new Txn.InMemory(kind, keys, read, TxnQuery.ALL, null, new
TableMetadatasAndKeys(tables, keys));
+ }
+
/**
* Returns {@code true} only if the statement selects multiple clusterings
in a partition
*/
@@ -514,6 +554,8 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
}
Txn txn = createTxn(state.getClientState(), options);
+ if (txn == null)
+ return new ResultMessage.Void();
TxnResult txnResult = AccordService.instance().coordinate(minEpoch,
txn, options.getConsistency(), requestTime);
if (txnResult.kind() == retry_new_protocol)
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/AbstractSortedCollector.java
b/src/java/org/apache/cassandra/service/accord/serializers/AbstractSortedCollector.java
index d979dab3fa..80672054ed 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/AbstractSortedCollector.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/AbstractSortedCollector.java
@@ -97,6 +97,14 @@ public abstract class AbstractSortedCollector<T, C> extends
AbstractList<T>
return add;
}
+ public void clear()
+ {
+ if (count > 1)
+ cachedAny().forceDiscard((Object[])buffer, count);
+ buffer = null;
+ count = 0;
+ }
+
public C build()
{
if (count == 0)
diff --git
a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
index 5355d33d10..b2336522c6 100644
--- a/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
+++ b/src/java/org/apache/cassandra/service/consensus/TransactionalMode.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.service.consensus;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
@@ -289,6 +291,12 @@ public enum TransactionalMode
return valueOf(toLowerCaseLocalized(name));
}
+ @VisibleForTesting
+ public static TransactionalMode[] supported()
+ {
+ return new TransactionalMode[]{ TransactionalMode.off,
TransactionalMode.mixed_reads, TransactionalMode.full };
+ }
+
public boolean isTestMode()
{
return name().startsWith("test_");
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index f89fb1a986..da972c9432 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -78,6 +79,7 @@ import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTestUtils;
import org.apache.cassandra.service.consensus.TransactionalMode;
import
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
+import org.apache.cassandra.utils.AssertionUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FailingConsumer;
import org.apache.cassandra.utils.Pair;
@@ -3349,4 +3351,65 @@ public abstract class AccordCQLTestBase extends
AccordTestBase
.isEmpty();
});
}
+
+ @Test
+ public void emptyModification() throws Exception
+ {
+ test("CREATE TABLE " + qualifiedAccordTableName + " (k int, s int
static, c int, v int, PRIMARY KEY (k, c)) WITH " +
transactionalMode.asCqlParam(), cluster -> {
+ String deleteStmt = "DELETE FROM " + qualifiedAccordTableName + "
WHERE k=0 AND c < 0 AND c > 0";
+ String selectStmt = "SELECT * FROM " + qualifiedAccordTableName +
" WHERE k=0";
+ ICoordinator node = cluster.coordinator(1);
+ node.execute("INSERT INTO " + qualifiedAccordTableName + " (k, s,
c, v) VALUES (0, 0, 0, 0)", QUORUM);
+
+ // CAS rejects
+ Assertions.assertThatThrownBy(() -> node.execute(deleteStmt + " IF
s=0", QUORUM))
+
.is(AssertionUtils.isInstanceof(InvalidRequestException.class))
+ .hasMessageContaining("DELETE statements must restrict
all PRIMARY KEY columns with equality relations");
+
+ // BEGIN TRANSACTION does not! This should no-op (user has no way
to know it did)
+ node.execute(wrapInTxn(deleteStmt), QUORUM);
+
Assertions.assertThat(node.instance().logs().watchFor(TransactionStatement.WRITE_TXN_EMPTY_WITH_NO_READS).getResult()).isNotEmpty();
+
+ // if there was a read, the txn was downgraded to a read txn
+ var results = node.execute(wrapInTxn(selectStmt, deleteStmt),
QUORUM);
+ Assertions.assertThat(results).hasDimensions(1, 4);
+
+ // there are lets but no returning
+ node.execute(wrapInTxn("LET a = (" + selectStmt + " LIMIT 1" +
')', deleteStmt), QUORUM);
+
Assertions.assertThat(node.instance().logs().watchFor(TransactionStatement.WRITE_TXN_EMPTY_WITH_IGNORED_READS).getResult()).isNotEmpty();
+ });
+ }
+
+ @Test
+ public void multiPartitionUpdate() throws Exception
+ {
+ test("CREATE TABLE " + qualifiedAccordTableName + "(k int PRIMARY KEY,
v int) WITH " + transactionalMode.asCqlParam(), cluster -> {
+ var node = cluster.coordinator(1);
+ int numPartitions = 10;
+ for (int i = 0; i < numPartitions; i++)
+ node.execute("INSERT INTO " + qualifiedAccordTableName + "(k,
v) VALUES (?, ?)", QUORUM, i, 0);
+
+ Object[] binds = IntStream.range(0,
numPartitions).boxed().toArray();
+ String where = "WHERE k IN (" + IntStream.range(0,
numPartitions).mapToObj(i -> "?").collect(Collectors.joining(", ")) + ')';
+ String updateCQL = "UPDATE " + qualifiedAccordTableName + " SET
v=1 " + where;
+ String deleteCQL = "DELETE FROM " + qualifiedAccordTableName + ' '
+ where;
+
+ // update multiple partitions at once
+ node.execute(wrapInTxn(updateCQL), QUORUM, binds);
+ for (int i = 0; i < numPartitions; i++)
+ {
+ var qr = node.executeWithResult("SELECT v FROM " +
qualifiedAccordTableName + " WHERE k=?", QUORUM, i);
+
QueryResultUtil.assertThat(qr).isEqualTo(QueryResults.builder().row(1).build());
+ }
+
+ // now delete
+ node.execute(wrapInTxn(deleteCQL), QUORUM, binds);
+
+ for (int i = 0; i < numPartitions; i++)
+ {
+ var qr = node.executeWithResult("SELECT v FROM " +
qualifiedAccordTableName + " WHERE k=?", QUORUM, i);
+ QueryResultUtil.assertThat(qr).isEmpty();
+ }
+ });
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/AccordInteropMultiNodeTableWalkBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/AccordInteropMultiNodeTableWalkBase.java
index 7d54906b7d..d1a5e0f32e 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/AccordInteropMultiNodeTableWalkBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/AccordInteropMultiNodeTableWalkBase.java
@@ -18,9 +18,13 @@
package org.apache.cassandra.distributed.test.cql3;
+import javax.annotation.Nullable;
+
import accord.utils.Property;
import accord.utils.RandomSource;
import org.apache.cassandra.cql3.KnownIssue;
+import org.apache.cassandra.cql3.ast.Mutation;
+import org.apache.cassandra.cql3.ast.Txn;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.shared.ClusterUtils;
@@ -86,11 +90,14 @@ Suppressed: java.lang.AssertionError: Unknown keyspace ks12
public class AccordInteropMultiNodeState extends MultiNodeState
{
private final boolean allowUsingTimestamp;
+ private final float wrapMutationAsTxn;
public AccordInteropMultiNodeState(RandomSource rs, Cluster cluster)
{
super(rs, cluster);
allowUsingTimestamp = rs.nextBoolean();
+ // when USING TIMESTAMP is done for the mutation, BEGIN
TRANSACTION can't be supported as it doesn't allow that syntax; so need to
disable wrapping mutations
+ wrapMutationAsTxn = allowUsingTimestamp ? 0F : rs.nextFloat();
}
@Override
@@ -102,6 +109,14 @@ Suppressed: java.lang.AssertionError: Unknown keyspace ks12
ClusterUtils.awaitAccordEpochReady(cluster, maxEpoch.getEpoch());
}
+ @Override
+ protected <S extends BaseState> Property.Command<S, Void, ?>
command(RandomSource rs, Mutation mutation, @Nullable String annotate)
+ {
+ if (wrapMutationAsTxn != 0 && rs.decide(wrapMutationAsTxn))
+ return super.command(rs, Txn.wrap(mutation), annotate);
+ return super.command(rs, mutation, annotate);
+ }
+
@Override
protected boolean allowUsingTimestamp()
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
index a890efed4c..a2a526121b 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
@@ -444,28 +444,33 @@ public class SingleNodeTableWalkTest extends
StatefulASTBase
cluster.forEach(i -> i.nodetoolResult("disableautocompaction",
metadata.keyspace, this.metadata.name).asserts().success());
- List<LinkedHashMap<Symbol, Object>> uniquePartitions;
+ ASTGenerators.MutationGenBuilder mutationGenBuilder = new
ASTGenerators.MutationGenBuilder(metadata)
+
.withTxnSafe()
+
.withColumnExpressions(e ->
e.withOperators(Generators.fromGen(BOOLEAN_DISTRIBUTION.next(rs))))
+
.withIgnoreIssues(IGNORED_ISSUES);
+
+ // Run the test with and without bound partitions
+ // When using fixed partitions, each mutation will be for a single
partition and will use pk=? syntax
+ // When using unbounded partitions then IN clause is used on
partition keys, leading to mutations touching multiple partitions
+ if (rs.nextBoolean())
{
- int unique = rs.nextInt(1, 10);
- List<Symbol> columns = model.factory.partitionColumns;
- List<Gen<?>> gens = new ArrayList<>(columns.size());
- for (int i = 0; i < columns.size(); i++)
-
gens.add(toGen(getTypeSupport(columns.get(i).type()).valueGen));
- uniquePartitions = Gens.lists(r2 -> {
- LinkedHashMap<Symbol, Object> vs = new LinkedHashMap<>();
+ List<LinkedHashMap<Symbol, Object>> uniquePartitions;
+ {
+ int unique = rs.nextInt(1, 10);
+ List<Symbol> columns = model.factory.partitionColumns;
+ List<Gen<?>> gens = new ArrayList<>(columns.size());
for (int i = 0; i < columns.size(); i++)
- vs.put(columns.get(i), gens.get(i).next(r2));
- return vs;
- }).uniqueBestEffort().ofSize(unique).next(rs);
+
gens.add(toGen(getTypeSupport(columns.get(i).type()).valueGen));
+ uniquePartitions = Gens.lists(r2 -> {
+ LinkedHashMap<Symbol, Object> vs = new
LinkedHashMap<>();
+ for (int i = 0; i < columns.size(); i++)
+ vs.put(columns.get(i), gens.get(i).next(r2));
+ return vs;
+ }).uniqueBestEffort().ofSize(unique).next(rs);
+ }
+
mutationGenBuilder.withPartitions(Generators.fromGen(Gens.mixedDistribution(uniquePartitions).next(rs)));
}
- ASTGenerators.MutationGenBuilder mutationGenBuilder = new
ASTGenerators.MutationGenBuilder(metadata)
-
.withoutTransaction()
- .withoutTtl()
-
.withoutTimestamp()
-
.withPartitions(Generators.fromGen(Gens.mixedDistribution(uniquePartitions).next(rs)))
-
.withColumnExpressions(e ->
e.withOperators(Generators.fromGen(BOOLEAN_DISTRIBUTION.next(rs))))
-
.withIgnoreIssues(IGNORED_ISSUES);
if (IGNORED_ISSUES.contains(KnownIssue.SAI_EMPTY_TYPE))
{
model.factory.regularAndStaticColumns.stream()
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index 19f95f4e5a..756095673e 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -19,9 +19,7 @@
package org.apache.cassandra.distributed.test.cql3;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -33,23 +31,15 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
import org.slf4j.Logger;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.Property;
import accord.utils.RandomSource;
-import com.datastax.driver.core.ColumnDefinitions;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.exceptions.ReadFailureException;
-import com.datastax.driver.core.exceptions.WriteFailureException;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.KnownIssue;
@@ -63,6 +53,7 @@ import org.apache.cassandra.cql3.ast.Select;
import org.apache.cassandra.cql3.ast.StandardVisitors;
import org.apache.cassandra.cql3.ast.Statement;
import org.apache.cassandra.cql3.ast.TableReference;
+import org.apache.cassandra.cql3.ast.Txn;
import org.apache.cassandra.cql3.ast.Value;
import org.apache.cassandra.cql3.ast.Visitor;
import org.apache.cassandra.cql3.ast.Visitor.CompositeVisitor;
@@ -81,7 +72,7 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.JavaDriverUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
-import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.distributed.util.DriverUtils;
import org.apache.cassandra.harry.model.ASTSingleTableModel;
import org.apache.cassandra.harry.util.StringUtils;
import org.apache.cassandra.repair.RepairGenerators;
@@ -97,7 +88,6 @@ import org.quicktheories.generators.SourceDSL;
import static accord.utils.Property.ignoreCommand;
import static accord.utils.Property.multistep;
-import static org.apache.cassandra.distributed.test.JavaDriverUtils.toDriverCL;
import static
org.apache.cassandra.utils.AbstractTypeGenerators.overridePrimitiveTypeSupport;
import static
org.apache.cassandra.utils.AbstractTypeGenerators.stringComparator;
@@ -612,6 +602,29 @@ public class StatefulASTBase extends TestBaseImpl
});
}
+ protected <S extends BaseState> Property.Command<S, Void, ?>
command(RandomSource rs, Txn txn)
+ {
+ return command(rs, txn, null);
+ }
+
+ protected <S extends BaseState> Property.Command<S, Void, ?>
command(RandomSource rs, Txn txn, @Nullable String annotate)
+ {
+ var inst = selectInstance(rs);
+ String postfix = "on " + inst;
+ if (model.isConditional(txn))
+ postfix += ", would apply " + model.shouldApply(txn);
+ if (annotate == null) annotate = postfix;
+ else annotate += ", " + postfix;
+
+ return new Property.SimpleCommand<>(humanReadable(txn, annotate),
s -> {
+ boolean hasMutation = txn.ifBlock.isPresent() ||
!txn.mutations.isEmpty();
+ ConsistencyLevel cl = hasMutation ? s.mutationCl() :
s.selectCl();
+ s.model.updateAndValidate(s.executeQuery(inst,
Integer.MAX_VALUE, cl, txn), txn);
+ if (hasMutation)
+ s.mutation();
+ });
+ }
+
protected IInvokableInstance selectInstance(RandomSource rs)
{
return cluster.get(rs.nextInt(0, cluster.size()) + 1);
@@ -686,82 +699,7 @@ public class StatefulASTBase extends TestBaseImpl
instance.executeInternal(stmt.toCQL(), (Object[])
stmt.bindsEncoded());
return new ByteBuffer[0][];
}
- else
- {
- SimpleStatement ss = new SimpleStatement(stmt.toCQL(),
(Object[]) stmt.bindsEncoded());
- if (fetchSize != Integer.MAX_VALUE)
- ss.setFetchSize(fetchSize);
- if (stmt.kind() == Statement.Kind.MUTATION)
- {
- switch (cl)
- {
- case SERIAL:
- ss.setSerialConsistencyLevel(toDriverCL(cl));
-
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM);
- break;
- case LOCAL_SERIAL:
- ss.setSerialConsistencyLevel(toDriverCL(cl));
-
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM);
- break;
- default:
- ss.setConsistencyLevel(toDriverCL(cl));
- }
- }
- else
- {
- ss.setConsistencyLevel(toDriverCL(cl));
- }
-
- InetSocketAddress broadcastAddress =
instance.config().broadcastAddress();
- var host = client.getMetadata().getAllHosts().stream()
- .filter(h ->
h.getBroadcastSocketAddress().getAddress().equals(broadcastAddress.getAddress()))
- .filter(h ->
h.getBroadcastSocketAddress().getPort() == broadcastAddress.getPort())
- .findAny()
- .get();
- ss.setHost(host);
- ResultSet result;
- try
- {
- result = session.execute(ss);
- }
- catch (ReadFailureException t)
- {
- throw new AssertionError("failed from=" +
Maps.transformValues(t.getFailuresMap(), BaseState::safeErrorCode), t);
- }
- catch (WriteFailureException t)
- {
- throw new AssertionError("failed from=" +
Maps.transformValues(t.getFailuresMap(), BaseState::safeErrorCode), t);
- }
- return getRowsAsByteBuffer(result);
- }
- }
-
- private static String safeErrorCode(Integer code)
- {
- try
- {
- return RequestFailureReason.fromCode(code).name();
- }
- catch (IllegalArgumentException e)
- {
- return "Unexpected code " + code + ": " + e.getMessage();
- }
- }
-
- @VisibleForTesting
- static ByteBuffer[][] getRowsAsByteBuffer(ResultSet result)
- {
- ColumnDefinitions columns = result.getColumnDefinitions();
- List<ByteBuffer[]> ret = new ArrayList<>();
- for (Row rowVal : result)
- {
- ByteBuffer[] row = new ByteBuffer[columns.size()];
- for (int i = 0; i < columns.size(); i++)
- row[i] = rowVal.getBytesUnsafe(i);
- ret.add(row);
- }
- ByteBuffer[][] a = new ByteBuffer[ret.size()][];
- return ret.toArray(a);
+ return DriverUtils.executeQuery(session, instance, fetchSize, cl,
stmt);
}
protected String humanReadable(Statement stmt, @Nullable String
annotate)
diff --git
a/test/distributed/org/apache/cassandra/distributed/util/DriverUtils.java
b/test/distributed/org/apache/cassandra/distributed/util/DriverUtils.java
new file mode 100644
index 0000000000..46ed21ee9a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/util/DriverUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.util;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Maps;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.exceptions.ReadFailureException;
+import com.datastax.driver.core.exceptions.WriteFailureException;
+import org.apache.cassandra.cql3.ast.Statement;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+
+import static org.apache.cassandra.distributed.test.JavaDriverUtils.toDriverCL;
+
+public class DriverUtils
+{
+ public static ByteBuffer[][] getRowsAsByteBuffer(ResultSet result)
+ {
+ ColumnDefinitions columns = result.getColumnDefinitions();
+ List<ByteBuffer[]> ret = new ArrayList<>();
+ for (Row rowVal : result)
+ {
+ ByteBuffer[] row = new ByteBuffer[columns.size()];
+ for (int i = 0; i < columns.size(); i++)
+ row[i] = rowVal.getBytesUnsafe(i);
+ ret.add(row);
+ }
+ ByteBuffer[][] a = new ByteBuffer[ret.size()][];
+ return ret.toArray(a);
+ }
+
+ public static ByteBuffer[][] executeQuery(Session session,
+ IInstance instance,
+ int fetchSize,
+ ConsistencyLevel cl,
+ Statement stmt)
+ {
+ SimpleStatement ss = new SimpleStatement(stmt.toCQL(), (Object[])
stmt.bindsEncoded());
+ if (fetchSize != Integer.MAX_VALUE)
+ ss.setFetchSize(fetchSize);
+ if (stmt.kind() == Statement.Kind.MUTATION)
+ {
+ switch (cl)
+ {
+ case SERIAL:
+ ss.setSerialConsistencyLevel(toDriverCL(cl));
+
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM);
+ break;
+ case LOCAL_SERIAL:
+ ss.setSerialConsistencyLevel(toDriverCL(cl));
+
ss.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM);
+ break;
+ default:
+ ss.setConsistencyLevel(toDriverCL(cl));
+ }
+ }
+ else
+ {
+ ss.setConsistencyLevel(toDriverCL(cl));
+ }
+
+ var host = getHost(session, instance);
+ ss.setHost(host);
+ ResultSet result;
+ try
+ {
+ result = session.execute(ss);
+ }
+ catch (ReadFailureException t)
+ {
+ throw new AssertionError("failed from=" +
Maps.transformValues(t.getFailuresMap(), DriverUtils::safeErrorCode), t);
+ }
+ catch (WriteFailureException t)
+ {
+ throw new AssertionError("failed from=" +
Maps.transformValues(t.getFailuresMap(), DriverUtils::safeErrorCode), t);
+ }
+ return getRowsAsByteBuffer(result);
+ }
+
+ private static Host getHost(Session session, IInstance instance)
+ {
+ InetSocketAddress broadcastAddress =
instance.config().broadcastAddress();
+ return session.getCluster().getMetadata().getAllHosts().stream()
+ .filter(h ->
h.getBroadcastSocketAddress().getAddress().equals(broadcastAddress.getAddress()))
+ .filter(h -> h.getBroadcastSocketAddress().getPort() ==
broadcastAddress.getPort())
+ .findAny()
+ .get();
+ }
+
+ private static String safeErrorCode(Integer code)
+ {
+ try
+ {
+ return RequestFailureReason.fromCode(code).name();
+ }
+ catch (IllegalArgumentException e)
+ {
+ return "Unexpected code " + code + ": " + e.getMessage();
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index 8895a7271b..d75307514d 100644
---
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -105,7 +105,7 @@ public class AccordTopologyMixupTest extends
TopologyMixupTestBase<AccordTopolog
overridePrimitiveTypeSupport(BytesType.instance,
AbstractTypeGenerators.TypeSupport.of(BytesType.instance, Generators.bytes(1,
10), FastByteOperations::compareUnsigned));
}
- private static final List<TransactionalMode> TRANSACTIONAL_MODES =
Stream.of(TransactionalMode.values()).filter(t ->
t.accordIsEnabled).collect(Collectors.toList());
+ private static final List<TransactionalMode> TRANSACTIONAL_MODES =
Stream.of(TransactionalMode.supported()).filter(t ->
t.accordIsEnabled).collect(Collectors.toList());
@Override
protected Gen<State<Spec>> stateGen()
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
index c914350328..37f4585332 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModel.java
@@ -596,7 +596,7 @@ public class ASTSingleTableModel
}
// table has clustering but non are in the write, so only
pk/static can be updated
if (!factory.clusteringColumns.isEmpty() && remaining.isEmpty())
- return;
+ continue;
BytesPartitionState finalPartition = partition;
for (Clustering<ByteBuffer> cd : clustering(remaining))
{
@@ -621,7 +621,7 @@ public class ASTSingleTableModel
for (Clustering<ByteBuffer> pd : pks)
{
BytesPartitionState partition =
partitions.get(factory.createRef(pd));
- if (partition == null) return; // can't delete a partition that
doesn't exist...
+ if (partition == null) continue; // can't delete a partition that
doesn't exist...
DeleteKind kind = DeleteKind.PARTITION;
if (!delete.columns.isEmpty())
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
index 3bdf742ce8..fbd9e07832 100644
---
a/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
+++
b/test/harry/main/org/apache/cassandra/harry/model/ASTSingleTableModelTest.java
@@ -78,6 +78,36 @@ public class ASTSingleTableModelTest
public static final SetType<Integer> SET_INT =
SetType.getInstance(Int32Type.instance, true);
public static final MapType<Integer, Integer> MAP_INT =
MapType.getInstance(Int32Type.instance, Int32Type.instance, true);
+ @Test
+ public void multiplePartitionUpdate()
+ {
+ TableMetadata metadata = new Builder()
+ .pk(1)
+ .ck(1)
+ .statics(1)
+ .regular(1)
+ .build();
+ ASTSingleTableModel model = new ASTSingleTableModel(metadata);
+ /*
+ UPDATE ks1.tbl USING TIMESTAMP 44
+ SET s0=[{00000000-0000-4100-b000-000000000000: -1,
00000000-0000-4900-9500-000000000000: -128,
00000000-0000-4b00-8700-000000000000: 115},
{00000000-0000-4200-ab00-000000000000: -115,
00000000-0000-4200-b000-000000000000: -3, 00000000-0000-4600-b400-000000000000:
66}]
+ WHERE pk0 IN (70, 47, -35) -- on node1
+ */
+
+ model.update(Mutation.update(metadata)
+ .timestamp(44)
+ .set("s", 42)
+ .in("pk", Int32Type.instance, 70, 47, -35)
+ .build());
+ ByteBuffer s = value(42);
+ ByteBuffer[][] expected = rows(
+ row(value(47), null, s, null),
+ row(value(-35), null, s, null),
+ row(value(70), null, s, null)
+ );
+ model.validate(expected, Select.builder(metadata).build());
+ }
+
@Test
public void singlePartition()
{
@@ -863,6 +893,11 @@ public class ASTSingleTableModelTest
return tables;
}
+ private static ByteBuffer value(int num)
+ {
+ return Int32Type.instance.decompose(num);
+ }
+
private static class ModelModel
{
private final ASTSingleTableModel model;
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 860995b28f..9cd39abba7 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -824,7 +824,10 @@ public class ASTGenerators
if (deleteKind == DeleteKind.Row &&
clusteringColumns.isEmpty())
deleteKind = DeleteKind.Partition;
- values(rnd, columnExpressions, builder,
partitionColumns, partitionValueGen);
+ if (allowUpdateMultiplePartitionKeys)
+ where(rnd, columnExpressions, builder,
partitionColumns, partitionValueGen);
+ else
+ values(rnd, columnExpressions, builder,
partitionColumns, partitionValueGen);
switch (deleteKind)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]