This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 09dd3d69af CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked 09dd3d69af is described below commit 09dd3d69af6755d6f68b6e0d019238f793f3246a Author: David Capwell <dcapw...@apache.org> AuthorDate: Tue Jun 18 13:55:38 2024 -0700 CEP-15: (Accord) When nodes are removed from a cluster, need to update topology tracking to avoid being blocked patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-19719 --- build.xml | 3 +- modules/accord | 2 +- .../service/accord/AccordConfigurationService.java | 138 +++- .../cassandra/service/accord/AccordKeyspace.java | 3 +- .../service/accord/AccordSyncPropagator.java | 82 ++- .../concurrent/SimulatedExecutorFactory.java | 18 + .../index/accord/AccordIndexStressTest.java | 8 +- .../cassandra/index/accord/RouteIndexTest.java | 15 +- .../cassandra/net/SimulatedMessageDelivery.java | 333 +++++++++ .../org/apache/cassandra/repair/FuzzTestBase.java | 272 +------- .../accord/AccordConfigurationServiceTest.java | 11 +- .../service/accord/AccordKeyspaceTest.java | 4 +- .../service/accord/AccordSyncPropagatorTest.java | 8 + .../cassandra/service/accord/EpochSyncTest.java | 753 +++++++++++++++++++++ .../service/accord/LoggingDiskStateManager.java | 93 +++ .../service/accord/MockDiskStateManager.java | 79 +++ .../cassandra/utils/StatefulRangeTreeTest.java | 14 +- 17 files changed, 1548 insertions(+), 288 deletions(-) diff --git a/build.xml b/build.xml index d3943b1a35..4ed704de7e 100644 --- a/build.xml +++ b/build.xml @@ -1278,13 +1278,12 @@ <condition property="maxMemory" value="8G" else="4G"> <equals arg1="${test.classlistprefix}" arg2="distributed"/> </condition> - <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" filelist="@{test.file.list}" exclude="**/*.java" timeout="${test.timeout}"> + <testmacrohelper inputdir="${test.dir}/${test.classlistprefix}" 
filelist="@{test.file.list}" exclude="**/*.java" timeout="${test.timeout}" maxmemory="${maxMemory}"> <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/> <jvmarg value="-Dinvalid-legacy-sstable-root=${test.data}/invalid-legacy-sstables"/> <jvmarg value="-Dcassandra.ring_delay_ms=1000"/> <jvmarg value="-Dcassandra.tolerate_sstable_size=true"/> <jvmarg value="-Dcassandra.skip_sync=true" /> - <jvmarg value="-Xmx${maxMemory}"/> </testmacrohelper> </sequential> </macrodef> diff --git a/modules/accord b/modules/accord index f1f5ea5ccb..694ae39e2e 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit f1f5ea5ccbd6e0a8abf579a4331fa84a1b3d9f95 +Subproject commit 694ae39e2e00075bdabd47632dced0db12a9981d diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index d2a14a44db..1e6cb1d769 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -34,6 +34,8 @@ import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; +import org.agrona.collections.LongArrayList; +import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.concurrent.Stage; @@ -57,6 +59,7 @@ import static org.apache.cassandra.utils.Simulate.With.MONITORS; public class AccordConfigurationService extends AbstractConfigurationService<AccordConfigurationService.EpochState, AccordConfigurationService.EpochHistory> implements ChangeListener, AccordEndpointMapper, AccordSyncPropagator.Listener, Shutdownable { private final AccordSyncPropagator syncPropagator; + private final DiskStateManager diskStateManager; private 
EpochDiskState diskState = EpochDiskState.EMPTY; @@ -114,15 +117,88 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc } } - public AccordConfigurationService(Node.Id node, MessageDelivery messagingService, IFailureDetector failureDetector) + @VisibleForTesting + interface DiskStateManager + { + EpochDiskState loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer); + EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> pending, EpochDiskState diskState); + + EpochDiskState setCompletedLocalSync(long epoch, EpochDiskState diskState); + + EpochDiskState markLocalSyncAck(Node.Id id, long epoch, EpochDiskState diskState); + + EpochDiskState saveTopology(Topology topology, EpochDiskState diskState); + + EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState); + + EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskState diskState); + + EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState); + } + + enum SystemTableDiskStateManager implements DiskStateManager + { + instance; + + @Override + public EpochDiskState loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer) + { + return AccordKeyspace.loadTopologies(consumer); + } + + @Override + public EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> notify, EpochDiskState diskState) + { + return AccordKeyspace.setNotifyingLocalSync(epoch, notify, diskState); + } + + @Override + public EpochDiskState setCompletedLocalSync(long epoch, EpochDiskState diskState) + { + return AccordKeyspace.setCompletedLocalSync(epoch, diskState); + } + + @Override + public EpochDiskState markLocalSyncAck(Node.Id id, long epoch, EpochDiskState diskState) + { + return AccordKeyspace.markLocalSyncAck(id, epoch, diskState); + } + + @Override + public EpochDiskState saveTopology(Topology topology, EpochDiskState diskState) + { + return AccordKeyspace.saveTopology(topology, diskState); + } + + @Override + public 
EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState) + { + return AccordKeyspace.markRemoteTopologySync(node, epoch, diskState); + } + + @Override + public EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskState diskState) + { + return AccordKeyspace.markClosed(ranges, epoch, diskState); + } + + @Override + public EpochDiskState truncateTopologyUntil(long epoch, EpochDiskState diskState) + { + return AccordKeyspace.truncateTopologyUntil(epoch, diskState); + } + } + + public AccordConfigurationService(Node.Id node, MessageDelivery messagingService, IFailureDetector failureDetector, DiskStateManager diskStateManager, ScheduledExecutorPlus scheduledTasks) { super(node); - this.syncPropagator = new AccordSyncPropagator(localId, this, messagingService, failureDetector, ScheduledExecutors.scheduledTasks, this); + this.syncPropagator = new AccordSyncPropagator(localId, this, messagingService, failureDetector, scheduledTasks, this); + this.diskStateManager = diskStateManager; } public AccordConfigurationService(Node.Id node) { - this(node, MessagingService.instance(), FailureDetector.instance); + this(node, MessagingService.instance(), FailureDetector.instance, SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks); } @Override @@ -137,7 +213,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc state = State.LOADING; updateMapping(ClusterMetadata.current()); EndpointMapping snapshot = mapping; - diskState = AccordKeyspace.loadTopologies(((epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> { + diskState = diskStateManager.loadTopologies(((epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> { if (topology != null) reportTopology(topology, syncStatus == SyncStatus.NOT_STARTED); @@ -221,12 +297,41 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc synchronized 
(AccordConfigurationService.this) { updateMapping(metadata); - reportTopology(AccordTopology.createAccordTopology(metadata)); + Topology topology = AccordTopology.createAccordTopology(metadata); + Topology current = isEmpty() ? Topology.EMPTY : currentTopology(); + reportTopology(topology); + Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(), topology.nodes()); + if (!removedNodes.isEmpty()) + onNodesRemoved(topology.epoch(), removedNodes); } }); } - private void maybeReportMetadata(ClusterMetadata metadata) + private synchronized void onNodesRemoved(long epoch, Set<Node.Id> removed) + { + syncPropagator.onNodesRemoved(removed); + for (long oldEpoch : nonCompletedEpochsBefore(epoch)) + { + for (Node.Id node : removed) + receiveRemoteSyncComplete(node, oldEpoch); + } + listeners.forEach(l -> l.onRemoveNodes(epoch, removed)); + } + + private long[] nonCompletedEpochsBefore(long max) + { + LongArrayList notComplete = new LongArrayList(); + for (long epoch = epochs.minEpoch(); epoch <= max && epoch <= epochs.maxEpoch(); epoch++) + { + EpochSnapshot snapshot = getEpochSnapshot(epoch); + if (snapshot.syncStatus != SyncStatus.COMPLETED) + notComplete.add(epoch); + } + return notComplete.toLongArray(); + } + + @VisibleForTesting + void maybeReportMetadata(ClusterMetadata metadata) { // don't report metadata until the previous one has been acknowledged synchronized (this) @@ -265,7 +370,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc return; Set<Node.Id> notify = topology.nodes().stream().filter(i -> !localId.equals(i)).collect(Collectors.toSet()); - diskState = AccordKeyspace.setNotifyingLocalSync(epoch, notify, diskState); + diskState = diskStateManager.setNotifyingLocalSync(epoch, notify, diskState); epochState.setSyncStatus(SyncStatus.NOTIFYING); syncPropagator.reportSyncComplete(epoch, notify, localId); } @@ -276,7 +381,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc 
EpochState epochState = getOrCreateEpochState(epoch); if (epochState.syncStatus != SyncStatus.NOTIFYING) return; - diskState = AccordKeyspace.markLocalSyncAck(id, epoch, diskState); + diskState = diskStateManager.markLocalSyncAck(id, epoch, diskState); } @Override @@ -284,21 +389,21 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc { EpochState epochState = getOrCreateEpochState(epoch); epochState.setSyncStatus(SyncStatus.COMPLETED); - diskState = AccordKeyspace.setCompletedLocalSync(epoch, diskState); + diskState = diskStateManager.setCompletedLocalSync(epoch, diskState); } @Override protected synchronized void topologyUpdatePreListenerNotify(Topology topology) { if (state == State.STARTED) - diskState = AccordKeyspace.saveTopology(topology, diskState); + diskState = diskStateManager.saveTopology(topology, diskState); } @Override protected synchronized void receiveRemoteSyncCompletePreListenerNotify(Node.Id node, long epoch) { if (state == State.STARTED) - diskState = AccordKeyspace.markRemoteTopologySync(node, epoch, diskState); + diskState = diskStateManager.markRemoteTopologySync(node, epoch, diskState); } @Override @@ -309,6 +414,11 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc syncPropagator.reportClosed(epoch, topology.nodes(), ranges); } + public AccordSyncPropagator syncPropagator() + { + return syncPropagator; + } + @Override public synchronized void reportEpochRedundant(Ranges ranges, long epoch) { @@ -321,14 +431,14 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc @Override public synchronized void receiveClosed(Ranges ranges, long epoch) { - diskState = AccordKeyspace.markClosed(ranges, epoch, diskState); + diskState = diskStateManager.markClosed(ranges, epoch, diskState); super.receiveClosed(ranges, epoch); } @Override public synchronized void receiveRedundant(Ranges ranges, long epoch) { - diskState = AccordKeyspace.markClosed(ranges, epoch, 
diskState); + diskState = diskStateManager.markClosed(ranges, epoch, diskState); super.receiveRedundant(ranges, epoch); } @@ -342,7 +452,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc protected synchronized void truncateTopologiesPostListenerNotify(long epoch) { if (state == State.STARTED) - diskState = AccordKeyspace.truncateTopologyUntil(epoch, diskState); + diskState = diskStateManager.truncateTopologyUntil(epoch, diskState); } private void checkStarted() diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index b96ffa9bd0..a2eb1c23d5 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -1586,7 +1586,8 @@ public class AccordKeyspace return minEpoch == maxEpoch && maxEpoch == 0; } - private EpochDiskState withNewMaxEpoch(long epoch) + @VisibleForTesting + EpochDiskState withNewMaxEpoch(long epoch) { Invariants.checkArgument(epoch > maxEpoch, "Epoch %d <= %d (max)", epoch, maxEpoch); return EpochDiskState.create(Math.max(1, minEpoch), epoch); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java b/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java index e16facee4a..2c9626718d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSyncPropagator.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -29,6 +30,9 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import accord.local.Node; import 
accord.messages.SimpleReply; import accord.primitives.Ranges; @@ -38,6 +42,7 @@ import org.agrona.collections.Long2ObjectHashMap; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -50,6 +55,7 @@ import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; import org.apache.cassandra.service.accord.serializers.KeySerializers; import org.apache.cassandra.service.accord.serializers.TopologySerializers; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.CollectionSerializers; import static org.apache.cassandra.utils.CollectionSerializers.newListSerializer; @@ -59,6 +65,8 @@ import static org.apache.cassandra.utils.CollectionSerializers.newListSerializer */ public class AccordSyncPropagator { + private static final Logger logger = LoggerFactory.getLogger(AccordSyncPropagator.class); + public static final IVerbHandler<List<Notification>> verbHandler = message -> { if (!AccordService.isSetup()) return; @@ -120,6 +128,11 @@ public class AccordSyncPropagator return new Notification(epoch, Collections.emptySet(), Ranges.EMPTY, addRedundant); } + boolean isEmpty() + { + return syncComplete.isEmpty() && closed.isEmpty() && redundant.isEmpty(); + } + boolean ack(Notification notification) { if (!notification.syncComplete.isEmpty()) @@ -201,6 +214,15 @@ public class AccordSyncPropagator return !pending.isEmpty(); } + synchronized boolean hasPending(long epoch) + { + if (pending.isEmpty()) return false; + return pending.values().stream().allMatch(n -> { + PendingEpoch p = n.get(epoch); + return p != null && !p.isEmpty(); + }); + } + @Override public String toString() { @@ -210,6 +232,28 @@ public class 
AccordSyncPropagator '}'; } + public synchronized void onNodesRemoved(Set<Node.Id> removed) + { + for (Node.Id node : removed) + { + PendingEpochs pendingEpochs = pending.get(node.id); + if (pendingEpochs == null) continue; + long[] toComplete = new long[pendingEpochs.size()]; + Long2ObjectHashMap<PendingEpoch>.KeyIterator it = pendingEpochs.keySet().iterator(); + for (int i = 0; it.hasNext(); i++) + toComplete[i] = it.nextLong(); + Arrays.sort(toComplete); + for (long epoch : toComplete) + listener.onEndpointAck(node, epoch); + pending.remove(node.id); + for (long epoch : toComplete) + { + if (hasSyncCompletedFor(epoch)) + listener.onComplete(epoch); + } + } + } + public void reportSyncComplete(long epoch, Collection<Node.Id> notify, Node.Id syncCompleteId) { if (notify.isEmpty()) @@ -258,17 +302,13 @@ public class AccordSyncPropagator private boolean notify(Node.Id to, List<Notification> notifications) { InetAddressAndPort toEp = endpointMapper.mappedEndpoint(to); - if (!failureDetector.isAlive(toEp)) - { - scheduler.schedule(() -> notify(to, notifications), 1, TimeUnit.MINUTES); - return false; - } Message<List<Notification>> msg = Message.out(Verb.ACCORD_SYNC_NOTIFY_REQ, notifications); - messagingService.sendWithCallback(msg, toEp, new RequestCallback<SimpleReply>(){ + RequestCallback<SimpleReply> cb = new RequestCallback<>() + { @Override public void onResponse(Message<SimpleReply> msg) { - Invariants.checkState(msg.payload == SimpleReply.Ok, "Unexpected message: %s", msg); + Invariants.checkState(msg.payload == SimpleReply.Ok, "Unexpected message: %s", msg); Set<Long> completedEpochs = new HashSet<>(); // TODO review is it a good idea to call the listener while not holding the `AccordSyncPropagator` lock? synchronized (AccordSyncPropagator.this) @@ -304,7 +344,22 @@ public class AccordSyncPropagator { return true; } - }); + }; + if (!failureDetector.isAlive(toEp)) + { + // was the endpoint removed from membership? 
+ ClusterMetadata metadata = ClusterMetadata.current(); + if (Gossiper.instance.getEndpointStateForEndpoint(toEp) == null && !metadata.directory.allJoinedEndpoints().contains(toEp) && !metadata.fullCMSMembers().contains(toEp)) + { + // endpoint no longer exists... + cb.onResponse(msg.responseWith(SimpleReply.Ok)); + return true; + } + logger.warn("Node{} is not alive, unable to notify of {}", to, notifications); + scheduler.schedule(() -> notify(to, notifications), 1, TimeUnit.MINUTES); + return false; + } + messagingService.sendWithCallback(msg, toEp, cb); return true; } @@ -352,5 +407,16 @@ public class AccordSyncPropagator this.closed = closed; this.redundant = redundant; } + + @Override + public String toString() + { + return "Notification{" + + "epoch=" + epoch + + ", syncComplete=" + syncComplete + + ", closed=" + closed + + ", redundant=" + redundant + + '}'; + } } } diff --git a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java index 884927bd9a..677f33e704 100644 --- a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java +++ b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java @@ -18,6 +18,7 @@ package org.apache.cassandra.concurrent; +import java.sql.Timestamp; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -41,6 +42,7 @@ import javax.annotation.Nullable; import accord.utils.Gens; import accord.utils.RandomSource; import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.Generators; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; @@ -50,6 +52,7 @@ import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.InternalState import static org.apache.cassandra.concurrent.Interruptible.State.INTERRUPTED; import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL; import 
static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN; +import static org.apache.cassandra.utils.AccordGenerators.fromQT; public class SimulatedExecutorFactory implements ExecutorFactory, Clock { @@ -96,6 +99,16 @@ public class SimulatedExecutorFactory implements ExecutorFactory, Clock private long nowNanos; private int repeatedTasks = 0; + public SimulatedExecutorFactory(RandomSource rs, Consumer<Throwable> onError) + { + this(rs, fromQT(Generators.TIMESTAMP_GEN.map(Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs), onError); + } + + public SimulatedExecutorFactory(RandomSource rs) + { + this(rs, null); + } + public SimulatedExecutorFactory(RandomSource rs, long startTimeNanos) { this(rs, startTimeNanos, null); @@ -117,6 +130,11 @@ public class SimulatedExecutorFactory implements ExecutorFactory, Clock }); } + public boolean hasWork() + { + return queue.size() > repeatedTasks; + } + public boolean processOne() { // if we count the repeated tasks, then processAll will never complete diff --git a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java index 02221eb200..c4085c6083 100644 --- a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java +++ b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java @@ -257,11 +257,11 @@ public class AccordIndexStressTest extends CQLTester { case Key: { - store = rs.pick(storeToTableToRoutingKeysToTxns.keySet()); + store = rs.pickUnorderedSet(storeToTableToRoutingKeysToTxns.keySet()); var actual = this.storeToTableToRoutingKeysToTxns.get(store); var tableToTokens = store2Table2Tokens.get(store); - table = rs.pick(actual.keySet()); + table = rs.pickUnorderedSet(actual.keySet()); var tokens = tableToTokens.get(table); var offset = rs.nextInt(0, tokens.length); @@ -274,11 +274,11 @@ public class AccordIndexStressTest extends CQLTester break; case Range: { - store = 
rs.pick(storeToTableToRangesToTxns.keySet()); + store = rs.pickUnorderedSet(storeToTableToRangesToTxns.keySet()); var tableToRangesToTxns = storeToTableToRangesToTxns.get(store); var tableToRanges = store2Table2Ranges.get(store); - table = rs.pick(tableToRangesToTxns.keySet()); + table = rs.pickUnorderedSet(tableToRangesToTxns.keySet()); var wrapper = tableToRanges.get(table); var ranges = wrapper.ranges; var tree = wrapper.tree; diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index 84c64bcb45..5200316a2f 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,7 +126,7 @@ public class RouteIndexTest extends CQLTester.InMemory @Override public Gen<Command<State, ColumnFamilyStore, ?>> commands(State state) { - Map<Gen<Command<State, ColumnFamilyStore, ?>>, Integer> possible = new HashMap<>(); + Map<Gen<Command<State, ColumnFamilyStore, ?>>, Integer> possible = new LinkedHashMap<>(); possible.put(ignore -> FLUSH, 1); possible.put(ignore -> COMPACT, 1); possible.put(rs -> { @@ -139,9 +140,9 @@ public class RouteIndexTest extends CQLTester.InMemory if (!state.storeToTableToRangesToTxns.isEmpty()) { possible.put(rs -> { - int storeId = rs.pick(state.storeToTableToRangesToTxns.keySet()); + int storeId = rs.pickUnorderedSet(state.storeToTableToRangesToTxns.keySet()); var tables = state.storeToTableToRangesToTxns.get(storeId); - TableId tableId = rs.pick(tables.keySet()); + TableId tableId = rs.pickUnorderedSet(tables.keySet()); var ranges = tables.get(tableId); TreeSet<TokenRange> distinctRanges = 
ranges.stream().map(Map.Entry::getKey).collect(Collectors.toCollection(() -> new TreeSet<>(TokenRange::compareTo))); TokenRange range; @@ -154,14 +155,14 @@ public class RouteIndexTest extends CQLTester.InMemory switch (rs.nextInt(0, 2)) { case 0: // perfect match - range = rs.pick(distinctRanges); + range = rs.pickOrderedSet(distinctRanges); break; case 1: // mutli-match { - TokenRange a = rs.pick(distinctRanges); - TokenRange b = rs.pick(distinctRanges); + TokenRange a = rs.pickOrderedSet(distinctRanges); + TokenRange b = rs.pickOrderedSet(distinctRanges); while (a.equals(b)) - b = rs.pick(distinctRanges); + b = rs.pickOrderedSet(distinctRanges); if (b.compareTo(a) < 0) { TokenRange tmp = a; diff --git a/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java new file mode 100644 index 0000000000..ed24832974 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import javax.annotation.Nullable; + +import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; + +public class SimulatedMessageDelivery implements MessageDelivery +{ + public enum Action { DELIVER, DELIVER_WITH_FAILURE, DROP, DROP_PARTITIONED, FAILURE } + + public interface ActionSupplier + { + Action get(InetAddressAndPort self, Message<?> message, InetAddressAndPort to); + } + + public interface Scheduler + { + void schedule(Runnable command, long delay, TimeUnit unit); + } + + public interface DropListener + { + void onDrop(Action action, InetAddressAndPort from, Message<?> msg); + } + + private final InetAddressAndPort self; + private final ActionSupplier actions; + private final BiConsumer<InetAddressAndPort, Message<?>> reciever; + private final DropListener onDropped; + private final Scheduler scheduler; + private final Consumer<Throwable> onError; + private final Map<CallbackKey, CallbackContext> callbacks = new HashMap<>(); + private enum Status { Up, Down} + private Status status = Status.Up; + + public SimulatedMessageDelivery(InetAddressAndPort self, + ActionSupplier actions, + BiConsumer<InetAddressAndPort, Message<?>> reciever, + DropListener onDropped, + Scheduler scheduler, + Consumer<Throwable> onError) + { + this.self = self; + this.actions = actions; + this.reciever = reciever; + this.onDropped = onDropped; + this.scheduler = scheduler; + this.onError = onError; + } + + public void stop() + { + callbacks.clear(); + status = Status.Down; + } + + @Override + public <REQ> void send(Message<REQ> message, InetAddressAndPort 
to) + { + message = message.withFrom(self); + maybeEnqueue(message, to, null); + } + + @Override + public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb) + { + message = message.withFrom(self); + maybeEnqueue(message, to, cb); + } + + @Override + public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection) + { + message = message.withFrom(self); + maybeEnqueue(message, to, cb); + } + + @Override + public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to) + { + AsyncPromise<Message<RSP>> promise = new AsyncPromise<>(); + sendWithCallback(message, to, new RequestCallback<RSP>() + { + @Override + public void onResponse(Message<RSP> msg) + { + promise.trySuccess(msg); + } + + @Override + public void onFailure(InetAddressAndPort from, RequestFailure failure) + { + promise.tryFailure(new MessagingService.FailureResponseException(from, failure.reason)); + } + + @Override + public boolean invokeOnFailure() + { + return true; + } + }); + return promise; + } + + @Override + public <V> void respond(V response, Message<?> message) + { + send(message.responseWith(response), message.respondTo()); + } + + private <REQ, RSP> void maybeEnqueue(Message<REQ> message, InetAddressAndPort to, @Nullable RequestCallback<RSP> callback) + { + if (status != Status.Up) + return; + CallbackContext cb; + if (callback != null) + { + CallbackKey key = new CallbackKey(message.id(), to); + if (callbacks.containsKey(key)) + throw new AssertionError("Message id " + message.id() + " to " + to + " already has a callback"); + cb = new CallbackContext(callback); + callbacks.put(key, cb); + } + else + { + cb = null; + } + Action action = actions.get(self, message, to); + switch (action) + { + case DELIVER: + reciever.accept(to, message); + break; + case DROP: + case DROP_PARTITIONED: + onDropped.onDrop(action, to, message); + 
break; + case DELIVER_WITH_FAILURE: + reciever.accept(to, message); + case FAILURE: + if (action == Action.FAILURE) + onDropped.onDrop(action, to, message); + if (callback != null) + scheduler.schedule(() -> callback.onFailure(to, RequestFailure.UNKNOWN), + message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS); + return; + default: + throw new UnsupportedOperationException("Unknown action type: " + action); + } + if (cb != null) + { + scheduler.schedule(() -> { + CallbackContext ctx = callbacks.remove(new CallbackKey(message.id(), to)); + if (ctx != null) + { + assert ctx == cb; + try + { + ctx.onFailure(to, RequestFailure.TIMEOUT); + } + catch (Throwable t) + { + onError.accept(t); + } + } + }, message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS); + } + } + + @SuppressWarnings("rawtypes") + public SimulatedMessageReceiver reciver(IVerbHandler onMessage) + { + return new SimulatedMessageReceiver(onMessage); + } + + public class SimulatedMessageReceiver + { + @SuppressWarnings("rawtypes") + final IVerbHandler onMessage; + + @SuppressWarnings("rawtypes") + public SimulatedMessageReceiver(IVerbHandler onMessage) + { + this.onMessage = onMessage; + } + + public void recieve(Message<?> msg) + { + if (status != Status.Up) + return; + if (msg.verb().isResponse()) + { + CallbackKey key = new CallbackKey(msg.id(), msg.from()); + if (callbacks.containsKey(key)) + { + CallbackContext callback = callbacks.remove(key); + if (callback == null) + return; + try + { + if (msg.isFailureResponse()) + callback.onFailure(msg.from(), (RequestFailure) msg.payload); + else callback.onResponse(msg); + } + catch (Throwable t) + { + onError.accept(t); + } + } + } + else + { + try + { + //noinspection unchecked + onMessage.doVerb(msg); + } + catch (Throwable t) + { + onError.accept(t); + } + } + } + } + + @SuppressWarnings("rawtypes") + public static class SimpleVerbHandler implements IVerbHandler + { + private final Map<Verb, IVerbHandler<?>> handlers; + + public 
SimpleVerbHandler(Map<Verb, IVerbHandler<?>> handlers) + { + this.handlers = handlers; + } + + @Override + public void doVerb(Message msg) throws IOException + { + IVerbHandler<?> handler = handlers.get(msg.verb()); + if (handler == null) + throw new AssertionError("Unexpected verb: " + msg.verb()); + //noinspection unchecked + handler.doVerb(msg); + } + } + + private static class CallbackContext + { + @SuppressWarnings("rawtypes") + final RequestCallback callback; + + @SuppressWarnings("rawtypes") + private CallbackContext(RequestCallback callback) + { + this.callback = Objects.requireNonNull(callback); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void onResponse(Message msg) + { + callback.onResponse(msg); + } + + public void onFailure(InetAddressAndPort from, RequestFailure failure) + { + if (callback.invokeOnFailure()) callback.onFailure(from, failure); + } + } + + private static class CallbackKey + { + private final long id; + private final InetAddressAndPort peer; + + private CallbackKey(long id, InetAddressAndPort peer) + { + this.id = id; + this.peer = peer; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CallbackKey that = (CallbackKey) o; + return id == that.id && peer.equals(that.peer); + } + + @Override + public int hashCode() + { + return Objects.hash(id, peer); + } + + @Override + public String toString() + { + return "CallbackKey{" + + "id=" + id + + ", peer=" + peer + + '}'; + } + } +} diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java index c97fc8b28a..7cd6b5f3ab 100644 --- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java +++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import 
java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -84,7 +83,6 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; -import org.apache.cassandra.exceptions.RequestFailure; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.HeartBeatState; @@ -98,12 +96,12 @@ import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.locator.RangesAtEndpoint; -import org.apache.cassandra.net.ConnectionType; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageDelivery; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.RequestCallback; +import org.apache.cassandra.net.SimulatedMessageDelivery; +import org.apache.cassandra.net.SimulatedMessageDelivery.SimulatedMessageReceiver; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.repair.messages.ValidationResponse; @@ -139,6 +137,7 @@ import org.apache.cassandra.streaming.StreamingDataInputPlus; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tools.nodetool.Repair; import org.apache.cassandra.utils.AbstractTypeGenerators; +import org.apache.cassandra.utils.AccordGenerators; import org.apache.cassandra.utils.CassandraGenerators; import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.Closeable; @@ -149,8 +148,6 @@ import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.NoSpamLogger; import 
org.apache.cassandra.utils.TimeUUID; -import org.apache.cassandra.utils.concurrent.AsyncPromise; -import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.apache.cassandra.utils.progress.ProgressEventType; import org.assertj.core.api.Assertions; @@ -688,7 +685,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory { ClockAccess.includeThreadAsOwner(); this.rs = rs; - globalExecutor = new SimulatedExecutorFactory(rs, fromQT(Generators.TIMESTAMP_GEN.map(Timestamp::getTime)).mapToLong(TimeUnit.MILLISECONDS::toNanos).next(rs)); + globalExecutor = new SimulatedExecutorFactory(rs); orderedExecutor = globalExecutor.configureSequential("ignore").build(); unorderedScheduled = globalExecutor.scheduled("ignored"); @@ -802,169 +799,23 @@ public abstract class FuzzTestBase extends CQLTester.InMemory } } - private class CallbackContext + private SimulatedMessageDelivery.Action action(InetAddressAndPort self, Message<?> msg, InetAddressAndPort to) { - final RequestCallback callback; - - private CallbackContext(RequestCallback callback) - { - this.callback = Objects.requireNonNull(callback); - } - - public void onResponse(Message msg) - { - callback.onResponse(msg); - } - - public void onFailure(InetAddressAndPort from, RequestFailure failure) - { - if (callback.invokeOnFailure()) callback.onFailure(from, failure); - } + boolean toSelf = self.equals(to); + Node node = nodes.get(to); + Set<Faults> allowedFaults = allowedMessageFaults.apply(node, msg); + if (allowedFaults.contains(Faults.DROP) && !toSelf && networkDrops(self, to)) return SimulatedMessageDelivery.Action.DROP; + return SimulatedMessageDelivery.Action.DELIVER; } - private static class CallbackKey + private boolean networkDrops(InetAddressAndPort self, InetAddressAndPort to) { - private final long id; - private final InetAddressAndPort peer; - - private CallbackKey(long id, InetAddressAndPort peer) - { - this.id = id; - this.peer = peer; - 
} - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CallbackKey that = (CallbackKey) o; - return id == that.id && peer.equals(that.peer); - } - - @Override - public int hashCode() - { - return Objects.hash(id, peer); - } - - @Override - public String toString() - { - return "CallbackKey{" + - "id=" + id + - ", peer=" + peer + - '}'; - } + return networkDrops.computeIfAbsent(new Connection(self, to), ignore -> Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15)).asSupplier(rs)).get(); } - private class Messaging implements MessageDelivery - { - final InetAddressAndPort broadcastAddressAndPort; - final Map<CallbackKey, CallbackContext> callbacks = new HashMap<>(); - - private Messaging(InetAddressAndPort broadcastAddressAndPort) + private long networkJitterNanos(InetAddressAndPort self, InetAddressAndPort to) { - this.broadcastAddressAndPort = broadcastAddressAndPort; - } - - @Override - public <REQ> void send(Message<REQ> message, InetAddressAndPort to) - { - message = message.withFrom(broadcastAddressAndPort); - maybeEnqueue(message, to, null); - } - - @Override - public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb) - { - message = message.withFrom(broadcastAddressAndPort); - maybeEnqueue(message, to, cb); - } - - @Override - public <REQ, RSP> void sendWithCallback(Message<REQ> message, InetAddressAndPort to, RequestCallback<RSP> cb, ConnectionType specifyConnection) - { - message = message.withFrom(broadcastAddressAndPort); - maybeEnqueue(message, to, cb); - } - - private <REQ, RSP> void maybeEnqueue(Message<REQ> message, InetAddressAndPort to, @Nullable RequestCallback<RSP> callback) - { - CallbackContext cb; - if (callback != null) - { - CallbackKey key = new CallbackKey(message.id(), to); - if (callbacks.containsKey(key)) - throw new AssertionError("Message id " + message.id() + " to " 
+ to + " already has a callback"); - cb = new CallbackContext(callback); - callbacks.put(key, cb); - } - else - { - cb = null; - } - boolean toSelf = this.broadcastAddressAndPort.equals(to); - Node node = nodes.get(to); - Set<Faults> allowedFaults = allowedMessageFaults.apply(node, message); - if (allowedFaults.isEmpty()) - { - // enqueue so stack overflow doesn't happen with the inlining - unorderedScheduled.submit(() -> node.handle(message)); - } - else - { - Runnable enqueue = () -> { - if (!allowedFaults.contains(Faults.DELAY)) - { - unorderedScheduled.submit(() -> node.handle(message)); - } - else - { - if (toSelf) unorderedScheduled.submit(() -> node.handle(message)); - else - unorderedScheduled.schedule(() -> node.handle(message), networkJitterNanos(to), TimeUnit.NANOSECONDS); - } - }; - - if (!allowedFaults.contains(Faults.DROP)) enqueue.run(); - else - { - if (!toSelf && networkDrops(to)) - { -// logger.warn("Dropped message {}", message); - // drop - } - else - { - enqueue.run(); - } - } - - if (cb != null) - { - unorderedScheduled.schedule(() -> { - CallbackContext ctx = callbacks.remove(new CallbackKey(message.id(), to)); - if (ctx != null) - { - assert ctx == cb; - try - { - ctx.onFailure(to, RequestFailure.TIMEOUT); - } - catch (Throwable t) - { - failures.add(t); - } - } - }, message.verb().expiresAfterNanos(), TimeUnit.NANOSECONDS); - } - } - } - - private long networkJitterNanos(InetAddressAndPort to) - { - return networkLatencies.computeIfAbsent(new Connection(broadcastAddressAndPort, to), ignore -> { + return networkLatencies.computeIfAbsent(new Connection(self, to), ignore -> { long min = TimeUnit.MICROSECONDS.toNanos(500); long maxSmall = TimeUnit.MILLISECONDS.toNanos(5); long max = TimeUnit.SECONDS.toNanos(5); @@ -974,42 +825,23 @@ public abstract class FuzzTestBase extends CQLTester.InMemory }).getAsLong(); } - private boolean networkDrops(InetAddressAndPort to) - { - return networkDrops.computeIfAbsent(new Connection(broadcastAddressAndPort, 
to), ignore -> Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15)).asSupplier(rs)).get(); - } - - @Override - public <REQ, RSP> Future<Message<RSP>> sendWithResult(Message<REQ> message, InetAddressAndPort to) - { - AsyncPromise<Message<RSP>> promise = new AsyncPromise<>(); - sendWithCallback(message, to, new RequestCallback<RSP>() - { - @Override - public void onResponse(Message<RSP> msg) - { - promise.trySuccess(msg); - } - - @Override - public void onFailure(InetAddressAndPort from, RequestFailure failure) - { - promise.tryFailure(new MessagingService.FailureResponseException(from, failure.reason)); - } - - @Override - public boolean invokeOnFailure() - { - return true; - } - }); - return promise; - } - - @Override - public <V> void respond(V response, Message<?> message) + private class Messaging extends SimulatedMessageDelivery + { + private Messaging(InetAddressAndPort broadcastAddressAndPort) { - send(message.responseWith(response), message.respondTo()); + super(broadcastAddressAndPort, + Cluster.this::action, + (to, msg) -> { + Node node = nodes.get(to); + Set<Faults> allowedFaults = allowedMessageFaults.apply(node, msg); + if (!allowedFaults.contains(Faults.DELAY) || broadcastAddressAndPort.equals(to)) + unorderedScheduled.submit(() -> node.handle(msg)); + else + unorderedScheduled.schedule(() -> node.handle(msg), networkJitterNanos(broadcastAddressAndPort, to), TimeUnit.NANOSECONDS); + }, + (action, to, msg) -> logger.warn("{} message {}", action, msg), + unorderedScheduled::schedule, + failures::add); } } @@ -1059,7 +891,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory final InetAddressAndPort addressAndPort; final Collection<Token> tokens; final ActiveRepairService activeRepairService; - final IVerbHandler verbHandler; + final SimulatedMessageReceiver receiver; final Messaging messaging; final IValidationManager validationManager; private FailingBiConsumer<ColumnFamilyStore, Validator> doValidation = 
DEFAULT_VALIDATION; @@ -1093,7 +925,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory validator.fail(e); } }); - this.verbHandler = new IVerbHandler<>() + this.receiver = messaging.reciver(new IVerbHandler<>() { private final RepairMessageVerbHandler repairVerbHandler = new RepairMessageVerbHandler(Node.this); private final IVerbHandler<PaxosStartPrepareCleanup.Request> paxosStartPrepareCleanup = PaxosStartPrepareCleanup.createVerbHandler(Node.this); @@ -1125,7 +957,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory repairVerbHandler.doVerb(message); } } - }; + }); activeRepairService.start(); } @@ -1165,38 +997,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory } for (MessageListener l : listeners) l.preHandle(this, msg); - if (msg.verb().isResponse()) - { - // handle callbacks - CallbackKey key = new CallbackKey(msg.id(), msg.from()); - if (messaging.callbacks.containsKey(key)) - { - CallbackContext callback = messaging.callbacks.remove(key); - if (callback == null) - return; - try - { - if (msg.isFailureResponse()) - callback.onFailure(msg.from(), (RequestFailure) msg.payload); - else callback.onResponse(msg); - } - catch (Throwable t) - { - failures.add(t); - } - } - } - else - { - try - { - verbHandler.doVerb(msg); - } - catch (Throwable e) - { - failures.add(e); - } - } + receiver.recieve(msg); } public UUID hostId() @@ -1379,10 +1180,7 @@ public abstract class FuzzTestBase extends CQLTester.InMemory private static <T> Gen<T> fromQT(org.quicktheories.core.Gen<T> qt) { - return rs -> { - JavaRandom r = new JavaRandom(rs.asJdkRandom()); - return qt.generate(r); - }; + return AccordGenerators.fromQT(qt); } public static class HackStrat extends LocalStrategy diff --git a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java index 7b77791c6e..2f689187ac 100644 --- 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java @@ -40,6 +40,7 @@ import accord.topology.Shard; import accord.topology.Topology; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Murmur3Partitioner; @@ -176,7 +177,7 @@ public class AccordConfigurationServiceTest @Test public void initialEpochTest() throws Throwable { - AccordConfigurationService service = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector()); + AccordConfigurationService service = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), AccordConfigurationService.SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks); Assert.assertEquals(null, AccordKeyspace.loadEpochDiskState()); service.start(); Assert.assertEquals(null, AccordKeyspace.loadEpochDiskState()); @@ -201,7 +202,7 @@ public class AccordConfigurationServiceTest @Test public void loadTest() throws Throwable { - AccordConfigurationService service = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector()); + AccordConfigurationService service = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), AccordConfigurationService.SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks); service.start(); Topology topology1 = new Topology(1, new Shard(AccordTopology.fullRange(TBL1), ID_LIST, ID_SET)); @@ -221,7 +222,7 @@ public class AccordConfigurationServiceTest service.reportTopology(topology3); service.acknowledgeEpoch(EpochReady.done(3), true); - AccordConfigurationService loaded = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector()); + 
AccordConfigurationService loaded = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), AccordConfigurationService.SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks); loaded.updateMapping(mappingForEpoch(ClusterMetadata.current().epoch.getEpoch() + 1)); AbstractConfigurationServiceTest.TestListener listener = new AbstractConfigurationServiceTest.TestListener(loaded, true); loaded.registerListener(listener); @@ -240,7 +241,7 @@ public class AccordConfigurationServiceTest @Test public void truncateTest() { - AccordConfigurationService service = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector()); + AccordConfigurationService service = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), AccordConfigurationService.SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks); TestListener serviceListener = new TestListener(service, true); service.registerListener(serviceListener); service.start(); @@ -258,7 +259,7 @@ public class AccordConfigurationServiceTest Assert.assertEquals(EpochDiskState.create(3), service.diskState()); serviceListener.assertTruncates(3L); - AccordConfigurationService loaded = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector()); + AccordConfigurationService loaded = new AccordConfigurationService(ID1, new Messaging(), new MockFailureDetector(), AccordConfigurationService.SystemTableDiskStateManager.instance, ScheduledExecutors.scheduledTasks); loaded.updateMapping(mappingForEpoch(ClusterMetadata.current().epoch.getEpoch() + 1)); TestListener loadListener = new TestListener(loaded, true); loaded.registerListener(loadListener); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java index 55d1e7e117..9c613624fe 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java +++ 
b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java @@ -186,7 +186,7 @@ public class AccordKeyspaceTest extends CQLTester.InMemory // else this will loop forever... for (int attempt = 0; attempt < 10; attempt++) { - TableId tableId = rs.pick(tables.keySet()); + TableId tableId = rs.pickOrderedSet(tables.navigableKeySet()); IPartitioner partitioner = tables.get(tableId); ByteBuffer data = !(partitioner instanceof LocalPartitioner) ? Int32Type.instance.decompose(rs.nextInt()) : fromQT(getTypeSupport(partitioner.getTokenValidator()).bytesGen()).next(rs); @@ -258,7 +258,7 @@ public class AccordKeyspaceTest extends CQLTester.InMemory for (int i = 0, queries = rs.nextInt(1, 5); i < queries; i++) { - int store = rs.pick(storesToKeys.keySet()); + int store = rs.pickOrderedSet(storesToKeys.navigableKeySet()); var keysForStore = new ArrayList<>(storesToKeys.get(store)); int offset; diff --git a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java index f57a3c1238..12d5c75c01 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java @@ -60,6 +60,9 @@ import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.RequestFailure; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.HeartBeatState; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.InetAddressAndPort; @@ -94,6 +97,10 @@ public class AccordSyncPropagatorTest Gen<Ranges> rangesGen = AccordGenerators.ranges().filter(r -> !r.isEmpty()); Gen<List<Node.Id>> nodesGen = 
Gens.lists(AccordGens.nodes()).unique().ofSizeBetween(1, 40); qt().withExamples(100).check(rs -> { + // when gossip and cluster metadata don't know an endpoint, retries are avoided (node removed) + // so when instances are created here they are added to gossip to trick the membership check... + Gossiper.instance.clearUnsafe(); + List<Node.Id> nodes = nodesGen.next(rs); Set<Node.Id> nodesAsSet = ImmutableSet.copyOf(nodes); @@ -214,6 +221,7 @@ public class AccordSyncPropagatorTest Sink sink = new Sink(id); IFailureDetector fd = new FailureDetector(address); instances.put(id, new Instace(id, address, cs, sink, fd, cs, new AccordSyncPropagator(id, Cluster.this, sink, fd, scheduler, cs))); + Gossiper.instance.endpointStateMap.put(address, new EndpointState(HeartBeatState.empty())); } this.nodeToAddress = nodeToAddress.build(); this.instances = instances.build(); diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java new file mode 100644 index 0000000000..b0e2fec47d --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java @@ -0,0 +1,753 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ConfigurationService; +import accord.api.ConfigurationService.EpochReady; +import accord.impl.SizeOfIntersectionSorter; +import accord.local.Node; +import accord.primitives.Ranges; +import accord.topology.Topology; +import accord.topology.TopologyManager; +import accord.utils.Gen; +import accord.utils.Gens; +import accord.utils.Invariants; +import accord.utils.Property.Command; +import accord.utils.Property.Commands; +import accord.utils.Property.UnitCommand; +import accord.utils.RandomSource; +import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import org.apache.cassandra.concurrent.ScheduledExecutorPlus; +import org.apache.cassandra.concurrent.SimulatedExecutorFactory; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; +import org.apache.cassandra.gms.IFailureDetectionEventListener; +import 
org.apache.cassandra.gms.IFailureDetector; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.SimulatedMessageDelivery; +import org.apache.cassandra.net.SimulatedMessageDelivery.Action; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableParams; +import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot; +import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.StubClusterMetadataService; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.utils.ByteArrayUtil; +import org.apache.cassandra.utils.Pair; +import org.assertj.core.api.Assertions; + +import static accord.utils.Property.stateful; + +public class EpochSyncTest +{ + private static final Logger logger = LoggerFactory.getLogger(EpochSyncTest.class); + + static + { + DatabaseDescriptor.clientInitialization(); + 
DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + + ClusterMetadataService.setInstance(StubClusterMetadataService.forTesting()); + } + + @Test + public void test() + { + stateful().withExamples(50).check(new Commands<Cluster, Void>() + { + @Override + public Gen<Cluster> genInitialState() + { + return Cluster::new; + } + + @Override + public Void createSut(Cluster Cluster) + { + return null; + } + + @Override + public Gen<Command<Cluster, Void, ?>> commands(Cluster cluster) + { + List<Node.Id> alive = cluster.alive(); + Map<Gen<Command<Cluster, Void, ?>>, Integer> possible = new LinkedHashMap<>(); + if (alive.size() < cluster.maxNodes) + { + // add node + possible.put(rs -> { + Node.Id id = new Node.Id(++cluster.nodeCounter); + long token = cluster.tokenGen.nextLong(rs); + while (cluster.tokens.contains(token)) + token = cluster.tokenGen.nextLong(rs); + long epoch = cluster.current.epoch.getEpoch() + 1; + long finalToken = token; + return new SimpleCommand("Add Node " + id + "; token=" + token + ", epoch=" + epoch, + c -> c.addNode(id, finalToken)); + }, 5); + } + if (alive.size() > cluster.minNodes) + { + possible.put(rs -> { + Node.Id pick = rs.pick(alive); + long token = cluster.instances.get(pick).token; + long epoch = cluster.current.epoch.getEpoch() + 1; + return new SimpleCommand("Remove Node " + pick + "; token=" + token + "; epoch=" + epoch, c -> c.removeNode(pick)); + }, 3); + } + if (cluster.hasWork()) + { + possible.put(rs -> new SimpleCommand("Process Some", + c -> {//noinspection StatementWithEmptyBody + for (int i = 0, attempts = rs.nextInt(1, 100); i < attempts && c.processOne(); i++) + { + } + }), 10); + } + + possible.put(rs -> new SimpleCommand("Validate", + c -> c.validate(false)), 1); + possible.put(rs -> new SimpleCommand("Bump Epoch " + (cluster.current.epoch.getEpoch() + 1), + Cluster::bumpEpoch), 10); + return Gens.oneOf(possible); + } + + @Override + public void destroyState(Cluster cluster) + { + 
cluster.processAll(); + cluster.validate(true); + } + }); + } + + private static class SimpleCommand implements UnitCommand<Cluster, Void> + { + private final String name; + private final Consumer<Cluster> fn; + + private SimpleCommand(String name, Consumer<Cluster> fn) + { + this.name = name; + this.fn = fn; + } + + @Override + public String detailed(Cluster Cluster) + { + return name; + } + + @Override + public void applyUnit(Cluster Cluster) + { + fn.accept(Cluster); + } + + @Override + public void runUnit(Void Void) + { + + } + } + + private static class Cluster + { + private static final int rf = 2; + private static final ReplicationParams replication_params = ReplicationParams.simple(rf); + private static final ReplicationParams meta = ReplicationParams.simpleMeta(1, Collections.singleton("dc1")); + + private final RandomSource rs; + private final int minNodes, maxNodes; + private final Gen.LongGen tokenGen; + private final SortedSet<Long> tokens = new TreeSet<>(); + private final Map<Node.Id, Instance> instances = new HashMap<>(); + private final Set<Node.Id> removed = new HashSet<>(); + private final List<Throwable> failures = new ArrayList<>(); + private final SimulatedExecutorFactory globalExecutor; + private final ScheduledExecutorPlus scheduler; + private int nodeCounter = 0; + private ClusterMetadata current = new ClusterMetadata(Murmur3Partitioner.instance, Directory.EMPTY, + new DistributedSchema(Keyspaces.of( + DistributedMetadataLogKeyspace.initialMetadata(Collections.singleton("dc1")), + KeyspaceMetadata.create("test", KeyspaceParams.simple(rf), Tables.of(TableMetadata.minimal("test", "tb1").unbuild().params(TableParams.builder().transactionalMode(TransactionalMode.full).build()).build()))))); + private final IFailureDetector fd = new IFailureDetector() + { + @Override + public boolean isAlive(InetAddressAndPort ep) + { + return !removed.contains(nodeId(ep)); + } + + @Override + public void interpret(InetAddressAndPort ep) + { + + } + + @Override 
+ public void report(InetAddressAndPort ep) + { + + } + + @Override + public void remove(InetAddressAndPort ep) + { + + } + + @Override + public void forceConviction(InetAddressAndPort ep) + { + + } + + @Override + public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) + { + + } + + @Override + public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) + { + + } + }; + + private static InetAddressAndPort address(Node.Id id) + { + try + { + return InetAddressAndPort.getByAddress(ByteArrayUtil.bytes(id.id)); + } + catch (UnknownHostException e) + { + throw new AssertionError("Unable to create address for id " + id, e); + } + } + + public enum EpochTracker { topologyManager, accordSyncPropagator, configurationService} + + Set<EpochTracker> globalSynced(long epoch) + { + return alive().stream() + .filter(n -> instances.get(n).epoch.getEpoch() <= epoch) + .map(n -> instances.get(n).synced(epoch)) + .reduce(EnumSet.allOf(EpochTracker.class), Sets::intersection); + } + + boolean allSynced(long epoch) + { + Set<EpochTracker> done = globalSynced(epoch); + return done.contains(EpochTracker.topologyManager); + } + + private static Node.Id nodeId(InetAddressAndPort address) + { + return new Node.Id(ByteArrayUtil.getInt(address.addressBytes)); + } + + public Cluster(RandomSource rs) + { + this.rs = rs; + this.minNodes = 3; + this.maxNodes = 10; + this.tokenGen = rs2 -> rs2.nextLong(Long.MIN_VALUE + 1, Long.MAX_VALUE); + + this.globalExecutor = new SimulatedExecutorFactory(rs, failures::add); + this.scheduler = globalExecutor.scheduled("ignored"); + Stage.MISC.unsafeSetExecutor(scheduler); + + scheduler.scheduleWithFixedDelay(() -> { + if (aliveCount() < 2) return; + if (!partitions.isEmpty() && rs.nextBoolean()) + { + // remove partition + if (partitions.size() == 1) + { + partitions.clear(); + return; + } + partitions.remove(rs.pickOrderedSet(partitions)); + } + else + { + // add partition + List<Node.Id> 
alive = alive(); + InetAddressAndPort a = address(rs.pick(alive)); + InetAddressAndPort b = address(rs.pick(alive)); + while (a.equals(b)) + b = address(rs.pick(alive)); + partitions.add(new Connection(a, b)); + } + }, 1, 1, TimeUnit.MINUTES); + } + + void validate(boolean isDone) + { + for (Node.Id id : alive()) + { + Instance inst = instances.get(id); + if (removed.contains(id)) continue; // ignore removed nodes + AccordConfigurationService conf = inst.config; + TopologyManager tm = inst.topology; + for (long epoch = inst.epoch.getEpoch(); epoch <= current.epoch.getEpoch(); epoch++) + { + // validate config + EpochSnapshot snapshot = conf.getEpochSnapshot(epoch); + if (isDone) + { + Assertions.assertThat(snapshot).describedAs("node%s does not have epoch %d", id, epoch).isNotNull(); + Assertions.assertThat(snapshot.syncStatus).isEqualTo(AccordConfigurationService.SyncStatus.COMPLETED); + + // validate topology manager + Assertions.assertThat(tm.hasEpoch(epoch)).describedAs("node%s does not have epoch %d", id, epoch).isTrue(); + Ranges ranges = tm.globalForEpoch(epoch).ranges().mergeTouching(); + Ranges actual = tm.syncComplete(epoch).mergeTouching(); + Assertions.assertThat(actual).describedAs("node%s does not have all expected sync ranges for epoch %d; missing %s", id, epoch, ranges.subtract(actual)).isEqualTo(ranges); + } + else + { + if (snapshot == null || snapshot.syncStatus != AccordConfigurationService.SyncStatus.COMPLETED) continue; + + if (!allSynced(epoch)) + continue; + + Assertions.assertThat(tm.hasEpoch(epoch)).describedAs("node%s does not have epoch %d", id, epoch).isTrue(); + Topology topology = tm.globalForEpoch(epoch); + Ranges ranges = topology.ranges().mergeTouching(); + Ranges actual = tm.syncComplete(epoch).mergeTouching(); + // TopologyManager defines syncComplete for an epoch as (epoch - 1).syncComplete. 
This means that an epoch has reached quorum, but will still miss ranges as previous epochs have not + if (!ranges.equals(actual) && tm.minEpoch() != epoch && !ranges.equals(tm.syncComplete(epoch - 1).mergeTouching())) + continue; + Assertions.assertThat(actual) + .describedAs("node%s does not have all expected sync ranges for epoch %d; missing %s; peers=%s; previous epochs %s", id, epoch, ranges.subtract(actual), topology.nodes(), + LongStream.range(inst.epoch.getEpoch(), epoch + 1).mapToObj(e -> e + " -> " + conf.getEpochSnapshot(e).syncStatus + "(synced=" + globalSynced(e) + "): " + tm.syncComplete(e)).collect(Collectors.joining("\n"))) + .isEqualTo(ranges); + } + } + } + } + + String displayTopology() + { + List<Node.Id> alive = alive(); + List<Pair<Node.Id, Long>> withToken = new ArrayList<>(alive.size()); + for (Node.Id n : alive) + withToken.add(Pair.create(n, instances.get(n).token)); + withToken.sort(Comparator.comparing(a -> a.right)); + StringBuilder sb = new StringBuilder(); + for (var p : withToken) + sb.append(p.left).append('\t').append(p.right).append('\n'); + return sb.toString(); + } + + @Override + public String toString() + { + return "Topology:\n" + displayTopology(); + } + + boolean hasWork() + { + return globalExecutor.hasWork(); + } + + boolean processOne() + { + boolean result = globalExecutor.processOne(); + checkFailures(); + return result; + } + + @SuppressWarnings("StatementWithEmptyBody") + void processAll() + { + while (processOne()) + { + } + } + + public void checkFailures() + { + if (Thread.interrupted()) + failures.add(new InterruptedException()); + if (failures.isEmpty()) return; + AssertionError error = new AssertionError("Unexpected exceptions found"); + failures.forEach(error::addSuppressed); + failures.clear(); + throw error; + } + + List<Node.Id> alive() + { + ArrayList<Node.Id> ids = new ArrayList<>(Sets.difference(instances.keySet(), removed)); + ids.sort(Comparator.naturalOrder()); + return ids; + } + + int aliveCount() + 
{
    // Live-node count: everything ever added minus everything removed (instances keeps removed entries).
    return instances.size() - removed.size();
}

// Directed node pairs currently partitioned from each other; TreeSet relies on Connection.compareTo.
private final NavigableSet<Connection> partitions = new TreeSet<>();

// True when a simulated network partition blocks messages from self -> to (direction matters).
private boolean partitioned(InetAddressAndPort self, InetAddressAndPort to)
{
    return partitions.contains(new Connection(self, to));
}

/**
 * Builds the simulated message delivery for a node. The decision function drops traffic to/from
 * removed nodes, honours partitions (self-delivery always allowed), and injects ~1% random
 * failures; delivered messages are handed to the destination instance's receiver.
 */
private SimulatedMessageDelivery createMessaging(Node.Id id)
{
    InetAddressAndPort address = address(id);
    return new SimulatedMessageDelivery(address,
                                        (self, msg, to) -> {
                                            // Removed nodes neither send nor receive.
                                            if (removed.contains(nodeId(self)) || removed.contains(nodeId(to)))
                                                return Action.DROP;
                                            if (!self.equals(to) && partitioned(self, to))
                                                return Action.DROP_PARTITIONED;
                                            // Rare random fault injection: half report failure after delivery, half fail outright.
                                            if (rs.decide(.01))
                                                return rs.nextBoolean() ? Action.DELIVER_WITH_FAILURE : Action.FAILURE;
                                            return Action.DELIVER;
                                        },
                                        // NOTE(review): "reciver"/"recieve" are misspelled identifiers defined elsewhere
                                        // (Instance field / SimulatedMessageDelivery API); renaming must be coordinated there.
                                        (to, msg) -> instances.get(nodeId(to)).reciver.recieve(msg),
                                        (action, to, msg) -> logger.warn("{} message {}", action, msg),
                                        scheduler::schedule,
                                        failures::add);
}

/**
 * Adds a node owning the given (unique) token and publishes a new epoch whose placements and
 * directory include it.
 */
void addNode(Node.Id id, long token)
{
    Invariants.checkState(!tokens.contains(token), "Attempted to add token %d for node %s but token is already taken", token, id);
    Epoch epoch = Epoch.create(current.epoch.getEpoch() + 1);

    Instance instance = new Instance(id, token, epoch, createMessaging(id), fd);
    instances.put(id, instance);
    tokens.add(token);

    // Order matters: bump the epoch, then rebuild placements for it, then add the node to the directory.
    current = current.forceEpoch(epoch)
                     .withPlacements(DataPlacements.builder(2)
                                                   .with(meta, DataPlacement.empty())
                                                   .with(replication_params, rebuildPlacements(epoch))
                                                   .build())
                     .withDirectory(current.directory.with(new NodeAddresses(address(id)), new Location("dc1", "r1")));
    notify(current);
}

/**
 * Removes a live node: stops its messaging, drops it from the directory, and publishes a new
 * epoch with placements rebuilt from the remaining nodes.
 */
void removeNode(Node.Id pick)
{
    Instance inst = Objects.requireNonNull(instances.get(pick), "Unknown id " + pick);
    Invariants.checkState(!removed.contains(pick), "Can not remove node twice; node " + pick);
    tokens.remove(inst.token);
    removed.add(pick);
    inst.stop();
    // The directory is updated first so rebuildPlacements (via alive()) no longer sees the node.
    current = current.forceEpoch(Epoch.create(current.epoch.getEpoch() + 1))
                     .withDirectory(current.directory.without(new NodeId(pick.id)));

    current = current.withPlacements(DataPlacements.builder(2)
                                                   .with(meta, DataPlacement.empty())
                                                   .with(replication_params, rebuildPlacements(current.epoch))
                                                   .build());
    notify(current);
}

// Recomputes read/write placements for the given epoch from every live node's replica ranges.
private DataPlacement rebuildPlacements(Epoch epoch)
{
    DataPlacement.Builder builder = DataPlacement.builder();
    for (Node.Id inst : alive())
        for (Replica replica : instances.get(inst).replica())
            builder.withReadReplica(epoch, replica).withWriteReplica(epoch, replica);
    return builder.build();
}

// Publishes a new epoch with no membership/placement changes.
void bumpEpoch()
{
    current = current.forceEpoch(Epoch.create(current.epoch.getEpoch() + 1));
    notify(current);
}

/**
 * Installs the metadata into the stub ClusterMetadataService and reports it to every live
 * instance (starting any instance not yet started). A non-empty directory must always cover
 * the full token ring as one merged range.
 */
private void notify(ClusterMetadata current)
{
    Ranges ranges = AccordTopology.createAccordTopology(current).ranges().mergeTouching();
    if (!current.directory.isEmpty())
        Assertions.assertThat(ranges).hasSize(1);
    ((StubClusterMetadataService) ClusterMetadataService.instance()).setMetadata(current);
    for (Node.Id id : alive())
    {
        Instance inst = instances.get(id);
        inst.maybeStart();
        inst.config.maybeReportMetadata(current);
    }
}

/**
 * Wraps a delayed task in an AsyncChain: the task runs on the simulated scheduler after the
 * given delay, and its result (or thrown error) is forwarded to the chain's callback.
 */
@SuppressWarnings("SameParameterValue")
private <T> AsyncChain<T> schedule(long time, TimeUnit unit, Callable<T> task)
{
    return new AsyncChains.Head<>()
    {
        @Override
        protected void start(BiConsumer<? 
super T, Throwable> callback)
        {
            scheduler.schedule(() -> {
                T value;
                try
                {
                    value = task.call();
                }
                catch (Throwable t)
                {
                    // Failure path: report the error and never invoke the success callback.
                    callback.accept(null, t);
                    return;
                }
                callback.accept(value, null);
            }, time, unit);
        }
    };
}

// Lifecycle of an Instance: Init until the first maybeStart(), Started afterwards.
private enum Status { Init, Started}

/**
 * A simulated Accord node: its configuration service, topology manager, and messaging endpoint,
 * owning a single token on the ring.
 */
private class Instance
{
    private final Node.Id id;
    private final long token;
    private final AccordConfigurationService config;
    private final SimulatedMessageDelivery messaging;
    private final SimulatedMessageDelivery.SimulatedMessageReceiver reciver;
    private final TopologyManager topology;
    // First epoch this instance existed in; epochs before it are rejected by synced().
    private final Epoch epoch;
    private Status status = Status.Init;

    Instance(Node.Id node, long token, Epoch epoch, SimulatedMessageDelivery messagingService, IFailureDetector failureDetector)
    {
        this.id = node;
        this.token = token;
        this.epoch = epoch;
        this.topology = new TopologyManager(SizeOfIntersectionSorter.SUPPLIER, id);
        AccordConfigurationService.DiskStateManager instance = MockDiskStateManager.instance;
        config = new AccordConfigurationService(node, messagingService, failureDetector, instance, scheduler);
        config.registerListener(new ConfigurationService.Listener()
        {
            @Override
            public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync)
            {
//                EpochReady ready = EpochReady.done(topology.epoch());
                // Simulate the epoch becoming ready in stages, each 1-10s after the previous:
                // metadata -> coordination -> data -> reads.
                AsyncResult<Void> metadata = schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null).beginAsResult();
                AsyncResult<Void> coordination = metadata.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
                AsyncResult<Void> data = coordination.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
                AsyncResult<Void> reads = data.flatMap(ignore -> schedule(rs.nextInt(1, 10), TimeUnit.SECONDS, (Callable<Void>) () -> null)).beginAsResult();
                EpochReady ready = new EpochReady(topology.epoch(), metadata, coordination, 
data, reads);

                topology().onTopologyUpdate(topology, () -> ready);
                // Once coordination is ready this node counts itself as synced for the epoch.
                ready.coordination.addCallback(() -> topology().onEpochSyncComplete(id, topology.epoch()));
                // NOTE(review): this guard skips acknowledgeEpoch when the new epoch equals the
                // TopologyManager's min epoch but not its current epoch — presumably a replay of an
                // old epoch during startup; confirm intent against AccordConfigurationService.
                if (topology().minEpoch() == topology.epoch() && topology().epoch() != topology.epoch())
                    return ready.coordination;
                config.acknowledgeEpoch(ready, startSync);
                return ready.coordination;
            }

            @Override
            public void onRemoteSyncComplete(Node.Id node, long epoch)
            {
                topology.onEpochSyncComplete(node, epoch);
            }

            @Override
            public void onRemoveNodes(long epoch, Collection<Node.Id> removed)
            {
                topology.onRemoveNodes(epoch, removed);
            }

            @Override
            public void truncateTopologyUntil(long epoch)
            {
                topology.truncateTopologyUntil(epoch);
            }

            @Override
            public void onEpochClosed(Ranges ranges, long epoch)
            {
                topology.onEpochClosed(ranges, epoch);
            }

            @Override
            public void onEpochRedundant(Ranges ranges, long epoch)
            {
                topology.onEpochRedundant(ranges, epoch);
            }
        });

        // Only the sync-notification verb is handled; everything else would be an unexpected message.
        Map<Verb, IVerbHandler<?>> handlers = new EnumMap<>(Verb.class);
        //noinspection unchecked
        handlers.put(Verb.ACCORD_SYNC_NOTIFY_REQ, msg -> AccordService.receive(messagingService, config, (Message<List<AccordSyncPropagator.Notification>>) (Message<?>) msg));
        this.messaging = messagingService;
        this.reciver = messagingService.reciver(new SimulatedMessageDelivery.SimpleVerbHandler(handlers));
    }

    // Idempotent: starts the configuration service exactly once.
    void maybeStart()
    {
        if (status == Status.Init)
        {
            start();
            status = Status.Started;
        }
    }

    private void start()
    {
        config.start();
    }

    TopologyManager topology()
    {
        return topology;
    }

    /**
     * The range(s) this node replicates: (predecessor token, own token]. The node owning the
     * smallest token wraps around, so it owns two ranges: (MIN, token] and (largest token, MIN].
     */
    Collection<Replica> replica()
    {
        InetAddressAndPort address = Cluster.address(id);
        SortedSet<Long> lessThan = tokens.headSet(token);
        if (lessThan.isEmpty())
        {
            // wrap around
            return Arrays.asList(new Replica(address, new LongToken(Long.MIN_VALUE), new LongToken(token), true),
                                 new Replica(address, new LongToken(tokens.last()), new LongToken(Long.MIN_VALUE), true));
        }

return Collections.singletonList(new Replica(address, new LongToken(lessThan.last()), new LongToken(token), true)); + } + + Set<EpochTracker> synced(long epoch) + { + if (epoch < this.epoch.getEpoch()) throw new IllegalArgumentException("Asked for epoch before this instance existed"); + EnumSet<EpochTracker> done = EnumSet.noneOf(EpochTracker.class); + EpochSnapshot snapshot = config.getEpochSnapshot(epoch); + if (snapshot != null && snapshot.syncStatus == AccordConfigurationService.SyncStatus.COMPLETED) + done.add(EpochTracker.configurationService); + if (topology.hasReachedQuorum(epoch)) + done.add(EpochTracker.topologyManager); + if (!config.syncPropagator().hasPending(epoch)) + done.add(EpochTracker.accordSyncPropagator); + return done; + } + + void stop() + { + messaging.stop(); + } + } + } + + private static class Connection implements Comparable<Connection> + { + final InetAddressAndPort from, to; + + private Connection(InetAddressAndPort from, InetAddressAndPort to) + { + this.from = from; + this.to = to; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Connection that = (Connection) o; + return from.equals(that.from) && to.equals(that.to); + } + + @Override + public int hashCode() + { + return Objects.hash(from, to); + } + + @Override + public String toString() + { + return "Connection{" + "from=" + from + ", to=" + to + '}'; + } + + @Override + public int compareTo(Connection o) + { + int rc = from.compareTo(o.from); + if (rc == 0) + rc = to.compareTo(o.to); + return rc; + } + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/LoggingDiskStateManager.java b/test/unit/org/apache/cassandra/service/accord/LoggingDiskStateManager.java new file mode 100644 index 0000000000..7b8ce0e335 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/LoggingDiskStateManager.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import accord.local.Node; +import accord.primitives.Ranges; +import accord.topology.Topology; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Set; + +/** + * When trying to inspect the order in which disk state is modified, this class can aid by adding logging. This class + * mostly exists for testing to aid in debugging. 
+ */ +@SuppressWarnings("unused") +@VisibleForTesting +public class LoggingDiskStateManager implements AccordConfigurationService.DiskStateManager { + private static final Logger logger = LoggerFactory.getLogger(LoggingDiskStateManager.class); + private final Node.Id self; + private final AccordConfigurationService.DiskStateManager delegate; + + public LoggingDiskStateManager(Node.Id self, AccordConfigurationService.DiskStateManager delegate) { + this.self = self; + this.delegate = delegate; + } + + @Override + public AccordKeyspace.EpochDiskState loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer) { + logger.info("[node={}] Calling loadTopologies()", self); + return delegate.loadTopologies(consumer); + } + + @Override + public AccordKeyspace.EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> pending, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling setNotifyingLocalSync({}, {}, {})", self, epoch, pending, diskState); + return delegate.setNotifyingLocalSync(epoch, pending, diskState); + } + + @Override + public AccordKeyspace.EpochDiskState setCompletedLocalSync(long epoch, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling setCompletedLocalSync({}, {})", self, epoch, diskState); + return delegate.setCompletedLocalSync(epoch, diskState); + } + + @Override + public AccordKeyspace.EpochDiskState markLocalSyncAck(Node.Id id, long epoch, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling markLocalSyncAck({}, {}, {})", self, id, epoch, diskState); + return delegate.markLocalSyncAck(id, epoch, diskState); + } + + @Override + public AccordKeyspace.EpochDiskState saveTopology(Topology topology, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling saveTopology({}, {})", self, topology.epoch(), diskState); + return delegate.saveTopology(topology, diskState); + } + + @Override + public AccordKeyspace.EpochDiskState markRemoteTopologySync(Node.Id id, long 
epoch, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling markRemoteTopologySync({}, {}, {})", self, id, epoch, diskState); + return delegate.markRemoteTopologySync(id, epoch, diskState); + } + + @Override + public AccordKeyspace.EpochDiskState markClosed(Ranges ranges, long epoch, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling markClosed({}, {}, {})", self, ranges, epoch, diskState); + return delegate.markClosed(ranges, epoch, diskState); + } + + @Override + public AccordKeyspace.EpochDiskState truncateTopologyUntil(long epoch, AccordKeyspace.EpochDiskState diskState) { + logger.info("[node={}] Calling truncateTopologyUntil({}, {})", self, epoch, diskState); + return delegate.truncateTopologyUntil(epoch, diskState); + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/MockDiskStateManager.java b/test/unit/org/apache/cassandra/service/accord/MockDiskStateManager.java new file mode 100644 index 0000000000..9e37602634 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/MockDiskStateManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import accord.local.Node; +import accord.primitives.Ranges; +import accord.topology.Topology; +import accord.utils.Invariants; + +import java.util.Set; + +public enum MockDiskStateManager implements AccordConfigurationService.DiskStateManager { + instance; + + @Override + public AccordKeyspace.EpochDiskState loadTopologies(AccordKeyspace.TopologyLoadConsumer consumer) { + return AccordKeyspace.EpochDiskState.EMPTY; + } + + @Override + public AccordKeyspace.EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> pending, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, epoch); + } + + @Override + public AccordKeyspace.EpochDiskState setCompletedLocalSync(long epoch, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, epoch); + } + + @Override + public AccordKeyspace.EpochDiskState markLocalSyncAck(Node.Id id, long epoch, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, epoch); + } + + @Override + public AccordKeyspace.EpochDiskState saveTopology(Topology topology, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, topology.epoch()); + } + + @Override + public AccordKeyspace.EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, epoch); + } + + @Override + public AccordKeyspace.EpochDiskState markClosed(Ranges ranges, long epoch, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, epoch); + } + + @Override + public AccordKeyspace.EpochDiskState truncateTopologyUntil(long epoch, AccordKeyspace.EpochDiskState diskState) { + return maybeUpdateMaxEpoch(diskState, epoch); + } + + private static AccordKeyspace.EpochDiskState maybeUpdateMaxEpoch(AccordKeyspace.EpochDiskState diskState, long epoch) { + if (diskState.isEmpty()) + return 
AccordKeyspace.EpochDiskState.create(epoch); + Invariants.checkArgument(epoch >= diskState.minEpoch, "Epoch %d < %d (min)", epoch, diskState.minEpoch); + if (epoch > diskState.maxEpoch) + diskState = diskState.withNewMaxEpoch(epoch); + return diskState; + } +} diff --git a/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java b/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java index ceed706236..e3e471b550 100644 --- a/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/StatefulRangeTreeTest.java @@ -21,7 +21,7 @@ package org.apache.cassandra.utils; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.TreeSet; @@ -99,7 +99,7 @@ public class StatefulRangeTreeTest @Override public Gen<Command<State, Sut, ?>> commands(State state) { - Map<Gen<Command<State, Sut, ?>>, Integer> possible = new HashMap<>(); + Map<Gen<Command<State, Sut, ?>>, Integer> possible = new LinkedHashMap<>(); possible.put(rs -> new Create(state.newRange(rs), SMALL_INT_GEN.nextInt(rs)), state.createWeight); possible.put(rs -> new Read(state.newRange(rs)), state.readWeight); possible.put(rs -> new KeyRead(IntKey.routing(state.tokenGen.nextInt(rs))), state.readWeight); @@ -108,15 +108,15 @@ public class StatefulRangeTreeTest possible.put(ignore -> Clear.instance, state.clearWeight); if (!state.uniqRanges.isEmpty()) { - possible.put(rs -> new Read(rs.pick(state.uniqRanges)), state.readWeight); + possible.put(rs -> new Read(rs.pickOrderedSet(state.uniqRanges)), state.readWeight); possible.put(rs -> { - Range range = rs.pick(state.uniqRanges); + Range range = rs.pickOrderedSet(state.uniqRanges); int token = rs.nextInt(((IntKey.Routing) range.start()).key, ((IntKey.Routing) range.end()).key) + 1; return new KeyRead(IntKey.routing(token)); }, state.readWeight); - 
possible.put(rs -> new RangeRead(rs.pick(state.uniqRanges)), state.readWeight); - possible.put(rs -> new Update(rs.pick(state.uniqRanges), SMALL_INT_GEN.nextInt(rs)), state.updateWeight); - possible.put(rs -> new Delete(rs.pick(state.uniqRanges)), state.deleteWeight); + possible.put(rs -> new RangeRead(rs.pickOrderedSet(state.uniqRanges)), state.readWeight); + possible.put(rs -> new Update(rs.pickOrderedSet(state.uniqRanges), SMALL_INT_GEN.nextInt(rs)), state.updateWeight); + possible.put(rs -> new Delete(rs.pickOrderedSet(state.uniqRanges)), state.deleteWeight); } return Gens.oneOf(possible); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org