This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 8de6be8166fc1163c9f4fc78b0c252084f67eb72 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Oct 16 12:06:42 2024 +0100 Use ExclusiveSyncPoints to join a new topology. For correctness, the dependencies we adopt on joining a new topology must exclude the possibility of respondents accepting additional transactions with a lower TxnId, so piggy-backing on the existing `ExclusiveSyncPoint` mechanism is logical for the time being. This patch removes the `MajorityDepsFetcher` logic in favour of simply waiting for a suitable `ExclusiveSyncPoint` to be proposed. patch by Benedict, reviewed by Alex Petrov for CASSANDRA-20056 --- .../src/main/java/accord/api/Scheduler.java | 7 + ...tyScheduling.java => DurabilityScheduling.java} | 100 +++++--- .../java/accord/impl/InMemoryCommandStore.java | 43 +--- .../main/java/accord/impl/MajorityDepsFetcher.java | 271 --------------------- .../src/main/java/accord/local/CommandStore.java | 97 +++++--- accord-core/src/main/java/accord/local/Node.java | 10 + .../main/java/accord/local/RedundantBefore.java | 32 ++- .../main/java/accord/local/RedundantStatus.java | 1 + .../main/java/accord/local/SafeCommandStore.java | 34 +-- .../main/java/accord/local/cfk/CommandsForKey.java | 11 +- .../main/java/accord/local/cfk/PostProcess.java | 3 +- .../java/accord/local/cfk/SafeCommandsForKey.java | 11 +- .../java/accord/local/cfk/UpdateUnmanagedMode.java | 24 ++ .../src/main/java/accord/local/cfk/Updating.java | 99 +++----- .../java/accord/utils/ThreadPoolScheduler.java | 6 + .../main/java/accord/utils/async/AsyncResults.java | 5 +- .../test/java/accord/impl/RemoteListenersTest.java | 9 +- .../src/test/java/accord/impl/basic/Cluster.java | 39 +-- .../accord/impl/basic/DelayedCommandStores.java | 8 - .../src/test/java/accord/impl/basic/Journal.java | 17 -- .../java/accord/impl/basic/RandomDelayQueue.java | 17 +- .../impl/basic/RecurringPendingRunnable.java | 4 +- .../test/java/accord/impl/list/ListRequest.java | 6 +- 
.../java/accord/local/cfk/CommandsForKeyTest.java | 10 +- accord-core/src/test/resources/burn-logback.xml | 2 + .../src/main/java/accord/maelstrom/Cluster.java | 19 +- 26 files changed, 335 insertions(+), 550 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Scheduler.java b/accord-core/src/main/java/accord/api/Scheduler.java index 77a12c3d..f2511ae6 100644 --- a/accord-core/src/main/java/accord/api/Scheduler.java +++ b/accord-core/src/main/java/accord/api/Scheduler.java @@ -53,6 +53,12 @@ public interface Scheduler return CANCELLED; } + @Override + public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units) + { + return CANCELLED; + } + @Override public void now(Runnable run) { @@ -70,5 +76,6 @@ public interface Scheduler Scheduled recurring(Runnable run, long delay, TimeUnit units); Scheduled once(Runnable run, long delay, TimeUnit units); + Scheduled selfRecurring(Runnable run, long delay, TimeUnit units); void now(Runnable run); } diff --git a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java similarity index 89% rename from accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java rename to accord-core/src/main/java/accord/impl/DurabilityScheduling.java index 7aa69073..10934617 100644 --- a/accord-core/src/main/java/accord/impl/CoordinateDurabilityScheduling.java +++ b/accord-core/src/main/java/accord/impl/DurabilityScheduling.java @@ -29,6 +29,7 @@ import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.api.ConfigurationService; import accord.api.Scheduler; import accord.coordinate.CoordinateGloballyDurable; import accord.coordinate.CoordinationFailed; @@ -46,12 +47,13 @@ import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; +import 
org.agrona.BitUtil; import static accord.coordinate.CoordinateShardDurable.coordinate; import static accord.coordinate.CoordinateSyncPoint.exclusiveSyncPoint; import static accord.primitives.Txn.Kind.ExclusiveSyncPoint; import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; /** * Helper methods and classes to invoke coordination to propagate information about durability. @@ -78,9 +80,9 @@ import static java.util.concurrent.TimeUnit.MINUTES; * TODO (expected): do not start new ExclusiveSyncPoint if we have more than X already agreed and not yet applied * Didn't go with recurring because it doesn't play well with async execution of these tasks */ -public class CoordinateDurabilityScheduling +public class DurabilityScheduling implements ConfigurationService.Listener { - private static final Logger logger = LoggerFactory.getLogger(CoordinateDurabilityScheduling.class); + private static final Logger logger = LoggerFactory.getLogger(DurabilityScheduling.class); private final Node node; private Scheduler.Scheduled scheduled; @@ -129,9 +131,16 @@ public class CoordinateDurabilityScheduling private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>(); private int globalIndex; - private long nextGlobalSyncTimeMicros; + boolean started; volatile boolean stop; + @Override + public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boolean startSync) + { + updateTopology(topology); + return AsyncResults.success(null); + } + private class ShardScheduler { Shard shard; @@ -196,6 +205,7 @@ public class CoordinateDurabilityScheduling if (defunct) return; + Invariants.checkState(index < numberOfSplits); long nowMicros = node.elapsed(MICROSECONDS); long microsOffset = (index * shardCycleTimeMicros) / numberOfSplits; long scheduleAt = cycleStartedAtMicros + microsOffset; @@ -206,7 +216,7 @@ public class CoordinateDurabilityScheduling if (numberOfSplits > targetShardSplits && index % 4 == 0) { 
index /= 4; - numberOfSplits /=4; + numberOfSplits /= 4; } scheduleAt(nowMicros, scheduleAt); } @@ -232,6 +242,7 @@ public class CoordinateDurabilityScheduling Range range; int nextIndex; { + Invariants.checkState(index < numberOfSplits); int i = index; Range selectRange = null; while (selectRange == null) @@ -241,12 +252,12 @@ public class CoordinateDurabilityScheduling } Runnable schedule = () -> { - // TODO (required): allocate stale HLC from a reservation of HLCs for this purpose + // TODO (expected): allocate stale HLC from a reservation of HLCs for this purpose TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, Domain.Range); startShardSync(syncId, Ranges.of(range), nextIndex); }; if (scheduleAt <= nowMicros) schedule.run(); - else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS); + else scheduled = node.scheduler().selfRecurring(schedule, scheduleAt - nowMicros, MICROSECONDS); } /** @@ -255,7 +266,7 @@ public class CoordinateDurabilityScheduling */ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex) { - scheduled = node.scheduler().once(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> { + scheduled = node.scheduler().selfRecurring(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> { if (withEpochFailure != null) { // don't wait on epoch failure - we aren't the cause of any problems @@ -295,7 +306,7 @@ public class CoordinateDurabilityScheduling private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint<Range> exclusiveSyncPoint, int nextIndex) { - scheduled = node.scheduler().once(() -> { + scheduled = node.scheduler().selfRecurring(() -> { scheduled = null; node.commandStores().any().execute(() -> { coordinate(node, exclusiveSyncPoint) @@ -353,14 +364,14 @@ public class CoordinateDurabilityScheduling } } - public CoordinateDurabilityScheduling(Node node) + public DurabilityScheduling(Node node) { this.node = node; } public void 
setTargetShardSplits(int targetShardSplits) { - this.targetShardSplits = targetShardSplits; + this.targetShardSplits = BitUtil.findNextPositivePowerOfTwo(targetShardSplits); } public void setDefaultRetryDelay(long retryDelay, TimeUnit units) @@ -399,16 +410,21 @@ public class CoordinateDurabilityScheduling public synchronized void start() { Invariants.checkState(!stop); // cannot currently restart safely + started = true; + updateTopology(); long nowMicros = node.elapsed(MICROSECONDS); - setNextGlobalSyncTime(nowMicros); - scheduled = node.scheduler().recurring(this::run, 1L, MINUTES); + long scheduleAt = computeNextGlobalSyncTime(nowMicros); + scheduled = node.scheduler().selfRecurring(this::run, scheduleAt - nowMicros, MICROSECONDS); } - public void stop() + public synchronized void stop() { if (scheduled != null) scheduled.cancel(); stop = true; + for (ShardScheduler scheduler : shardSchedulers.values()) + scheduler.markDefunct(); + shardSchedulers.clear(); } /** @@ -419,17 +435,18 @@ public class CoordinateDurabilityScheduling if (stop) return; - // TODO (expected): invoke this as soon as topology is updated in topology manager - updateTopology(); - if (currentGlobalTopology == null || currentGlobalTopology.size() == 0) - return; - - // TODO (expected): schedule this directly based on the global sync frequency - this is an artefact of previously scheduling shard syncs as well long nowMicros = node.elapsed(MICROSECONDS); - if (nextGlobalSyncTimeMicros <= nowMicros) + try { + if (currentGlobalTopology == null || currentGlobalTopology.size() == 0) + return; + startGlobalSync(); - setNextGlobalSyncTime(nowMicros); + } + finally + { + long scheduleAt = computeNextGlobalSyncTime(nowMicros); + node.scheduler().selfRecurring(this::run, scheduleAt - nowMicros, MICROSECONDS); } } @@ -453,7 +470,15 @@ public class CoordinateDurabilityScheduling public synchronized void updateTopology() { Topology latestGlobal = node.topology().current(); - if (latestGlobal == 
currentGlobalTopology) + updateTopology(latestGlobal); + } + + private synchronized void updateTopology(Topology latestGlobal) + { + if (!started) + return; + + if (latestGlobal == currentGlobalTopology || (currentGlobalTopology != null && latestGlobal.epoch() < currentGlobalTopology.epoch())) return; Topology latestLocal = latestGlobal.forNode(node.id()); @@ -495,13 +520,10 @@ public class CoordinateDurabilityScheduling * It's assumed it is fine if nodes overlap or reorder or skip for whatever activity we are picking turns for as long as it is approximately * the right pacing. */ - private void setNextGlobalSyncTime(long nowMicros) + private long computeNextGlobalSyncTime(long nowMicros) { if (currentGlobalTopology == null) - { - nextGlobalSyncTimeMicros = nowMicros; - return; - } + return nowMicros + globalCycleTimeMicros; // How long it takes for all nodes to go once long totalRoundDuration = currentGlobalTopology.nodes().size() * globalCycleTimeMicros; @@ -516,6 +538,26 @@ public class CoordinateDurabilityScheduling if (targetTimeInCurrentRound < nowMicros) targetTime += totalRoundDuration; - nextGlobalSyncTimeMicros = targetTime; + return targetTime; + } + + @Override + public void onRemoteSyncComplete(Node.Id node, long epoch) + { + } + + @Override + public void truncateTopologyUntil(long epoch) + { + } + + @Override + public void onEpochClosed(Ranges ranges, long epoch) + { + } + + @Override + public void onEpochRedundant(Ranges ranges, long epoch) + { } } \ No newline at end of file diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index e01614c2..03a249a0 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -71,10 +71,10 @@ import accord.local.SafeCommandStore; import accord.local.cfk.CommandsForKey; import accord.primitives.AbstractRanges; import 
accord.primitives.AbstractUnseekableKeys; -import accord.primitives.Deps; import accord.primitives.PartialDeps; import accord.primitives.Participants; import accord.primitives.Range; +import accord.primitives.RangeDeps; import accord.primitives.Ranges; import accord.primitives.Routable.Domain; import accord.primitives.RoutableKey; @@ -91,6 +91,8 @@ import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import accord.utils.async.Cancellable; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import static accord.local.KeyHistory.COMMANDS; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; @@ -1435,52 +1437,19 @@ public abstract class InMemoryCommandStore extends CommandStore } } - @VisibleForTesting - public void load(Deps loading) - { - registerHistoricalTransactions(loading, - ((key, txnId) -> { - executeInContext(InMemoryCommandStore.this, - PreLoadContext.contextFor(key, COMMANDS), - safeStore -> { - safeStore.get(key).registerHistorical(safeStore, txnId); - return null; - }); - })); - } - @Override - protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore) - { - registerHistoricalTransactions(deps, (key, txnId) -> safeStore.get(key).registerHistorical(safeStore, txnId)); - } - - private void registerHistoricalTransactions(Deps deps, BiConsumer<RoutingKey, TxnId> registerHistorical) + protected void registerTransitive(SafeCommandStore safeStore, RangeDeps rangeDeps) { RangesForEpoch rangesForEpoch = this.rangesForEpoch; Ranges allRanges = rangesForEpoch.all(); - deps.keyDeps.keys().forEach(allRanges, key -> { - deps.keyDeps.forEach(key, (txnId, txnIdx) -> { - // TODO (desired, efficiency): this can be made more efficient by batching by epoch - if (rangesForEpoch.coordinates(txnId).contains(key)) - return; // already coordinates, no need to replicate - // TODO (required): check this logic, esp. 
next line, matches C* - if (!rangesForEpoch.allSince(txnId.epoch()).contains(key)) - return; - - registerHistorical.accept(key, txnId); - }); - - }); TreeMap<TxnId, RangeCommand> rangeCommands = this.rangeCommands; TreeMap<TxnId, Ranges> historicalRangeCommands = historicalRangeCommands(); - deps.rangeDeps.forEachUniqueTxnId(allRanges, null, (ignore, txnId) -> { - + rangeDeps.forEachUniqueTxnId(allRanges, null, (ignore, txnId) -> { if (rangeCommands.containsKey(txnId)) return; - Ranges ranges = deps.rangeDeps.ranges(txnId); + Ranges ranges = rangeDeps.ranges(txnId); if (rangesForEpoch.coordinates(txnId).intersects(ranges)) return; // already coordinates, no need to replicate // TODO (required): check this logic, esp. next line, matches C* diff --git a/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java b/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java deleted file mode 100644 index dcb94801..00000000 --- a/accord-core/src/main/java/accord/impl/MajorityDepsFetcher.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package accord.impl; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import accord.api.Scheduler; -import accord.coordinate.CollectCalculatedDeps; -import accord.coordinate.CoordinationFailed; -import accord.local.CommandStore; -import accord.local.Node; -import accord.local.ShardDistributor; -import accord.primitives.FullRoute; -import accord.primitives.Range; -import accord.primitives.Ranges; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; -import accord.utils.async.AsyncResult; - -import static accord.local.KeyHistory.COMMANDS; -import static accord.local.PreLoadContext.contextFor; -import static java.util.concurrent.TimeUnit.MICROSECONDS; - -/** - * Copied from CoordinateDurabilityScheduling - * TODO (required): deprecate in favour of piggy-backing on exclusive sync points - */ -public class MajorityDepsFetcher -{ - private static final Logger logger = LoggerFactory.getLogger(MajorityDepsFetcher.class); - - private final Node node; - - private int targetShardSplits = 64; - private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1); - private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1); - private int maxNumberOfSplits = 1 << 10; - - private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>(); - - private class ShardScheduler - { - final CommandStore commandStore; - final Range range; - final List<AsyncResult.Settable<Void>> waiting; - - long epoch; - boolean defunct; - - int index; - int numberOfSplits; - Scheduler.Scheduled scheduled; - long retryDelayMicros = defaultRetryDelayMicros; - - private ShardScheduler(CommandStore commandStore, Range range, List<AsyncResult.Settable<Void>> waiting, long epoch) - { - this.commandStore = commandStore; - this.range = range; - this.waiting = waiting; - 
this.numberOfSplits = targetShardSplits; - this.epoch = epoch; - } - - void markDefunct() - { - this.defunct = true; - } - - void schedule() - { - synchronized (MajorityDepsFetcher.this) - { - if (defunct) - return; - - long nowMicros = node.elapsed(MICROSECONDS); - if (retryDelayMicros > defaultRetryDelayMicros) - retryDelayMicros = Math.max(defaultRetryDelayMicros, (long) (0.9 * retryDelayMicros)); - scheduleAt(nowMicros, nowMicros); - } - } - - void retry() - { - synchronized (MajorityDepsFetcher.this) - { - if (defunct) - return; - - long nowMicros = node.elapsed(MICROSECONDS); - long scheduleAt = nowMicros + retryDelayMicros; - retryDelayMicros += retryDelayMicros / 2; - if (retryDelayMicros > maxRetryDelayMicros) - { - retryDelayMicros = maxRetryDelayMicros; - } - if (numberOfSplits * 2 <= maxNumberOfSplits) - { - index *= 2; - numberOfSplits *= 2; - } - scheduleAt(nowMicros, scheduleAt); - } - } - - void scheduleAt(long nowMicros, long scheduleAt) - { - synchronized (MajorityDepsFetcher.this) - { - ShardDistributor distributor = node.commandStores().shardDistributor(); - Range range; - int nextIndex; - { - int i = index; - Range selectRange = null; - while (selectRange == null) - selectRange = distributor.splitRange(this.range, index, ++i, numberOfSplits); - range = selectRange; - nextIndex = i; - } - - Runnable schedule = () -> start(range, nextIndex); - if (scheduleAt <= nowMicros) schedule.run(); - else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS); - } - } - - /** - * The first step for coordinating shard durable is to run an exclusive sync point - * the result of which can then be used to run - */ - private void start(Range slice, int nextIndex) - { - TxnId id = TxnId.fromValues(epoch - 1, 0, node.id()); - Timestamp before = Timestamp.minForEpoch(epoch); - - node.withEpoch(id.epoch(), (ignored, withEpochFailure) -> { - if (withEpochFailure != null) - { - // don't wait on epoch failure - we aren't the cause of any 
problems - start(slice, nextIndex); - Throwable wrapped = CoordinationFailed.wrap(withEpochFailure); - logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + id.epoch(), wrapped); - node.agent().onUncaughtException(wrapped); - return; - } - scheduled = null; - FullRoute<Range> route = (FullRoute<Range>) node.computeRoute(id, Ranges.of(slice)); - logger.debug("Fetching deps to sync epoch {} for range {}", epoch, slice); - CollectCalculatedDeps.withCalculatedDeps(node, id, route, route, before, (deps, fail) -> { - if (fail != null) - { - logger.warn("Failed to fetch deps for syncing epoch {} for {}", epoch, slice, fail); - retry(); - } - else - { - // TODO (correctness) : PreLoadContext only works with Seekables, which doesn't allow mixing Keys and Ranges... But Deps has both Keys AND Ranges! - // ATM all known implementations store ranges in-memory, but this will not be true soon, so this will need to be addressed - commandStore.execute(contextFor(null, deps.keyDeps.keys(), COMMANDS), safeStore -> { - safeStore.registerHistoricalTransactions(epoch, slice, deps); - }).begin((success, fail2) -> { - if (fail2 != null) - { - retry(); - logger.warn("Failed to apply deps for syncing epoch {} for range {}", epoch, slice, fail2); - } - else - { - try - { - synchronized (MajorityDepsFetcher.this) - { - index = nextIndex; - if (index >= numberOfSplits) - { - waiting.forEach(w -> w.trySuccess(null)); - logger.info("Successfully fetched majority deps for {} at epoch {}", this.range, epoch); - defunct = true; - shardSchedulers.remove(this.range, ShardScheduler.this); - } - else - { - schedule(); - } - } - } - catch (Throwable t) - { - retry(); - logger.error("Unexpected exception handling durability scheduling callback; starting from scratch", t); - } - } - }); - } - }); - }); - } - } - - public MajorityDepsFetcher(Node node) - { - this.node = node; - } - - public synchronized void cancel(Range range, long epoch) 
- { - ShardScheduler scheduler = shardSchedulers.get(range); - if (scheduler == null || scheduler.epoch > epoch) - return; - - scheduler.markDefunct(); - shardSchedulers.remove(range); - } - - public void setTargetShardSplits(int targetShardSplits) - { - this.targetShardSplits = targetShardSplits; - } - - public void setDefaultRetryDelay(long retryDelay, TimeUnit units) - { - this.defaultRetryDelayMicros = units.toMicros(retryDelay); - } - - public void setMaxRetryDelay(long retryDelay, TimeUnit units) - { - this.maxRetryDelayMicros = units.toMicros(retryDelay); - } - - public synchronized void fetchMajorityDeps(CommandStore commandStore, Range range, long epoch, AsyncResult.Settable<Void> waiting) - { - ShardScheduler scheduler = shardSchedulers.get(range); - List<AsyncResult.Settable<Void>> waitingList = Collections.singletonList(waiting); - if (scheduler != null) - { - if (scheduler.epoch >= epoch) - return; - scheduler.markDefunct(); - waitingList = new ArrayList<>(waitingList); - waitingList.addAll(scheduler.waiting); - } - scheduler = new ShardScheduler(commandStore, range, waitingList, epoch); - scheduler.schedule(); - } - -} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 29edb281..76e83b04 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -25,9 +25,8 @@ import accord.api.DataStore; import javax.annotation.Nullable; import accord.api.Agent; -import accord.impl.MajorityDepsFetcher; import accord.local.CommandStores.RangesForEpoch; -import accord.primitives.Range; +import accord.primitives.RangeDeps; import accord.primitives.Routables; import accord.primitives.Unseekables; import accord.utils.async.AsyncChain; @@ -35,13 +34,10 @@ import accord.utils.async.AsyncChain; import accord.api.ConfigurationService.EpochReady; import accord.utils.DeterministicIdentitySet; 
import accord.utils.Invariants; -import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; @@ -57,11 +53,11 @@ import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.primitives.Deps; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.async.AsyncResults; +import org.agrona.collections.LongHashSet; import static accord.api.ConfigurationService.EpochReady.DONE; import static accord.local.PreLoadContext.empty; @@ -174,6 +170,20 @@ public abstract class CommandStore implements AgentExecutor private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); @Nullable private RejectBefore rejectBefore; + static class WaitingOnSync + { + final AsyncResults.SettableResult<Void> whenDone; + final Ranges allRanges; + Ranges ranges; + + WaitingOnSync(AsyncResults.SettableResult<Void> whenDone, Ranges ranges) + { + this.whenDone = whenDone; + this.allRanges = this.ranges = ranges; + } + } + private final TreeMap<Long, WaitingOnSync> waitingOnSync = new TreeMap<>(); + protected CommandStore(int id, NodeCommandStoreService node, Agent agent, @@ -238,7 +248,7 @@ public abstract class CommandStore implements AgentExecutor public abstract <T> AsyncChain<T> submit(PreLoadContext context, Function<? 
super SafeCommandStore, T> apply); public abstract void shutdown(); - protected abstract void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore); + protected abstract void registerTransitive(SafeCommandStore safeStore, RangeDeps deps); protected void unsafeSetRejectBefore(RejectBefore newRejectBefore) { @@ -470,34 +480,11 @@ public abstract class CommandStore implements AgentExecutor */ protected Supplier<EpochReady> sync(Node node, Ranges ranges, long epoch, boolean isLoad) { - return () -> syncInternal(node, ranges, epoch, isLoad); - } - - protected EpochReady syncInternal(Node node, Ranges ranges, long epoch, boolean isLoad) - { - AsyncResults.SettableResult<Void> whenDone = new AsyncResults.SettableResult<>(); - fetchMajorityDeps(whenDone, node, epoch, ranges); - return new EpochReady(epoch, DONE, whenDone, whenDone, whenDone); - } - - private MajorityDepsFetcher fetcher; - protected void cancelFetch(Range range, long epoch) - { - if (fetcher != null) - fetcher.cancel(range, epoch); - } - // TODO (required, correctness): replace with a simple wait on suitable exclusive sync point(s) - private void fetchMajorityDeps(AsyncResult.Settable<Void> coordination, Node node, long epoch, Ranges ranges) - { - if (fetcher == null) fetcher = new MajorityDepsFetcher(node); - List<AsyncResult.Settable<Void>> waiting = new ArrayList<>(); - for (Range range : ranges) - { - AsyncResult.Settable<Void> rangeComplete = AsyncResults.settable(); - fetcher.fetchMajorityDeps(this, range, epoch, rangeComplete); - waiting.add(rangeComplete); - } - AsyncChains.reduce(waiting, (a, b) -> null).begin(coordination.settingCallback()); + return () -> { + AsyncResults.SettableResult<Void> whenDone = new AsyncResults.SettableResult<>(); + waitingOnSync.put(epoch, new WaitingOnSync(whenDone, ranges)); + return new EpochReady(epoch, DONE, whenDone, whenDone, whenDone); + }; } Supplier<EpochReady> unbootstrap(long epoch, Ranges removedRanges) @@ -534,7 +521,7 @@ 
public abstract class CommandStore implements AgentExecutor public void markShardDurable(SafeCommandStore safeStore, TxnId globalSyncId, Ranges durableRanges) { final Ranges slicedRanges = durableRanges.slice(safeStore.ranges().allUntil(globalSyncId.epoch()), Minimal); - TxnId locallyRedundantBefore = safeStore.redundantBefore().minLocallyAppliedOrInvalidatedBefore(slicedRanges); + TxnId locallyRedundantBefore = safeStore.redundantBefore().min(slicedRanges, e -> e.locallyAppliedOrInvalidatedBefore); RedundantBefore addShardRedundant = RedundantBefore.create(slicedRanges, Long.MIN_VALUE, Long.MAX_VALUE, TxnId.NONE, globalSyncId, TxnId.NONE, TxnId.NONE); safeStore.upsertRedundantBefore(addShardRedundant); updatedRedundantBefore(safeStore, globalSyncId, slicedRanges); @@ -542,7 +529,12 @@ public abstract class CommandStore implements AgentExecutor if (locallyRedundantBefore.compareTo(globalSyncId) < 0) { - logger.warn("Trying to markShardDurable we have not yet caught-up to locally. Local: {}, Global: {}, Ranges: {}", locallyRedundantBefore, globalSyncId, slicedRanges); + // TODO (expected): if bootstrapping only part of the range, mark the rest for GC; or relax this as can safely GC behind bootstrap + TxnId maxBootstrap = safeStore.redundantBefore().max(slicedRanges, e -> e.bootstrappedAt); + if (maxBootstrap.compareTo(globalSyncId) >= 0) + logger.info("Ignoring markShardDurable for a point we are bootstrapping. Bootstrapping: {}, Global: {}, Ranges: {}", maxBootstrap, globalSyncId, slicedRanges); + else + logger.warn("Trying to markShardDurable a point we have not yet caught-up to locally. 
Local: {}, Global: {}, Ranges: {}", locallyRedundantBefore, globalSyncId, slicedRanges); return; } @@ -564,6 +556,37 @@ public abstract class CommandStore implements AgentExecutor { } + protected void markSynced(TxnId syncId, Ranges ranges) + { + if (waitingOnSync.isEmpty()) + return; + + LongHashSet remove = null; + for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + { + if (e.getKey() > syncId.epoch()) + break; + + Ranges remaining = e.getValue().ranges; + Ranges synced = remaining.slice(ranges, Minimal); + e.getValue().ranges = remaining = remaining.without(ranges); + if (e.getValue().ranges.isEmpty()) + { + logger.info("Completed full sync for {} on epoch {} using {}", e.getValue().allRanges, e.getKey(), syncId); + e.getValue().whenDone.trySuccess(null); + if (remove == null) + remove = new LongHashSet(); + remove.add(e.getKey()); + } + else + { + logger.info("Completed partial sync for {} on epoch {} using {}; {} still to sync", synced, e.getKey(), syncId, remaining); + } + } + if (remove != null) + remove.forEach(waitingOnSync::remove); + } + // TODO (expected): we can immediately truncate dependencies locally once an exclusiveSyncPoint applies, we don't need to wait for the whole shard // TODO (required): integrate validation of staleness with implementation (e.g. 
C* should know it has been marked stale) // also: we no longer expect epochs that are losing a range to be marked stale, make sure logic reflects this diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index f3f46c82..15347737 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -62,6 +62,7 @@ import accord.coordinate.CoordinationFailed; import accord.coordinate.MaybeRecover; import accord.coordinate.Outcome; import accord.coordinate.RecoverWithRoute; +import accord.impl.DurabilityScheduling; import accord.messages.Callback; import accord.messages.Reply; import accord.messages.ReplyContext; @@ -170,6 +171,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ // TODO (expected, consider): this really needs to be thought through some more, as it needs to be per-instance in some cases, and per-node in others private final Scheduler scheduler; + private final DurabilityScheduling durabilityScheduling; // TODO (expected, liveness): monitor the contents of this collection for stalled coordination, and excise them private final Map<TxnId, AsyncResult<? 
extends Outcome>> coordinating = new ConcurrentHashMap<>(); @@ -202,10 +204,12 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ this.random = random; this.persistDurableBefore = new PersistentField<>(() -> durableBefore, DurableBefore::merge, durableBeforePersister, this::setPersistedDurableBefore); this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this)); + this.durabilityScheduling = new DurabilityScheduling(this); // TODO (desired): make frequency configurable scheduler.recurring(() -> commandStores.forEachCommandStore(store -> store.progressLog.maybeNotify()), 1, SECONDS); scheduler.recurring(timeouts::maybeNotify, 100, MILLISECONDS); configService.registerListener(this); + configService.registerListener(durabilityScheduling); } public LocalConfig localConfig() @@ -223,6 +227,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ persistDurableBefore.load(); } + public DurabilityScheduling durabilityScheduling() + { + return durabilityScheduling; + } + /** * This starts the node for tests and makes sure that the provided topology is acknowledged correctly. 
This method is not * safe for production systems as it doesn't handle restarts and partially acknowledged histories @@ -232,6 +241,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ public AsyncResult<Void> unsafeStart() { EpochReady ready = onTopologyUpdateInternal(configService.currentTopology(), false, false); + durabilityScheduling.updateTopology(); ready.coordination.addCallback(() -> this.topology.onEpochSyncComplete(id, topology.epoch())); configService.acknowledgeEpoch(ready, false); return ready.metadata; diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index f859f7bf..bcd6bcc1 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -21,6 +21,7 @@ package accord.local; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -152,6 +153,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> Invariants.checkArgument(locallyDecidedAndAppliedOrInvalidatedBefore.equals(TxnId.NONE) || locallyDecidedAndAppliedOrInvalidatedBefore.domain().isRange()); Invariants.checkArgument(shardAppliedOrInvalidatedBefore.equals(TxnId.NONE) || shardAppliedOrInvalidatedBefore.domain().isRange()); Invariants.checkArgument(gcBefore.equals(TxnId.NONE) || gcBefore.domain().isRange()); + Invariants.checkArgument(locallyDecidedAndAppliedOrInvalidatedBefore.compareTo(locallyAppliedOrInvalidatedBefore) <= 0); + Invariants.checkArgument(shardAppliedOrInvalidatedBefore.compareTo(shardOnlyAppliedOrInvalidatedBefore) <= 0); Invariants.checkArgument(gcBefore.compareTo(shardAppliedOrInvalidatedBefore) <= 0); } @@ -215,6 +218,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> TxnId 
locallyAppliedOrInvalidatedBefore = TxnId.nonNullOrMax(this.locallyAppliedOrInvalidatedBefore, newGcBefore); TxnId shardAppliedOrInvalidatedBefore = TxnId.nonNullOrMax(this.shardAppliedOrInvalidatedBefore, newGcBefore); + TxnId shardOnlyAppliedOrInvalidatedBefore = TxnId.nonNullOrMax(this.shardOnlyAppliedOrInvalidatedBefore, newGcBefore); return new Entry(range, startOwnershipEpoch, endOwnershipEpoch, locallyAppliedOrInvalidatedBefore, locallyDecidedAndAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, shardOnlyAppliedOrInvalidatedBefore, newGcBefore, bootstrappedAt, staleUntilAtLeast); } @@ -299,26 +303,20 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return safeToRead; } - static TxnId minGcBefore(Entry entry, @Nullable TxnId minGcBefore) + static TxnId min(Entry entry, @Nullable TxnId min, Function<Entry, TxnId> get) { if (entry == null) - return minGcBefore; + return min; - if (minGcBefore == null) - return entry.gcBefore; - - return TxnId.min(minGcBefore, entry.gcBefore); + return TxnId.nonNullOrMin(min, get.apply(entry)); } - static TxnId minLocallyAppliedOrInvalidatedBefore(Entry entry, @Nullable TxnId minLocallyAppliedOrInvalidatedBefore) + static TxnId max(Entry entry, @Nullable TxnId max, Function<Entry, TxnId> get) { if (entry == null) - return minLocallyAppliedOrInvalidatedBefore; - - if (minLocallyAppliedOrInvalidatedBefore == null) - return entry.locallyAppliedOrInvalidatedBefore; + return max; - return TxnId.min(minLocallyAppliedOrInvalidatedBefore, entry.locallyAppliedOrInvalidatedBefore); + return TxnId.nonNullOrMax(max, get.apply(entry)); } static Ranges expectToExecute(Entry entry, @Nonnull Ranges executeRanges, TxnId txnId, @Nullable Timestamp executeAt) @@ -596,14 +594,14 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return foldl(ranges, Entry::validateSafeToRead, ranges, forBootstrapAt, null, r -> false); } - public TxnId minGcBefore(Routables<?> participants) + 
public TxnId min(Routables<?> participants, Function<Entry, TxnId> get) { - return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::minGcBefore, null, ignore -> false)); + return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::min, null, get, ignore -> false)); } - public TxnId minLocallyAppliedOrInvalidatedBefore(Routables<?> participants) + public TxnId max(Routables<?> participants, Function<Entry, TxnId> get) { - return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::minLocallyAppliedOrInvalidatedBefore, null, ignore -> false)); + return foldl(participants, Entry::max, TxnId.NONE, get, ignore -> false); } /** @@ -681,7 +679,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return new Entry(a.range.newRange( a.range.start().compareTo(b.range.start()) <= 0 ? a.range.start() : b.range.start(), a.range.end().compareTo(b.range.end()) >= 0 ? a.range.end() : b.range.end() - ), a.startOwnershipEpoch, a.endOwnershipEpoch, a.locallyDecidedAndAppliedOrInvalidatedBefore, a.locallyAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, a.staleUntilAtLeast); + ), a.startOwnershipEpoch, a.endOwnershipEpoch, a.locallyAppliedOrInvalidatedBefore, a.locallyDecidedAndAppliedOrInvalidatedBefore, a.shardAppliedOrInvalidatedBefore, a.shardOnlyAppliedOrInvalidatedBefore, a.gcBefore, a.bootstrappedAt, a.staleUntilAtLeast); } @Override diff --git a/accord-core/src/main/java/accord/local/RedundantStatus.java b/accord-core/src/main/java/accord/local/RedundantStatus.java index e3338cd8..3ecdc073 100644 --- a/accord-core/src/main/java/accord/local/RedundantStatus.java +++ b/accord-core/src/main/java/accord/local/RedundantStatus.java @@ -93,6 +93,7 @@ public enum RedundantStatus NOT_OWNED.merge.put(SHARD_REDUNDANT, SHARD_REDUNDANT); WAS_OWNED.merge = new EnumMap<>(RedundantStatus.class); WAS_OWNED.merge.put(NOT_OWNED, NOT_OWNED); + 
WAS_OWNED.merge.put(WAS_OWNED, WAS_OWNED); WAS_OWNED.merge.put(LIVE, LIVE); WAS_OWNED.merge.put(PARTIALLY_PRE_BOOTSTRAP_OR_STALE, PARTIALLY_PRE_BOOTSTRAP_OR_STALE); WAS_OWNED.merge.put(PRE_BOOTSTRAP_OR_STALE, PRE_BOOTSTRAP_OR_STALE); diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 5c80701b..5de54fed 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -29,12 +29,10 @@ import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.local.cfk.CommandsForKey; import accord.local.cfk.SafeCommandsForKey; +import accord.local.cfk.UpdateUnmanagedMode; import accord.primitives.AbstractUnseekableKeys; -import accord.primitives.Deps; import accord.primitives.Participants; -import accord.primitives.Range; import accord.primitives.Ranges; -import accord.primitives.Routable.Domain; import accord.primitives.RoutingKeys; import accord.primitives.SaveStatus; import accord.primitives.Status; @@ -48,6 +46,8 @@ import accord.utils.Invariants; import static accord.local.KeyHistory.COMMANDS; import static accord.local.RedundantBefore.PreBootstrapOrStale.FULLY; +import static accord.local.cfk.UpdateUnmanagedMode.REGISTER; +import static accord.primitives.Routable.Domain.Range; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.SaveStatus.Applied; @@ -200,7 +200,7 @@ public abstract class SafeCommandStore public void updateExclusiveSyncPoint(Command prev, Command updated) { - if (updated.txnId().kind() != Kind.ExclusiveSyncPoint || updated.txnId().domain() != Domain.Range) return; + if (updated.txnId().kind() != Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range) return; if (updated.route() == null) return; SaveStatus oldSaveStatus = prev == null ? 
SaveStatus.Uninitialised : prev.saveStatus(); @@ -265,11 +265,6 @@ public abstract class SafeCommandStore commandStore().unsafeSetRangesForEpoch(rangesForEpoch); } - public void registerHistoricalTransactions(long epoch, Range range, Deps deps) - { - commandStore().registerHistoricalTransactions(range, deps, this); - } - public void updateCommandsForKey(Command prev, Command next) { if (!CommandsForKey.needsUpdate(prev, next)) @@ -278,7 +273,10 @@ public abstract class SafeCommandStore TxnId txnId = next.txnId(); if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this, prev, next); if (!CommandsForKey.managesExecution(txnId) && next.hasBeen(Status.Stable) && !next.hasBeen(Status.Truncated) && !prev.hasBeen(Status.Stable)) - updateUnmanagedExecutionCommandsForKey(this, next); + updateUnmanagedCommandsForKey(this, next, REGISTER); + // TODO (expected): register deps during Accept phase to more quickly sync epochs +// else if (txnId.is(Range) && next.known().deps.hasProposedOrDecidedDeps()) +// updateUnmanagedCommandsForKey(this, next, REGISTER_DEPS_ONLY); } private static void updateManagedCommandsForKey(SafeCommandStore safeStore, Command prev, Command next) @@ -305,7 +303,7 @@ public abstract class SafeCommandStore } } - private static void updateUnmanagedExecutionCommandsForKey(SafeCommandStore safeStore, Command next) + private static void updateUnmanagedCommandsForKey(SafeCommandStore safeStore, Command next, UpdateUnmanagedMode mode) { TxnId txnId = next.txnId(); // TODO (required): use StoreParticipants.executes() @@ -320,19 +318,25 @@ public abstract class SafeCommandStore for (RoutingKey key : keys) { if (!waitingOn.isWaitingOnKey(index++)) continue; - safeStore.get(key).registerUnmanaged(safeStore, safeStore.get(txnId)); + safeStore.get(key).registerUnmanaged(safeStore, safeStore.get(txnId), mode); + } + + if (next.txnId().is(Range)) + { + CommandStore commandStore = safeStore.commandStore(); + Ranges ranges = 
next.participants().touches.toRanges(); + commandStore.registerTransitive(safeStore, next.partialDeps().rangeDeps); + commandStore.markSynced(txnId, ranges); } } else { safeStore = safeStore; - safeStore.commandStore().execute(context, safeStore0 -> updateUnmanagedExecutionCommandsForKey(safeStore0, next)) + safeStore.commandStore().execute(context, safeStore0 -> updateUnmanagedCommandsForKey(safeStore0, next, mode)) .begin(safeStore.commandStore().agent); } } - - /** * Visits keys first and then ranges, both in ascending order. * Within each key or range visits all visible txnids needed for the given scope in ascending order of queried timestamp. diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index a5e5d056..4ee98a64 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -69,6 +69,7 @@ import static accord.local.cfk.Pruning.isWaitingOnPruned; import static accord.local.cfk.Pruning.loadingPrunedFor; import static accord.local.cfk.Pruning.pruneById; import static accord.local.cfk.Pruning.prunedBeforeId; +import static accord.local.cfk.UpdateUnmanagedMode.UPDATE; import static accord.local.cfk.Updating.insertOrUpdate; import static accord.local.SafeCommandStore.TestDep.ANY_DEPS; import static accord.local.SafeCommandStore.TestDep.WITH; @@ -1414,9 +1415,10 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm return new CommandsForKey(key, boundsInfo, byId, committedByExecuteAt, minUndecidedById, maxAppliedWriteByExecuteAt, newLoadingPruned, prunedBeforeById, unmanageds); } - CommandsForKeyUpdate registerUnmanaged(SafeCommand safeCommand) + CommandsForKeyUpdate registerUnmanaged(SafeCommand safeCommand, UpdateUnmanagedMode mode) { - return Updating.updateUnmanaged(this, safeCommand, true, null); + Invariants.checkState(mode != UPDATE); + return 
Updating.updateUnmanaged(this, safeCommand, mode, null); } void postProcess(SafeCommandStore safeStore, CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink) @@ -1700,11 +1702,6 @@ public class CommandsForKey extends CommandsForKeyUpdate implements CommandsSumm return Pruning.maybePrune(this, pruneInterval, minHlcDelta); } - CommandsForKeyUpdate registerHistorical(TxnId txnId) - { - return Updating.registerHistorical(this, txnId); - } - int insertPos(Timestamp timestamp) { return insertPos(byId, timestamp); diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java index 80c71666..8b21352b 100644 --- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java +++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java @@ -46,6 +46,7 @@ import static accord.local.cfk.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATE import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE; import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY; import static accord.local.cfk.CommandsForKey.maxContiguousManagedAppliedIndex; +import static accord.local.cfk.UpdateUnmanagedMode.UPDATE; import static accord.local.cfk.Updating.updateUnmanaged; import static accord.local.cfk.Updating.updateUnmanagedAsync; import static accord.local.cfk.Utils.findApply; @@ -198,7 +199,7 @@ abstract class PostProcess SafeCommand safeCommand = safeStore.ifLoadedAndInitialised(txnId); if (safeCommand != null) { - CommandsForKeyUpdate update = updateUnmanaged(cfk, safeCommand, false, addUnmanageds); + CommandsForKeyUpdate update = updateUnmanaged(cfk, safeCommand, UPDATE, addUnmanageds); if (update != cfk) { Invariants.checkState(update.cfk() == cfk); diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java index 18e24219..f2e4c1a9 100644 --- 
a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java @@ -28,7 +28,6 @@ import accord.local.RedundantBefore; import accord.local.SafeCommand; import accord.local.SafeCommandStore; import accord.primitives.Status; -import accord.primitives.TxnId; public abstract class SafeCommandsForKey implements SafeState<CommandsForKey> { @@ -88,16 +87,10 @@ public abstract class SafeCommandsForKey implements SafeState<CommandsForKey> updateCfk.postProcess(safeStore, prevCfk, command, notifySink); } - public void registerUnmanaged(SafeCommandStore safeStore, SafeCommand unmanaged) + public void registerUnmanaged(SafeCommandStore safeStore, SafeCommand unmanaged, UpdateUnmanagedMode mode) { CommandsForKey prevCfk = current(); - update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(unmanaged)); - } - - public void registerHistorical(SafeCommandStore safeStore, TxnId txnId) - { - CommandsForKey prevCfk = current(); - update(safeStore, null, prevCfk, prevCfk.registerHistorical(txnId)); + update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(unmanaged, mode)); } public void updateRedundantBefore(SafeCommandStore safeStore, RedundantBefore.Entry redundantBefore) diff --git a/accord-core/src/main/java/accord/local/cfk/UpdateUnmanagedMode.java b/accord-core/src/main/java/accord/local/cfk/UpdateUnmanagedMode.java new file mode 100644 index 00000000..c5749bff --- /dev/null +++ b/accord-core/src/main/java/accord/local/cfk/UpdateUnmanagedMode.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.local.cfk; + +public enum UpdateUnmanagedMode +{ + REGISTER_DEPS_ONLY, REGISTER, UPDATE +} \ No newline at end of file diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java b/accord-core/src/main/java/accord/local/cfk/Updating.java index c4c5f966..97adbb49 100644 --- a/accord-core/src/main/java/accord/local/cfk/Updating.java +++ b/accord-core/src/main/java/accord/local/cfk/Updating.java @@ -49,15 +49,15 @@ import accord.utils.SortedCursor; import static accord.local.KeyHistory.COMMANDS; import static accord.local.cfk.CommandsForKey.InternalStatus.APPLIED; import static accord.local.cfk.CommandsForKey.InternalStatus.COMMITTED; -import static accord.local.cfk.CommandsForKey.InternalStatus.HISTORICAL; import static accord.local.cfk.CommandsForKey.InternalStatus.INVALID_OR_TRUNCATED_OR_PRUNED; import static accord.local.cfk.CommandsForKey.InternalStatus.TRANSITIVELY_KNOWN; import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY; import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.COMMIT; import static accord.local.cfk.CommandsForKey.reportLinearizabilityViolations; import static accord.local.cfk.CommandsForKey.mayExecute; -import static accord.local.cfk.Pruning.loadPruned; import static accord.local.cfk.Pruning.loadingPrunedFor; +import static accord.local.cfk.UpdateUnmanagedMode.REGISTER_DEPS_ONLY; +import static accord.local.cfk.UpdateUnmanagedMode.UPDATE; import static accord.local.cfk.Utils.insertMissing; import static accord.local.cfk.Utils.mergeAndFilterMissing; import static 
accord.local.cfk.Utils.missingTo; @@ -721,18 +721,25 @@ class Updating static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand) { - return Updating.updateUnmanaged(cfk, safeCommand, false, null); + return Updating.updateUnmanaged(cfk, safeCommand, UPDATE, null); + } + + static CommandsForKeyUpdate registerDependencies(CommandsForKey cfk, SafeCommand safeCommand) + { + return Updating.updateUnmanaged(cfk, safeCommand, REGISTER_DEPS_ONLY, null); } /** - * Three modes of operation: - * - {@code register}: inserts any missing dependencies from safeCommand into the collection; may return CommandsForKeyUpdate - * - {@code !register, update == null}: fails if any dependencies are missing; always returns a CommandsForKey - * - {@code !register && update != null}: fails if any dependencies are missing; always returns the original CommandsForKey, and maybe adds a new Unmanaged to {@code update} + * Four modes of operation: + * - {@code REGISTER_DEPS_ONLY}: inserts any missing dependencies into the collection; may return CommandsForKeyUpdate for loading pruned commands + * - {@code REGISTER}: inserts any missing dependencies into the collection and inserts the unmanaged command; may return CommandsForKeyUpdate + * - {@code UPDATE, update == null}: fails if any dependencies are missing; always returns a CommandsForKey + * - {@code UPDATE && update != null}: fails if any dependencies are missing; always returns the original CommandsForKey, and maybe adds a new Unmanaged to {@code update} */ - static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand, boolean register, @Nullable List<CommandsForKey.Unmanaged> update) + static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable List<CommandsForKey.Unmanaged> update) { - Invariants.checkArgument(!register || update == null); + boolean register = mode != UPDATE; + Invariants.checkArgument(mode == UPDATE || 
update == null); if (safeCommand.current().hasBeen(Status.Truncated)) return cfk; @@ -851,34 +858,38 @@ class Updating } cachedTxnIds().discard(missing, clearMissingCount); - CommandsForKey.Unmanaged newPendingRecord; - if (waitingToApply) + CommandsForKey.Unmanaged[] newUnmanaged = cfk.unmanageds; + if (mode != REGISTER_DEPS_ONLY) { - if (executesAt instanceof TxnInfo) - executesAt = ((TxnInfo) executesAt).plainExecuteAt(); - - if (waitingTxnId.awaitsOnlyDeps() && executesAt != null) + CommandsForKey.Unmanaged newPendingRecord; + if (waitingToApply) { - if (executesAt.compareTo(command.waitingOn.executeAtLeast(Timestamp.NONE)) > 0) + if (executesAt instanceof TxnInfo) + executesAt = ((TxnInfo) executesAt).plainExecuteAt(); + + if (waitingTxnId.awaitsOnlyDeps() && executesAt != null) { - Command.WaitingOn.Update waitingOn = new Command.WaitingOn.Update(command.waitingOn); - waitingOn.updateExecuteAtLeast(executesAt); - safeCommand.updateWaitingOn(waitingOn); + if (executesAt.compareTo(command.waitingOn.executeAtLeast(Timestamp.NONE)) > 0) + { + Command.WaitingOn.Update waitingOn = new Command.WaitingOn.Update(command.waitingOn); + waitingOn.updateExecuteAtLeast(executesAt); + safeCommand.updateWaitingOn(waitingOn); + } } + + newPendingRecord = new CommandsForKey.Unmanaged(APPLY, command.txnId(), executesAt); } + else newPendingRecord = new CommandsForKey.Unmanaged(COMMIT, command.txnId(), txnIds.get(txnIds.size() - 1)); - newPendingRecord = new CommandsForKey.Unmanaged(APPLY, command.txnId(), executesAt); - } - else newPendingRecord = new CommandsForKey.Unmanaged(COMMIT, command.txnId(), txnIds.get(txnIds.size() - 1)); + if (update != null) + { + update.add(newPendingRecord); + return cfk; + } - if (update != null) - { - update.add(newPendingRecord); - return cfk; + newUnmanaged = SortedArrays.insert(cfk.unmanageds, newPendingRecord, CommandsForKey.Unmanaged[]::new); } - CommandsForKey.Unmanaged[] newUnmanaged = SortedArrays.insert(cfk.unmanageds, newPendingRecord, 
CommandsForKey.Unmanaged[]::new); - CommandsForKey result; if (newById == byId) result = new CommandsForKey(cfk, newLoadingPruned, newUnmanaged); else @@ -897,36 +908,4 @@ class Updating return new CommandsForKeyUpdate.CommandsForKeyUpdateWithPostProcess(cfk, new PostProcess.NotifyNotWaiting(null, new TxnId[] { safeCommand.txnId() })); } - - static CommandsForKeyUpdate registerHistorical(CommandsForKey cfk, TxnId txnId) - { - if (txnId.compareTo(cfk.redundantBefore()) < 0) - return cfk; - - int i = Arrays.binarySearch(cfk.byId, txnId); - if (i >= 0) - { - if (cfk.byId[i].status().compareTo(HISTORICAL) >= 0) - return cfk; - return cfk.update(i, txnId, cfk.byId[i], TxnInfo.create(txnId, HISTORICAL, cfk.mayExecute(txnId), txnId, Ballot.ZERO), false, null); - } - else if (txnId.compareTo(cfk.prunedBefore()) >= 0) - { - return cfk.insert(-1 - i, txnId, TxnInfo.create(txnId, HISTORICAL, cfk.mayExecute(txnId), txnId, Ballot.ZERO), false, null); - } - else if (txnId.compareTo(cfk.safelyPrunedBefore()) < 0) - { - return cfk; - } - else - { - TxnId[] loadingPrunedFor = loadingPrunedFor(cfk.loadingPruned, txnId, null); - if (loadingPrunedFor != null && Arrays.binarySearch(loadingPrunedFor, txnId) >= 0) - return cfk; - - TxnId[] txnIdArray = new TxnId[] { txnId }; - Object[] newLoadingPruned = loadPruned(cfk.loadingPruned, txnIdArray, NO_TXNIDS); - return PostProcess.LoadPruned.load(txnIdArray, cfk.update(newLoadingPruned)); - } - } } diff --git a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java index 76009851..f26a94fd 100644 --- a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java +++ b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java @@ -87,6 +87,12 @@ public class ThreadPoolScheduler implements Scheduler return new FutureAsScheduled(exec.schedule(wrap(run), delay, units)); } + @Override + public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units) + { + 
return once(run, delay, units); + } + @Override public void now(Runnable run) { diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java b/accord-core/src/main/java/accord/utils/async/AsyncResults.java index 0382d43b..9406781f 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java @@ -33,7 +33,7 @@ import static accord.utils.Invariants.illegalState; public class AsyncResults { - public static final AsyncResult<Void> SUCCESS_VOID = success(null); + public static final AsyncResult SUCCESS_NULL = new Immediate<>(null); private AsyncResults() {} @@ -340,6 +340,9 @@ public class AsyncResults public static <V> AsyncResult<V> success(V value) { + if (value == null) + return SUCCESS_NULL; + return new Immediate<>(value); } diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java index 80d26851..dcaf36bd 100644 --- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java +++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java @@ -51,11 +51,10 @@ import accord.local.PreLoadContext; import accord.local.RedundantBefore; import accord.local.SafeCommand; import accord.local.SafeCommandStore; -import accord.primitives.Range; +import accord.primitives.RangeDeps; import accord.primitives.SaveStatus; import accord.primitives.Status.Durability; import accord.local.cfk.SafeCommandsForKey; -import accord.primitives.Deps; import accord.primitives.Route; import accord.primitives.Timestamp; import accord.primitives.Txn; @@ -65,6 +64,8 @@ import accord.utils.AccordGens; import accord.utils.RandomSource; import accord.utils.RandomTestRunner; import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; import org.agrona.collections.IntHashSet; import org.agrona.collections.ObjectHashSet; @@ -399,8 +400,7 @@ public class 
RemoteListenersTest @Override public AsyncChain<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) { return null; } @Override public <T> AsyncChain<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> apply) { return null; } @Override public void shutdown() {} - @Override protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore) {} - + @Override protected void registerTransitive(SafeCommandStore safeStore, RangeDeps deps) { } @Override public <T> AsyncChain<T> submit(Callable<T> task) { return null; } } @@ -434,6 +434,5 @@ public class RemoteListenersTest @Override public ProgressLog progressLog() { return null; } @Override public NodeCommandStoreService node() { return null; } @Override public CommandStores.RangesForEpoch ranges() { return null; } - @Override public void registerHistoricalTransactions(long epoch, Range range, Deps deps) { } } } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 5c23fd18..fbe83707 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -63,7 +63,7 @@ import accord.coordinate.Invalidated; import accord.coordinate.Preempted; import accord.coordinate.Timeout; import accord.coordinate.Truncated; -import accord.impl.CoordinateDurabilityScheduling; +import accord.impl.DurabilityScheduling; import accord.impl.DefaultLocalListeners; import accord.impl.DefaultRemoteListeners; import accord.impl.DefaultRequestTimeouts; @@ -317,14 +317,22 @@ public class Cluster implements Scheduler @Override public Scheduled once(Runnable run, long delay, TimeUnit units) { - RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, () -> delay, units); + RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, () -> delay, units, false); + pending.add(result, delay, units); + 
return result; + } + + @Override + public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units) + { + RecurringPendingRunnable result = new RecurringPendingRunnable(null, run, () -> delay, units, true); pending.add(result, delay, units); return result; } public Scheduled recurring(Runnable run, LongSupplier delay, TimeUnit units) { - RecurringPendingRunnable result = new RecurringPendingRunnable(pending, run, delay, units); + RecurringPendingRunnable result = new RecurringPendingRunnable(pending, run, delay, units, true); ++recurring; result.onCancellation(() -> --recurring); pending.add(result, delay.getAsLong(), units); @@ -455,7 +463,7 @@ public class Cluster implements Scheduler messageListener.onTopologyChange(t); }; TopologyRandomizer configRandomizer = new TopologyRandomizer(randomSupplier, topology, topologyUpdates, nodeMap::get, schemaApply); - List<CoordinateDurabilityScheduling> durabilityScheduling = new ArrayList<>(); + List<DurabilityScheduling> durabilityScheduling = new ArrayList<>(); List<Service> services = new ArrayList<>(); for (Id id : nodes) { @@ -475,24 +483,24 @@ public class Cluster implements Scheduler randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultRequestTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, isLoadedCheck, journal), new CoordinationAdapter.DefaultFactory(), DurableBefore.NOOP_PERSISTER, localConfig); - CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node); + DurabilityScheduling durability = node.durabilityScheduling(); // TODO (desired): randomise durability.setShardCycleTime(30, SECONDS); durability.setGlobalCycleTime(180, SECONDS); durabilityScheduling.add(durability); nodeMap.put(id, node); - durabilityScheduling.add(new CoordinateDurabilityScheduling(node)); + durabilityScheduling.add(new DurabilityScheduling(node)); services.add(new BarrierService(node, 
randomSupplier.get())); } Runnable updateDurabilityRate; { IntSupplier targetSplits = random.biasedUniformIntsSupplier(1, 16, 2, 4, 4, 16).get(); - IntSupplier shardCycleTimeSeconds = random.biasedUniformIntsSupplier(5, 60, 10, 30, 1, 30).get(); + IntSupplier shardCycleTimeSeconds = random.biasedUniformIntsSupplier(5, 60, 10, 60, 1, 30).get(); IntSupplier globalCycleTimeSeconds = random.biasedUniformIntsSupplier(1, 90, 10, 30,10, 60).get(); updateDurabilityRate = () -> { int c = targetSplits.getAsInt(); - int s = shardCycleTimeSeconds.getAsInt(); + int s = shardCycleTimeSeconds.getAsInt() * topologyFactory.rf; int g = globalCycleTimeSeconds.getAsInt(); durabilityScheduling.forEach(d -> { d.setTargetShardSplits(c); @@ -551,7 +559,6 @@ public class Cluster implements Scheduler DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; store.clearForTesting(); journal.reconstructAll(store.loader(), store.id()); - journal.loadHistoricalTransactions(store::load, store.id()); } while (sinks.drain(pred)); CommandsForKey.enableLinearizabilityViolationsReporting(); @@ -559,16 +566,17 @@ public class Cluster implements Scheduler }); }, () -> random.nextInt(1, 10), SECONDS); - durabilityScheduling.forEach(CoordinateDurabilityScheduling::start); + durabilityScheduling.forEach(DurabilityScheduling::start); services.forEach(Service::start); - noMoreWorkSignal.accept(() -> { + Runnable stop = () -> { reconfigure.cancel(); purge.cancel(); restart.cancel(); - durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop); + durabilityScheduling.forEach(DurabilityScheduling::stop); services.forEach(Service::close); - }); + }; + noMoreWorkSignal.accept(stop); readySignal.accept(nodeMap); Packet next; @@ -577,10 +585,7 @@ public class Cluster implements Scheduler while (sinks.processPending()); - chaos.cancel(); - reconfigure.cancel(); - durabilityScheduling.forEach(CoordinateDurabilityScheduling::stop); - services.forEach(Service::close); + 
stop.run(); sinks.links = sinks.linkConfig.defaultLinks; // give progress log et al a chance to finish diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index b5747eb8..ed1df416 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -55,7 +55,6 @@ import accord.local.NodeCommandStoreService; import accord.local.PreLoadContext; import accord.local.SafeCommandStore; import accord.local.ShardDistributor; -import accord.primitives.Deps; import accord.primitives.Range; import accord.primitives.RoutableKey; import accord.primitives.Txn; @@ -287,13 +286,6 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread return new DelayedSafeStore(this, ranges, context, commands, timestampsForKey, commandsForKeys); } - @Override - protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore) - { - journal.registerHistoricalTransactions(id(), deps); - super.registerHistoricalTransactions(range, deps, safeStore); - } - @Override public void unsafeRunIn(Runnable fn) { diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java b/accord-core/src/test/java/accord/impl/basic/Journal.java index f9ba1a42..ec0c1803 100644 --- a/accord-core/src/test/java/accord/impl/basic/Journal.java +++ b/accord-core/src/test/java/accord/impl/basic/Journal.java @@ -28,7 +28,6 @@ import java.util.NavigableMap; import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntFunction; import java.util.stream.Collectors; @@ -46,7 +45,6 @@ import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.local.StoreParticipants; import accord.primitives.Ballot; -import accord.primitives.Deps; import 
accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Timestamp; @@ -63,7 +61,6 @@ import static accord.utils.Invariants.illegalState; public class Journal { private final Long2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Long2ObjectHashMap<>(); - private final Map<Integer, List<Deps>> historicalTransactions = new HashMap<>(); private final Node.Id id; @@ -155,15 +152,6 @@ public class Journal Last } - public void loadHistoricalTransactions(Consumer<Deps> consumer, int commandStoreId) - { - List<Deps> depsList = historicalTransactions.get(commandStoreId); - if (depsList == null) - return; - for (Deps deps : depsList) - consumer.accept(deps); - } - public Command reconstruct(int commandStoreId, TxnId txnId) { List<Diff> diffs = this.diffsPerCommandStore.get(commandStoreId).get(txnId); @@ -339,11 +327,6 @@ public class Journal } } - public void registerHistoricalTransactions(int commandStoreId, Deps deps) - { - this.historicalTransactions.computeIfAbsent(commandStoreId, (k) -> new ArrayList<>()).add(deps); - } - public void onExecute(int commandStoreId, Command before, Command after, boolean isPrimary) { if (loading || (before == null && after == null)) diff --git a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java index 5bc40e69..a90f150a 100644 --- a/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java +++ b/accord-core/src/test/java/accord/impl/basic/RandomDelayQueue.java @@ -123,7 +123,7 @@ public class RandomDelayQueue implements PendingQueue public void addNoDelay(Pending item) { queue.add(new Item(now, seq++, item)); - if (item instanceof RecurringPendingRunnable) + if (isRecurring(item)) ++recurring; } @@ -133,16 +133,16 @@ public class RandomDelayQueue implements PendingQueue if (delay < 0) throw illegalArgument("Delay must be positive or 0, but given " + delay); queue.add(new Item(now + 
units.toMillis(delay) + jitterMillis.getAsLong(), seq++, item)); - if (item instanceof RecurringPendingRunnable) + if (isRecurring(item)) ++recurring; } @Override public boolean remove(Pending item) { - if (item instanceof RecurringPendingRunnable) + if (isRecurring(item)) --recurring; - return queue.remove(item); + return queue.removeIf(i -> i.item == item); } @Override @@ -152,7 +152,7 @@ public class RandomDelayQueue implements PendingQueue if (item == null) return null; - if (item.item instanceof RecurringPendingRunnable) + if (isRecurring(item.item)) --recurring; now = item.time; @@ -173,7 +173,7 @@ public class RandomDelayQueue implements PendingQueue else { queue.add(item); - if (item.item instanceof RecurringPendingRunnable) + if (isRecurring(item.item)) ++recurring; } } @@ -301,4 +301,9 @@ public class RandomDelayQueue implements PendingQueue this.seed = seed; } } + + private static boolean isRecurring(Pending pending) + { + return pending instanceof RecurringPendingRunnable && ((RecurringPendingRunnable) pending).isRecurring; + } } diff --git a/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java b/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java index d8b876a0..6dbfd754 100644 --- a/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java +++ b/accord-core/src/test/java/accord/impl/basic/RecurringPendingRunnable.java @@ -28,15 +28,17 @@ class RecurringPendingRunnable implements PendingRunnable, Scheduled final PendingQueue requeue; final LongSupplier delay; final TimeUnit units; + final boolean isRecurring; Runnable run; Runnable onCancellation; - RecurringPendingRunnable(PendingQueue requeue, Runnable run, LongSupplier delay, TimeUnit units) + RecurringPendingRunnable(PendingQueue requeue, Runnable run, LongSupplier delay, TimeUnit units, boolean isRecurring) { this.requeue = requeue; this.run = run; this.delay = delay; this.units = units; + this.isRecurring = isRecurring; } @Override diff 
--git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java index ea43e67e..46ac4c2b 100644 --- a/accord-core/src/test/java/accord/impl/list/ListRequest.java +++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java @@ -18,6 +18,7 @@ package accord.impl.list; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; @@ -30,7 +31,6 @@ import accord.coordinate.Invalidated; import accord.coordinate.Truncated; import accord.coordinate.Timeout; import accord.impl.MessageListener; -import accord.impl.basic.Cluster; import accord.impl.basic.Packet; import accord.impl.basic.SimulatedFault; import accord.local.Node; @@ -166,12 +166,12 @@ public class ListRequest implements Request } node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null); - ((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, id, 0, null)); + node.scheduler().once(() -> checkOnResult(homeKey, id, 0, null), 5L, TimeUnit.MINUTES); } else if (fail instanceof SimulatedFault) { node.reply(client, replyContext, ListResult.heartBeat(client, ((Packet)replyContext).requestId, id), null); - ((Cluster) node.scheduler()).onDone(() -> checkOnResult(null, id, 0, null)); + node.scheduler().once(() -> checkOnResult(null, id, 0, null), 5L, TimeUnit.MINUTES); } else { diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 710780fe..9d3f9fd4 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -61,6 +61,7 @@ import accord.local.PreLoadContext; import accord.local.RedundantBefore; import accord.local.SafeCommand; import accord.local.SafeCommandStore; +import accord.primitives.RangeDeps; import accord.primitives.SaveStatus; import 
accord.primitives.Status; import accord.local.StoreParticipants; @@ -87,9 +88,11 @@ import accord.utils.DefaultRandom; import accord.utils.Invariants; import accord.utils.RandomSource; import accord.utils.async.AsyncChain; +import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; import static accord.local.Command.NotDefined.notDefined; +import static accord.local.cfk.UpdateUnmanagedMode.REGISTER; import static accord.primitives.Routable.Domain.Key; import static accord.primitives.Status.Durability.NotDurable; @@ -636,7 +639,7 @@ public class CommandsForKeyTest if (!CommandsForKey.managesExecution(update.next.txnId()) && update.next.hasBeen(Status.Stable) && !update.next.hasBeen(Status.Truncated)) { CommandsForKey prev = safeCfk.current(); - result = prev.registerUnmanaged(safeCommand); + result = prev.registerUnmanaged(safeCommand, REGISTER); safeCfk.set(result.cfk()); result.postProcess(safeStore, prev, null, canon); } @@ -961,10 +964,7 @@ public class CommandsForKeyTest } @Override - protected void registerHistoricalTransactions(Range range, Deps deps, SafeCommandStore safeStore) - { - throw new UnsupportedOperationException(); - } + protected void registerTransitive(SafeCommandStore safeStore, RangeDeps deps){ } @Override public <T> AsyncChain<T> submit(Callable<T> task) diff --git a/accord-core/src/test/resources/burn-logback.xml b/accord-core/src/test/resources/burn-logback.xml index 9f4e1b18..30f90316 100644 --- a/accord-core/src/test/resources/burn-logback.xml +++ b/accord-core/src/test/resources/burn-logback.xml @@ -36,6 +36,8 @@ </filter> </appender> + <logger name="accord.impl.DurabilityScheduling" level="ERROR"/> + <root level="INFO"> <appender-ref ref="FILE"/> <appender-ref ref="STDOUT"/> diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index fa8a2048..46d497c6 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -242,14 +242,16 @@ public class Cluster implements Scheduler class CancellableRunnable implements Runnable, Scheduled { final boolean recurring; + final boolean selfRecurring; final long delay; final TimeUnit units; Runnable run; - CancellableRunnable(Runnable run, boolean recurring, long delay, TimeUnit units) + CancellableRunnable(Runnable run, boolean recurring, boolean selfRecurring, long delay, TimeUnit units) { this.run = run; this.recurring = recurring; + this.selfRecurring = selfRecurring; this.delay = delay; this.units = units; } @@ -260,7 +262,7 @@ public class Cluster implements Scheduler if (run != null) { run.run(); - if (recurring) pending.add(this, delay, units); + if (recurring && !selfRecurring) pending.add(this, delay, units); else run = null; } } @@ -281,7 +283,7 @@ public class Cluster implements Scheduler @Override public Scheduled recurring(Runnable run, long delay, TimeUnit units) { - CancellableRunnable result = new CancellableRunnable(run, true, delay, units); + CancellableRunnable result = new CancellableRunnable(run, true, false, delay, units); ++recurring; pending.add(result, delay, units); return result; @@ -290,7 +292,16 @@ public class Cluster implements Scheduler @Override public Scheduled once(Runnable run, long delay, TimeUnit units) { - CancellableRunnable result = new CancellableRunnable(run, false, delay, units); + CancellableRunnable result = new CancellableRunnable(run, false, false, delay, units); + pending.add(result, delay, units); + return result; + } + + @Override + public Scheduled selfRecurring(Runnable run, long delay, TimeUnit units) + { + CancellableRunnable result = new CancellableRunnable(run, true, true, delay, units); + ++recurring; pending.add(result, delay, units); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: 
commits-h...@cassandra.apache.org