This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 4cf0070d604abd2db460a5f1c3f8cd8dc7d26696 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Tue Oct 1 10:34:42 2024 +0100 Follow-up to CASSANDRA-19967 and CASSANDRA-19869 --- .../src/main/java/accord/coordinate/Barrier.java | 2 +- .../java/accord/impl/DefaultRequestTimeouts.java | 47 +++++++++++++++++----- .../main/java/accord/impl/ErasedSafeCommand.java | 7 +++- .../java/accord/impl/progresslog/HomeState.java | 15 ++++++- .../src/main/java/accord/local/Command.java | 14 ++++--- .../src/main/java/accord/local/CommandStores.java | 36 ++++++++++++++++- .../src/main/java/accord/local/Commands.java | 2 - .../main/java/accord/local/CommonAttributes.java | 3 ++ .../src/main/java/accord/primitives/Timestamp.java | 19 ++++++--- .../main/java/accord/topology/TopologyManager.java | 2 +- .../src/main/java/accord/utils/ArrayBuffers.java | 36 +++++++++++++++++ .../src/main/java/accord/utils/LogGroupTimers.java | 2 +- .../test/java/accord/impl/LocalListenersTest.java | 4 +- .../java/accord/local/ImmutableCommandTest.java | 4 +- .../test/java/accord/primitives/KeyDepsTest.java | 5 +-- .../test/java/accord/utils/LogGroupTimersTest.java | 4 +- 16 files changed, 165 insertions(+), 37 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/Barrier.java b/accord-core/src/main/java/accord/coordinate/Barrier.java index aa06015d..54de2fa2 100644 --- a/accord-core/src/main/java/accord/coordinate/Barrier.java +++ b/accord-core/src/main/java/accord/coordinate/Barrier.java @@ -182,7 +182,7 @@ public class Barrier extends AsyncResults.AbstractResult<TxnId> { AsyncSyncPoint async = syncPoint.apply(node, route); coordinateSyncPoint = async.async; - if (!barrierType.global) + if (barrierType.async) { Invariants.checkState(barrierType.async); TxnId txnId = async.txnId; diff --git a/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java b/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java index 8f11e5f1..7365a314 100644 --- a/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java +++ b/accord-core/src/main/java/accord/impl/DefaultRequestTimeouts.java @@ -24,6 +24,7 @@ import java.util.function.Function; import accord.api.RequestTimeouts; import accord.local.Node; +import accord.utils.ArrayBuffers.BufferList; import accord.utils.LogGroupTimers; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -49,7 +50,8 @@ public class DefaultRequestTimeouts implements RequestTimeouts lock.lock(); try { - timeouts.remove(this); + if (isInHeap()) + timeouts.remove(this); } finally { @@ -91,16 +93,41 @@ public class DefaultRequestTimeouts implements RequestTimeouts @Override public void run() { - lock.lock(); - try + try (BufferList<Registered> collect = new BufferList<>()) { - long now = node.elapsed(MILLISECONDS); - // TODO (expected): should we handle reentrancy? Or at least throw an exception? - timeouts.advance(now, this, (s, r) -> r.timeout.timeout()); - } - finally - { - lock.unlock(); + int i = 0; + try + { + lock.lock(); + try + { + long now = node.elapsed(MILLISECONDS); + // TODO (expected): should we handle reentrancy? Or at least throw an exception? + timeouts.advance(now, collect, BufferList::add); + } + finally + { + lock.unlock(); + } + + while (i < collect.size()) + collect.get(i++).timeout.timeout(); + } + catch (Throwable t) + { + while (i < collect.size()) + { + try + { + collect.get(i++).timeout.timeout(); + } + catch (Throwable t2) + { + t.addSuppressed(t2); + } + } + throw t; + } } } diff --git a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java index c58528cc..55c8c8fe 100644 --- a/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java +++ b/accord-core/src/main/java/accord/impl/ErasedSafeCommand.java @@ -37,8 +37,13 @@ public class ErasedSafeCommand extends SafeCommand public ErasedSafeCommand(TxnId txnId, SaveStatus saveStatus) { super(txnId); + this.erased = erased(txnId, saveStatus); + } + + public static Command erased(TxnId txnId, SaveStatus saveStatus) + { Invariants.checkArgument(saveStatus.compareTo(Erased) >= 0); - this.erased = new Command.Truncated(txnId, saveStatus, saveStatus == ErasedOrVestigial ? NotDurable : UniversalOrInvalidated, StoreParticipants.empty(txnId), null, null, null); + return new Command.Truncated(txnId, saveStatus, saveStatus == ErasedOrVestigial ? NotDurable : UniversalOrInvalidated, StoreParticipants.empty(txnId), null, null, null); } @Override diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 3687aa8e..261aca05 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -132,8 +132,15 @@ abstract class HomeState extends WaitingState { Invariants.checkState(!isHomeDoneOrUninitialised()); Command command = safeCommand.current(); + if (command.hasBeen(Status.Truncated)) + { + // TODO (required): validate this better + setHomeDone(instance); + return; + } Invariants.checkState(!command.hasBeen(Status.Truncated), "Command %s is truncated", command); + Invariants.checkState(command.durability() != null); // TODO (expected): when invalidated, safer to maintain HomeState until known to be globally invalidated // TODO (now): validate that we clear HomeState when we receive a Durable reply, to replace the token check logic Invariants.checkState(!command.durability().isDurableOrInvalidated(), "Command is durable or invalidated, but we have not cleared the ProgressLog"); @@ -149,6 +156,13 @@ abstract class HomeState extends WaitingState if (state == null) return; + Command command = safeCommand.current(); + if (command.is(Status.Truncated)) + { + state.setHomeDone(instance); + return; + } + CoordinatePhase status = state.phase(); if (status.isAtMostReadyToExecute() && state.homeProgress() == Querying) { @@ -159,7 +173,6 @@ abstract class HomeState extends WaitingState } else { - Command command = safeCommand.current(); ProgressToken token = success.asProgressToken().merge(command); if (prevProgressToken != null) token = token.merge(prevProgressToken); diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index a755f712..c3e2c851 100644 --- a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -197,18 +197,18 @@ public abstract class Command implements CommonAttributes { this.txnId = txnId; this.status = validateCommandClass(txnId, status, getClass()); - this.durability = durability; - this.participants = participants; - this.promised = promised; + this.durability = Invariants.nonNull(durability); + this.participants = Invariants.nonNull(participants); + this.promised = Invariants.nonNull(promised); } private AbstractCommand(CommonAttributes common, SaveStatus status, Ballot promised) { this.txnId = common.txnId(); this.status = validateCommandClass(txnId, status, getClass()); - this.durability = common.durability(); - this.participants = common.participants(); - this.promised = promised; + this.durability = Invariants.nonNull(common.durability()); + this.participants = Invariants.nonNull(common.participants()); + this.promised = Invariants.nonNull(promised); } @Override @@ -1020,6 +1020,8 @@ public abstract class Command implements CommonAttributes { super(common, status, promised, executeAt, accepted); this.waitingOn = waitingOn; + Invariants.nonNull(common.participants()); + Invariants.nonNull(common.route()); Invariants.checkState(common.route().kind().isFullRoute(), "Expected a full route but given %s", common.route().kind()); } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index e5f684c3..53f619d5 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -52,6 +52,7 @@ import accord.primitives.Route; import accord.primitives.RoutingKeys; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.topology.Shard; import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.MapReduce; @@ -452,8 +453,10 @@ public abstract class CommandStores // TODO (desired): only sync affected shards Ranges ranges = shard.ranges().currentRanges(); // ranges can be empty when ranges are lost or consolidated across epochs. - if (epoch > 1 && startSync && !ranges.isEmpty()) + if (epoch > 1 && startSync && requiresSync(ranges, prev.global, newTopology)) + { bootstrapUpdates.add(shard.store.sync(node, ranges, epoch)); + } result.add(shard); } @@ -489,6 +492,37 @@ public abstract class CommandStores return new TopologyUpdate(new Snapshot(result.toArray(new ShardHolder[0]), newLocalTopology, newTopology), bootstrap); } + private static boolean requiresSync(Ranges ranges, Topology oldTopology, Topology newTopology) + { + List<Shard> oldShards = oldTopology.foldl(ranges, (oldShard, shards, i) -> { + shards.add(oldShard); + return shards; + }, new ArrayList<>()); + + List<Shard> newShards = newTopology.foldl(ranges, (newShard, shards, i) -> { + shards.add(newShard); + return shards; + }, new ArrayList<>()); + + if (oldShards.size() != newShards.size()) + return true; + + for (int i = 0 ; i < oldShards.size() ; ++i) + { + Shard oldShard = oldShards.get(i); + Shard newShard = newShards.get(i); + if (!oldShard.fastPathElectorate.containsAll(newShard.fastPathElectorate)) + return true; + + if (!newShard.fastPathElectorate.containsAll(oldShard.fastPathElectorate)) + return true; + + if (!newShard.nodes.equals(oldShard.nodes)) + return true; + } + return false; + } + public <R> R unsafeFoldLeft(R initial, BiFunction<R, CommandStore, R> f) { Snapshot snapshot = current; diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 241e1c95..a85ad650 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -605,14 +605,12 @@ public class Commands .intersecting(executed.writes().keys); if (executes == null || executes.isEmpty()) { - // TODO (desirable, performance): Mark no-ops in CFK so we can notify later transactions immediately logger.trace("{}: applying no-op", txnId); safeCommand.applied(safeStore); safeStore.notifyListeners(safeCommand, command); } else { - // TODO (now): we should set applying within apply to avoid applying multiple times safeCommand.applying(safeStore); safeStore.notifyListeners(safeCommand, command); logger.trace("{}: applying", command.txnId()); diff --git a/accord-core/src/main/java/accord/local/CommonAttributes.java b/accord-core/src/main/java/accord/local/CommonAttributes.java index 9472e63e..583f70ef 100644 --- a/accord-core/src/main/java/accord/local/CommonAttributes.java +++ b/accord-core/src/main/java/accord/local/CommonAttributes.java @@ -25,6 +25,8 @@ import accord.primitives.Status; import accord.primitives.TxnId; import accord.utils.Invariants; +import static accord.primitives.Status.Durability.NotDurable; + public interface CommonAttributes { TxnId txnId(); @@ -51,6 +53,7 @@ public interface CommonAttributes { this.txnId = txnId; this.participants = StoreParticipants.empty(txnId); + this.durability = NotDurable; } public Mutable(CommonAttributes attributes) diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java index af13a3f3..1249d574 100644 --- a/accord-core/src/main/java/accord/primitives/Timestamp.java +++ b/accord-core/src/main/java/accord/primitives/Timestamp.java @@ -209,19 +209,28 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier public int compareTo(@Nonnull Timestamp that) { if (this == that) return 0; - int c = Long.compareUnsigned(this.msb, that.msb); - if (c == 0) c = Long.compare(lowHlc(this.lsb), lowHlc(that.lsb)); - if (c == 0) c = Long.compare(this.lsb & IDENTITY_FLAGS, that.lsb & IDENTITY_FLAGS); + int c = compareMsb(this.msb, that.msb); + if (c == 0) c = compareLsb(this.lsb, that.lsb); if (c == 0) c = this.node.compareTo(that.node); return c; } + public static int compareMsb(long msbA, long msbB) + { + return Long.compareUnsigned(msbA, msbB); + } + + public static int compareLsb(long lsbA, long lsbB) + { + int c = Long.compare(lowHlc(lsbA), lowHlc(lsbB)); + return c != 0 ? c : Long.compare(lsbA & IDENTITY_FLAGS, lsbB & IDENTITY_FLAGS); + } + public int compareToWithoutEpoch(@Nonnull Timestamp that) { if (this == that) return 0; int c = Long.compare(highHlc(this.msb), highHlc(that.msb)); - if (c == 0) c = Long.compare(lowHlc(this.lsb), lowHlc(that.lsb)); - if (c == 0) c = Long.compare(this.lsb & IDENTITY_FLAGS, that.lsb & IDENTITY_FLAGS); + if (c == 0) c = compareLsb(this.lsb, that.lsb); if (c == 0) c = this.node.compareTo(that.node); return c; } diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 23a5c6c4..c0bd2c90 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -731,7 +731,7 @@ public class TopologyManager // An issue was found where a range was removed from a replica and min selection picked the epoch before that, // which caused a node to get included in the txn that actually lost the range // See CASSANDRA-18804 - while (i < maxi && !select.isEmpty()) + while (i < maxi) { EpochState epochState = snapshot.epochs[i++]; topologies.add(epochState.global.forSelection(select, false)); diff --git a/accord-core/src/main/java/accord/utils/ArrayBuffers.java b/accord-core/src/main/java/accord/utils/ArrayBuffers.java index 388c5e2c..1bca73d3 100644 --- a/accord-core/src/main/java/accord/utils/ArrayBuffers.java +++ b/accord-core/src/main/java/accord/utils/ArrayBuffers.java @@ -23,7 +23,9 @@ import accord.api.RoutingKey; import accord.primitives.Range; import accord.primitives.TxnId; +import java.io.Closeable; import java.lang.reflect.Array; +import java.util.AbstractList; import java.util.Arrays; import java.util.function.IntFunction; @@ -783,4 +785,38 @@ public class ArrayBuffers } } + public static class BufferList<E> extends AbstractList<E> implements Closeable + { + private static final Object[] EMPTY = new Object[0]; + private Object[] buffer = EMPTY; + private int size; + + @Override + public E get(int index) + { + return (E) buffer[index]; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean add(E e) + { + if (size == buffer.length) + buffer = cachedAny().resize(buffer, size, Math.max(8, size * 2)); + buffer[size++] = e; + return true; + } + + public void close() + { + if (buffer == null) return; + cachedAny().forceDiscard(buffer, size); + buffer = null; + } + } } diff --git a/accord-core/src/main/java/accord/utils/LogGroupTimers.java b/accord-core/src/main/java/accord/utils/LogGroupTimers.java index eeb9ac21..8c59cbc4 100644 --- a/accord-core/src/main/java/accord/utils/LogGroupTimers.java +++ b/accord-core/src/main/java/accord/utils/LogGroupTimers.java @@ -95,7 +95,7 @@ public class LogGroupTimers<T extends LogGroupTimers.Timer> public void ensureScheduled(long now) { - now = Math.max(lastNow, now); + lastNow = now = Math.max(lastNow, now); T next = peekIfSoonerThan(now + preciseDelayThreshold); long runAt; if (next == null) diff --git a/accord-core/src/test/java/accord/impl/LocalListenersTest.java b/accord-core/src/test/java/accord/impl/LocalListenersTest.java index 7e85c960..157343fc 100644 --- a/accord-core/src/test/java/accord/impl/LocalListenersTest.java +++ b/accord-core/src/test/java/accord/impl/LocalListenersTest.java @@ -430,18 +430,20 @@ public class LocalListenersTest final TxnId txnId; final SaveStatus saveStatus; final Durability durability; + final StoreParticipants participants; TestCommand(TxnId txnId, SaveStatus saveStatus, Durability durability) { this.txnId = txnId; this.saveStatus = saveStatus; this.durability = durability; + this.participants = StoreParticipants.empty(txnId); } @Override public StoreParticipants participants() { - return null; + return participants; } @Override diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index 08acf9a4..effde761 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -131,7 +131,7 @@ public class ImmutableCommandTest Assertions.assertNull(command.executeAt()); } SafeCommandStore safeStore = commands.beginOperation(PreLoadContext.contextFor(txnId, keys.toParticipants())); - StoreParticipants participants = StoreParticipants.update(safeStore, keys.toParticipants(), txnId.epoch(), txnId, txnId.epoch()); + StoreParticipants participants = StoreParticipants.update(safeStore, ROUTE, txnId.epoch(), txnId, txnId.epoch()); SafeCommand safeCommand = safeStore.get(txnId, participants); Commands.preaccept(safeStore, safeCommand, participants, txnId, txnId.epoch(), txn.slice(FULL_RANGES, true), ROUTE); Command command = safeStore.get(txnId).current(); @@ -161,7 +161,7 @@ public class ImmutableCommandTest ((TestableConfigurationService)node.configService()).reportTopology(TopologyUtils.withEpoch(support.local.get(), 2)); Timestamp expectedTimestamp = Timestamp.fromValues(2, 110, ID1); getUninterruptibly(commands.execute(context, (Consumer<? super SafeCommandStore>) safeStore -> { - StoreParticipants participants = StoreParticipants.update(safeStore, keys.toParticipants(), txnId.epoch(), txnId, 2); + StoreParticipants participants = StoreParticipants.update(safeStore, ROUTE, txnId.epoch(), txnId, 2); Commands.preaccept(safeStore, safeStore.get(txnId, participants), participants, txnId, txnId.epoch(), txn.slice(FULL_RANGES, true), ROUTE); })); commands.execute(PreLoadContext.contextFor(txnId, txn.keys().toParticipants()), safeStore -> { diff --git a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java index e2886902..374d7b73 100644 --- a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java +++ b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java @@ -53,7 +53,6 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.api.Key; import accord.impl.IntHashKey; import accord.local.Node.Id; @@ -432,7 +431,7 @@ public class KeyDepsTest void testSimpleEquality() { - Assertions.assertArrayEquals(canonical.keySet().toArray(new Key[0]), test.keys().stream().toArray(Key[]::new)); + Assertions.assertArrayEquals(canonical.keySet().toArray(new RoutingKey[0]), test.keys().stream().toArray(RoutingKey[]::new)); for (Map.Entry<RoutingKey, NavigableSet<TxnId>> e : canonical.entrySet()) { List<TxnId> canonical = new ArrayList<>(e.getValue()); @@ -447,7 +446,7 @@ public class KeyDepsTest for (Map.Entry<TxnId, List<RoutingKey>> e : canonicalInverted.entrySet()) { Assertions.assertArrayEquals(toArray(e.getValue(), RoutingKey[]::new), - test.participatingKeys(e.getKey()).stream().toArray(Key[]::new)); + test.participatingKeys(e.getKey()).stream().toArray(RoutingKey[]::new)); } StringBuilder builder = new StringBuilder(); diff --git a/accord-core/src/test/java/accord/utils/LogGroupTimersTest.java b/accord-core/src/test/java/accord/utils/LogGroupTimersTest.java index 597f9c99..508a4a6d 100644 --- a/accord-core/src/test/java/accord/utils/LogGroupTimersTest.java +++ b/accord-core/src/test/java/accord/utils/LogGroupTimersTest.java @@ -47,7 +47,7 @@ public class LogGroupTimersTest @Test public void testOne() { - testOne(1269396034272574761L, 1000, 100); + testOne(5843168000636021001L, 1000, 100); } static class Timer extends LogGroupTimers.Timer @@ -244,7 +244,7 @@ public class LogGroupTimersTest void updateNow(long epoch) { long prevAt = at; - now = epoch + rnd.nextLong(schedulerImpreciseLateTolerance) - schedulerPreciseLateTolerance/2; + now = Math.max(0, epoch + rnd.nextLong(schedulerImpreciseLateTolerance) - schedulerPreciseLateTolerance/2); maxNow = Math.max(now, maxNow); scheduling.ensureScheduled(now); check(prevAt); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org