This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 3880901d16 Accord Deps tests have incorrect range semantics
3880901d16 is described below
commit 3880901d165f022592afbc99ed918ace6fe8cdbe
Author: David Capwell <[email protected]>
AuthorDate: Thu Oct 24 21:49:44 2024 -0700
Accord Deps tests have incorrect range semantics
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-20029
---
.../service/accord/AccordCommandStore.java | 8 ++
.../accord/SimulatedAccordCommandStore.java | 11 ++-
.../SimulatedAccordCommandStoreTestBase.java | 95 +++++++++++++++++++++-
.../service/accord/SimulatedDepsTest.java | 16 ++--
.../accord/SimulatedMultiKeyAndRangeTest.java | 32 +-------
.../SimulatedRandomKeysWithRangeConflictTest.java | 26 ++----
6 files changed, 128 insertions(+), 60 deletions(-)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 4a6865fa56..8773dcd294 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -102,6 +102,7 @@ public class AccordCommandStore extends CommandStore
public final String loggingId;
private final IJournal journal;
+
private final CommandStoreExecutor executor;
private final AccordStateCache.Instance<TxnId, Command, AccordSafeCommand>
commandCache;
private final AccordStateCache.Instance<RoutingKey, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKeyCache;
@@ -295,6 +296,13 @@ public class AccordCommandStore extends CommandStore
return commandsForKeyCache;
}
+ @VisibleForTesting
+ @Override
+ protected void unsafeSetRangesForEpoch(CommandStores.RangesForEpoch
newRangesForEpoch)
+ {
+ super.unsafeSetRangesForEpoch(newRangesForEpoch);
+ }
+
@Nullable
@VisibleForTesting
public Runnable appendToKeyspace(Command after)
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index 7da1a51e13..f96c3b438e 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -109,8 +109,8 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
globalExecutor = new SimulatedExecutorFactory(rs.fork(),
fromQT(Generators.TIMESTAMP_GEN.map(java.sql.Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs),
failures::add);
this.unorderedScheduled = globalExecutor.scheduled("ignored");
ExecutorFactory.Global.unsafeSet(globalExecutor);
- Stage.READ.unsafeSetExecutor(unorderedScheduled);
- Stage.MUTATION.unsafeSetExecutor(unorderedScheduled);
+ for (Stage stage : Arrays.asList(Stage.READ, Stage.MUTATION,
Stage.ACCORD_RANGE_LOADER))
+ stage.unsafeSetExecutor(unorderedScheduled);
for (Stage stage : Arrays.asList(Stage.MISC, Stage.ACCORD_MIGRATION,
Stage.READ, Stage.MUTATION))
stage.unsafeSetExecutor(globalExecutor.configureSequential("ignore").build());
@@ -216,11 +216,16 @@ public class SimulatedAccordCommandStore implements
AutoCloseable
this.topology =
AccordTopology.createAccordTopology(ClusterMetadata.current());
this.topologies = new
Topologies.Single(SizeOfIntersectionSorter.SUPPLIER, topology);
var rangesForEpoch = new
CommandStores.RangesForEpoch(topology.epoch(), topology.ranges(), store);
+ store.unsafeSetRangesForEpoch(rangesForEpoch);
updateHolder.add(topology.epoch(), rangesForEpoch, topology.ranges());
updateHolder.updateGlobal(topology.ranges());
shouldEvict = boolSource(rs.fork());
- shouldFlush = boolSource(rs.fork());
+ {
+ // tests used to take 1m but after many changes in accord they now
take many minutes and it's due to flush... so lower the frequency of flushing
+ var fork = rs.fork();
+ shouldFlush = () -> fork.decide(.01);
+ }
shouldCompact = boolSource(rs.fork());
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index 74ee74a4d1..05f4dffe94 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.service.accord;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -44,7 +46,9 @@ import accord.primitives.LatestDeps;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable;
+import accord.primitives.Routables;
import accord.primitives.RoutingKeys;
+import accord.primitives.Seekables;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
@@ -67,6 +71,8 @@ import
org.apache.cassandra.service.accord.api.AccordRoutingKey;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
import org.assertj.core.api.Assertions;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
@@ -168,6 +174,17 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
return kc;
}
+ protected static void assertDepsMessage(SimulatedAccordCommandStore
instance,
+ DepsMessage messageType,
+ Txn txn, FullRoute<?> route,
+ DepsModel model) throws
ExecutionException, InterruptedException
+ {
+ TxnId id = assertDepsMessage(instance, messageType, txn, route,
+ model.keyConflicts(txn.keys()),
+ model.rangeConflicts(txn.keys()));
+ model.register(id, txn);
+ }
+
protected static TxnId assertDepsMessage(SimulatedAccordCommandStore
instance,
DepsMessage messageType,
Txn txn, FullRoute<?> route,
@@ -294,7 +311,6 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
else
{
List<Range> actualRanges = IntStream.range(0,
deps.rangeDeps.rangeCount()).mapToObj(deps.rangeDeps::range).collect(Collectors.toList());
-//
Assertions.assertThat(deps.rangeDeps.rangeCount()).describedAs("Txn %s Expected
ranges size; %s", txnId, deps.rangeDeps).isEqualTo(rangeConflicts.size());
Assertions.assertThat(Ranges.of(actualRanges.toArray(Range[]::new)))
.describedAs("Txn %s had different ranges than
expected", txnId)
.isEqualTo(Ranges.of(rangeConflicts.keySet().toArray(Range[]::new)));
@@ -380,4 +396,81 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
}
};
}
+
+ public static class DepsModel
+ {
+ private final Map<RoutingKey, List<TxnId>> keyConflicts = new
HashMap<>();
+ private final RangeTree<RoutingKey, Range, TxnId> rangeConflicts =
RTree.create(RangeTreeRangeAccessor.instance);
+ private final Ranges storeRanges;
+
+ public DepsModel(Ranges storeRanges)
+ {
+ this.storeRanges = storeRanges;
+ }
+
+ public Map<RoutingKey, List<TxnId>> keyConflicts(Seekables<?, ?>
keysOrRanges)
+ {
+ keysOrRanges = keysOrRanges.slice(storeRanges,
Routables.Slice.Minimal);
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ {
+ Keys keys = (Keys) keysOrRanges;
+ Map<RoutingKey, List<TxnId>> expectedConflicts = new
HashMap<>();
+ keys.forEach(k -> expectedConflicts.put(k.toUnseekable(),
keyConflicts.getOrDefault(k.toUnseekable(), Collections.emptyList())));
+ return expectedConflicts;
+ }
+ case Range:
+ {
+ Ranges ranges = (Ranges) keysOrRanges;
+ return keyConflicts.entrySet().stream()
+ .filter(e ->
ranges.contains(e.getKey()))
+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public Map<Range, List<TxnId>> rangeConflicts(Seekables<?, ?>
keysOrRanges)
+ {
+ // there is a patch pending to add range support for keys... that
isn't here yet so not handled
+ if (keysOrRanges.domain() != Routable.Domain.Range)
+ return Collections.emptyMap();
+ keysOrRanges = keysOrRanges.slice(storeRanges,
Routables.Slice.Minimal);
+
+ Ranges ranges = (Ranges) keysOrRanges;
+ Map<Range, List<TxnId>> conflicts = new HashMap<>();
+ ranges.forEach(r -> rangeConflicts.search(r, e -> {
+ for (Range range : Ranges.single(e.getKey()).slice(ranges,
Routables.Slice.Minimal))
+ conflicts.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(e.getValue());
+ }));
+ // need to dedup/sort txns
+ conflicts.values().forEach(l -> {
+ var sortedDedup = new ArrayList<>(new TreeSet<>(l));
+ l.clear();
+ l.addAll(sortedDedup);
+ });
+ return conflicts;
+ }
+
+ public void register(TxnId txnId, Txn txn)
+ {
+ for (var s : txn.keys())
+ {
+ switch (s.domain())
+ {
+ case Key:
+ keyConflicts.computeIfAbsent(s.asKey().toUnseekable(),
i -> new ArrayList<>()).add(txnId);
+ break;
+ case Range:
+ rangeConflicts.add(s.asRange(), txnId);
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+ }
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedDepsTest.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedDepsTest.java
index 961a1dfdb3..1d9935084c 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedDepsTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedDepsTest.java
@@ -186,13 +186,12 @@ public class SimulatedDepsTest extends
SimulatedAccordCommandStoreTestBase
FullRangeRoute rangeRoute = ranges.toRoute(pk.toUnseekable());
Txn rangeTxn = createTxn(Txn.Kind.ExclusiveSyncPoint, ranges);
- List<TxnId> keyConflicts = new ArrayList<>(numSamples);
- List<TxnId> rangeConflicts = new ArrayList<>(numSamples);
+ DepsModel model = new
DepsModel(instance.store.unsafeRangesForEpoch().currentRanges());
for (int i = 0; i < numSamples; i++)
{
instance.maybeCacheEvict(keyRoute, ranges);
- keyConflicts.add(assertDepsMessage(instance,
rs.pick(DepsMessage.values()), keyTxn, keyRoute, keyConflicts(keyConflicts,
keyRoute)));
- rangeConflicts.add(assertDepsMessage(instance,
rs.pick(DepsMessage.values()), rangeTxn, rangeRoute, keyConflicts(keyConflicts,
keyRoute), rangeConflicts(rangeConflicts, ranges)));
+ assertDepsMessage(instance, rs.pick(DepsMessage.values()),
keyTxn, keyRoute, model);
+ assertDepsMessage(instance, rs.pick(DepsMessage.values()),
rangeTxn, rangeRoute, model);
}
}
});
@@ -259,21 +258,18 @@ public class SimulatedDepsTest extends
SimulatedAccordCommandStoreTestBase
Range left = tokenRange(tbl.id, token - 10, token + 5);
Range right = tokenRange(tbl.id, token - 5, token + 10);
- List<TxnId> keyConflicts = new ArrayList<>(numSamples);
- Map<Range, List<TxnId>> rangeConflicts = new HashMap<>();
- rangeConflicts.put(left, new ArrayList<>());
- rangeConflicts.put(right, new ArrayList<>());
+ DepsModel model = new
DepsModel(instance.store.unsafeRangesForEpoch().currentRanges());
for (int i = 0; i < numSamples; i++)
{
Ranges partialRange = Ranges.of(rs.nextBoolean() ? left :
right);
try
{
instance.maybeCacheEvict(keyRoute, partialRange);
- keyConflicts.add(assertDepsMessage(instance,
rs.pick(DepsMessage.values()), keyTxn, keyRoute, keyConflicts(keyConflicts,
keyRoute)));
+ assertDepsMessage(instance,
rs.pick(DepsMessage.values()), keyTxn, keyRoute, model);
FullRangeRoute rangeRoute =
partialRange.toRoute(pk.toUnseekable());
Txn rangeTxn = createTxn(Txn.Kind.ExclusiveSyncPoint,
partialRange);
-
rangeConflicts.get(partialRange.get(0)).add(assertDepsMessage(instance,
rs.pick(DepsMessage.values()), rangeTxn, rangeRoute, keyConflicts(keyConflicts,
keyRoute), rangeConflicts));
+ assertDepsMessage(instance,
rs.pick(DepsMessage.values()), rangeTxn, rangeRoute, model);
}
catch (Throwable t)
{
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedMultiKeyAndRangeTest.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedMultiKeyAndRangeTest.java
index feaddeff8c..16a4d83065 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedMultiKeyAndRangeTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedMultiKeyAndRangeTest.java
@@ -19,11 +19,8 @@
package org.apache.cassandra.service.accord;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -40,12 +37,9 @@ import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable.Domain;
import accord.primitives.Txn;
-import accord.primitives.TxnId;
import accord.utils.Gen;
import accord.utils.Gens;
import org.apache.cassandra.service.accord.api.PartitionKey;
-import org.apache.cassandra.utils.RTree;
-import org.apache.cassandra.utils.RangeTree;
import static accord.utils.Property.qt;
import static
org.apache.cassandra.dht.Murmur3Partitioner.LongToken.keyForToken;
@@ -73,12 +67,12 @@ public class SimulatedMultiKeyAndRangeTest extends
SimulatedAccordCommandStoreTe
Gen.LongGen tokenGen = tokenDistribution.next(rs);
Gen<Domain> domainGen = domainDistribution.next(rs);
Gen<DepsMessage> msgGen = msgDistribution.next(rs);
- Map<RoutingKey, List<TxnId>> keyConflicts = new HashMap<>();
- RangeTree<RoutingKey, Range, TxnId> rangeConflicts =
RTree.create(RangeTreeRangeAccessor.instance);
Gen.IntGen keyCountGen = keyDistribution.next(rs);
Gen.IntGen rangeCountGen = rangeDistribution.next(rs);
+ DepsModel model = new
DepsModel(instance.store.unsafeRangesForEpoch().currentRanges());
+
for (int i = 0; i < numSamples; i++)
{
switch (domainGen.next(rs))
@@ -99,11 +93,7 @@ public class SimulatedMultiKeyAndRangeTest extends
SimulatedAccordCommandStoreTe
Txn txn = createTxn(wrapInTxn(inserts), binds);
FullRoute<RoutingKey> route =
keys.toRoute(keys.get(0).toUnseekable());
- Map<RoutingKey, List<TxnId>> expectedConflicts =
new HashMap<>();
- route.forEach(k -> expectedConflicts.put(k,
keyConflicts.computeIfAbsent(k, ignore -> new ArrayList<>())));
-
- TxnId id = assertDepsMessage(instance,
msgGen.next(rs), txn, route, expectedConflicts, Collections.emptyMap());
- route.forEach(k -> keyConflicts.get(k).add(id));
+ assertDepsMessage(instance, msgGen.next(rs), txn,
route, model);
}
break;
case Range:
@@ -133,21 +123,7 @@ public class SimulatedMultiKeyAndRangeTest extends
SimulatedAccordCommandStoreTe
FullRangeRoute route =
ranges.toRoute(ranges.get(0).end());
Txn txn = createTxn(Txn.Kind.ExclusiveSyncPoint,
ranges);
- Map<RoutingKey, List<TxnId>> expectedKeyConflicts
= keyConflicts.entrySet().stream()
-
.filter(e -> ranges.contains(e.getKey()))
-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- Map<Range, List<TxnId>> expectedRangeConflicts =
new HashMap<>();
- ranges.forEach(r ->
- rangeConflicts.search(r, e ->
-
expectedRangeConflicts.computeIfAbsent(e.getKey(), ignore -> new
ArrayList<>()).add(e.getValue())));
- // need to dedup/sort txns
- expectedRangeConflicts.values().forEach(l -> {
- var sortedDedup = new ArrayList<>(new
TreeSet<>(l));
- l.clear();
- l.addAll(sortedDedup);
- });
- TxnId id = assertDepsMessage(instance,
msgGen.next(rs), txn, route, expectedKeyConflicts, expectedRangeConflicts);
- ranges.forEach(r -> rangeConflicts.add(r, id));
+ assertDepsMessage(instance, msgGen.next(rs), txn,
route, model);
}
break;
default:
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedRandomKeysWithRangeConflictTest.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedRandomKeysWithRangeConflictTest.java
index 30f0f0aa7f..d1edbb4455 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedRandomKeysWithRangeConflictTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedRandomKeysWithRangeConflictTest.java
@@ -25,21 +25,13 @@ import accord.primitives.Keys;
import accord.primitives.Ranges;
import accord.primitives.RoutingKeys;
import accord.primitives.Txn;
-import accord.primitives.TxnId;
import accord.utils.Property;
import accord.utils.RandomSource;
-import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.utils.FailingConsumer;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import static accord.utils.Property.commands;
import static accord.utils.Property.stateful;
@@ -51,16 +43,14 @@ public class SimulatedRandomKeysWithRangeConflictTest
extends SimulatedAccordCom
private static Property.SimpleCommand<State> insertKey(RandomSource rs,
State state)
{
long token = rs.nextLong(Long.MIN_VALUE + 1, Long.MAX_VALUE);
- RoutingKey key = new TokenKey(state.tbl.id, new LongToken(token));
Txn keyTxn = createTxn(wrapInTxn("INSERT INTO " + state.tbl + "(pk,
value) VALUES (?, ?)"),
- Arrays.asList(keyForToken(token), 42));
+ Arrays.asList(keyForToken(token), 42));
Keys keys = (Keys) keyTxn.keys();
FullRoute<RoutingKey> keyRoute =
keys.toRoute(keys.get(0).toUnseekable());
return new Property.SimpleCommand<>("Write Txn: " + keys,
FailingConsumer.orFail(s -> {
s.instance.maybeCacheEvict(keyRoute, s.wholeRange);
- var k = assertDepsMessage(s.instance,
rs.pick(DepsMessage.values()), keyTxn, keyRoute, Map.of(key,
s.keyConflicts.computeIfAbsent(key, ignore -> new ArrayList<>())),
Collections.emptyMap());
- s.keyConflicts.get(key).add(k);
+ assertDepsMessage(s.instance, rs.pick(DepsMessage.values()),
keyTxn, keyRoute, s.model);
}));
}
@@ -68,7 +58,7 @@ public class SimulatedRandomKeysWithRangeConflictTest extends
SimulatedAccordCom
{
return new Property.SimpleCommand<>("Range Txn: " + state.wholeRange,
FailingConsumer.orFail(s -> {
s.instance.maybeCacheEvict(RoutingKeys.EMPTY, s.wholeRange);
- s.rangeConflicts.add(assertDepsMessage(s.instance,
rs.pick(DepsMessage.values()), s.rangeTxn, s.rangeRoute, s.keyConflicts,
rangeConflicts(s.rangeConflicts, s.wholeRange)));
+ assertDepsMessage(s.instance, rs.pick(DepsMessage.values()),
s.rangeTxn, s.rangeRoute, s.model);
}));
}
@@ -77,27 +67,27 @@ public class SimulatedRandomKeysWithRangeConflictTest
extends SimulatedAccordCom
public void keysAllOverConflictingWithRange()
{
stateful().withSteps(State.steps).check(commands(() -> State::new)
- .add(SimulatedRandomKeysWithRangeConflictTest::insertKey)
- .add(SimulatedRandomKeysWithRangeConflictTest::insertRange)
- .build());
+
.add(SimulatedRandomKeysWithRangeConflictTest::insertKey)
+
.add(SimulatedRandomKeysWithRangeConflictTest::insertRange)
+ .build());
}
public static class State
{
static final int steps = 300;
final SimulatedAccordCommandStore instance;
- final Map<RoutingKey, List<TxnId>> keyConflicts = new HashMap<>();
- final List<TxnId> rangeConflicts = new ArrayList<>(steps);
final TableMetadata tbl = reverseTokenTbl;
final Ranges wholeRange = Ranges.of(fullRange(tbl.id));
final FullRangeRoute rangeRoute =
wholeRange.toRoute(wholeRange.get(0).end());
final Txn rangeTxn = createTxn(Txn.Kind.ExclusiveSyncPoint,
wholeRange);
+ final DepsModel model;
public State(RandomSource rs)
{
AccordKeyspace.unsafeClear();
this.instance = new SimulatedAccordCommandStore(rs);
+ this.model = new
DepsModel(instance.store.unsafeRangesForEpoch().currentRanges());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]