belliottsmith commented on code in PR #3679:
URL: https://github.com/apache/cassandra/pull/3679#discussion_r1853821458
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -160,33 +158,279 @@ private <P1, T> T mapReduce(@Nonnull Timestamp
testTimestamp, @Nullable TxnId te
Invariants.checkState(testTxnId.equals(summary.findAsDep));
}
- // TODO (required): ensure we are excluding any ranges that are
now shard-redundant (not sure if this is enforced yet)
for (Range range : summary.ranges)
{
- if (!this.ranges.intersects(range))
- continue;
- collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
+ if (keysOrRanges.intersects(range))
+ collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
}
}));
- for (Map.Entry<Range, List<CommandsForRangesLoader.Summary>> e :
collect.entrySet())
+ for (Map.Entry<Range, List<Summary>> e : collect.entrySet())
{
- for (CommandsForRangesLoader.Summary command : e.getValue())
+ for (Summary command : e.getValue())
accumulate = map.apply(p1, e.getKey(), command.txnId,
command.executeAt, accumulate);
}
return accumulate;
}
- public CommandsForRanges slice(Ranges slice)
+ public static class Summary
+ {
+ public final @Nonnull TxnId txnId;
+ public final @Nonnull Timestamp executeAt;
+ public final @Nonnull SaveStatus saveStatus;
+ public final @Nonnull Ranges ranges;
+
+ public final TxnId findAsDep;
+ public final boolean hasAsDep;
+
+ @VisibleForTesting
+ Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull
SaveStatus saveStatus, @Nonnull Ranges ranges, TxnId findAsDep, boolean
hasAsDep)
+ {
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.saveStatus = saveStatus;
+ this.ranges = ranges;
+ this.findAsDep = findAsDep;
+ this.hasAsDep = hasAsDep;
+ }
+
+ public Summary slice(Ranges slice)
+ {
+ return new Summary(txnId, executeAt, saveStatus,
ranges.slice(slice, Minimal), findAsDep, hasAsDep);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Summary{" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", ranges=" + ranges +
+ ", findAsDep=" + findAsDep +
+ ", hasAsDep=" + hasAsDep +
+ '}';
+ }
+ }
+
+ public static class Manager implements AccordCache.Listener<TxnId, Command>
+ {
+ private final AccordCommandStore commandStore;
+ private final RoutesSearcher searcher = new RoutesSearcher();
+ private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+ private final ObjectHashSet<TxnId> cachedRangeTxns = new
ObjectHashSet<>();
+
+ public Manager(AccordCommandStore commandStore)
+ {
+ this.commandStore = commandStore;
+ try (AccordCommandStore.ExclusiveCaches caches =
commandStore.lockCaches())
+ {
+ caches.commands().register(this);
+ }
+ }
+
+ @Override
+ public void onAdd(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.add(txnId);
+ }
+
+ @Override
+ public void onEvict(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.remove(txnId);
+ }
+
+ public CommandsForRanges.Loader loader(@Nullable TxnId primaryTxnId,
KeyHistory keyHistory, Unseekables<?> keysOrRanges)
+ {
+ RedundantBefore redundantBefore =
commandStore.unsafeGetRedundantBefore();
+ TxnId minTxnId = redundantBefore.min(keysOrRanges, e ->
e.gcBefore);
+ Timestamp maxTxnId = primaryTxnId == null || keyHistory ==
KeyHistory.RECOVER || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX :
primaryTxnId;
+ TxnId findAsDep = primaryTxnId != null && keyHistory ==
KeyHistory.RECOVER ? primaryTxnId : null;
+ return new CommandsForRanges.Loader(this, keysOrRanges,
redundantBefore, minTxnId, maxTxnId, findAsDep);
+ }
+
+ public void mergeTransitive(TxnId txnId, Ranges ranges, BiFunction<?
super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+ {
+ transitive.merge(txnId, ranges, remappingFunction);
+ }
+
+ public void gcBefore(TxnId gcBefore, Ranges ranges)
+ {
+ Iterator<Map.Entry<TxnId, Ranges>> iterator =
transitive.headMap(gcBefore).entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<TxnId, Ranges> e = iterator.next();
+ Ranges newRanges = e.getValue().without(ranges);
+ if (newRanges.isEmpty())
+ iterator.remove();
+ e.setValue(newRanges);
+ }
+ }
+ }
+
+ public static class Loader
{
- Ranges ranges = this.ranges.slice(slice, Minimal);
- NavigableMap<Timestamp, CommandsForRangesLoader.Summary> copy = new
TreeMap<>();
- for (Map.Entry<Timestamp, CommandsForRangesLoader.Summary> e :
map.entrySet())
+ private final Manager manager;
+ final Unseekables<?> searchKeysOrRanges;
+ final RedundantBefore redundantBefore;
+ final TxnId minTxnId;
+ final Timestamp maxTxnId;
+ @Nullable final TxnId findAsDep;
+
+ public Loader(Manager manager, Unseekables<?> searchKeysOrRanges,
RedundantBefore redundantBefore, TxnId minTxnId, Timestamp maxTxnId, @Nullable
TxnId findAsDep)
+ {
+ this.manager = manager;
+ this.searchKeysOrRanges = searchKeysOrRanges;
+ this.redundantBefore = redundantBefore;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.findAsDep = findAsDep;
+ }
+
+ public void intersects(Consumer<TxnId> forEach)
{
- if (!e.getValue().ranges.intersects(slice)) continue;
- copy.put(e.getKey(), e.getValue().slice(slice));
+ switch (searchKeysOrRanges.domain())
+ {
+ case Range:
+ for (Unseekable range : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(TokenRange) range, minTxnId, maxTxnId, forEach);
+ break;
+ case Key:
+ for (Unseekable key : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(AccordRoutingKey) key, minTxnId, maxTxnId, forEach);
+ }
+
+ if (!manager.transitive.isEmpty())
+ {
+ for (var e : manager.transitive.tailMap(minTxnId,
true).entrySet())
+ {
+ if (e.getValue().intersects(searchKeysOrRanges))
+ forEach.accept(e.getKey());
+ }
+ }
+ }
+
+ public void forEachInCache(Consumer<Summary> forEach,
AccordCommandStore.Caches caches)
+ {
+ for (TxnId txnId : manager.cachedRangeTxns)
+ {
+ AccordCacheEntry<TxnId, Command> state =
caches.commands().getUnsafe(txnId);
+ Summary summary = from(state);
+ if (summary != null)
+ forEach.accept(summary);
+ }
+ }
+
+ public Summary load(TxnId txnId)
+ {
+ if (findAsDep == null)
+ {
+ SavedCommand.MinimalCommand cmd =
manager.commandStore.loadMinimal(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+ else
+ {
+ Command cmd = manager.commandStore.loadCommand(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+
+ Ranges ranges = manager.transitive.get(txnId);
+ if (ranges == null)
+ return null;
+
+ ranges = ranges.intersecting(searchKeysOrRanges);
+ if (ranges.isEmpty())
+ return null;
+
+ return new Summary(txnId, txnId, SaveStatus.NotDefined, ranges,
null, false);
+ }
+
+ public Summary from(AccordCacheEntry<TxnId, Command> state)
+ {
+ if (state.key().domain() != Routable.Domain.Range)
+ return null;
+
+ switch (state.status())
+ {
+ default: throw new AssertionError("Unhandled status: " +
state.status());
+ case LOADING:
+ case WAITING_TO_LOAD:
+ case UNINITIALIZED:
+ return null;
+
+ case LOADED:
+ case MODIFIED:
+ case SAVING:
+ case FAILED_TO_SAVE:
+ }
+
+ TxnId txnId = state.key();
+ if (!txnId.isVisible() || txnId.compareTo(minTxnId) < 0 ||
txnId.compareTo(maxTxnId) >= 0)
+ return null;
+
+ Command command = state.getExclusive();
+ if (command == null)
+ return null;
+ return from(command);
+ }
+
+ public Summary from(Command cmd)
+ {
+ return from(cmd.txnId(), cmd.executeAt(), cmd.saveStatus(),
cmd.participants(), cmd.partialDeps());
+ }
+
+ public Summary from(SavedCommand.MinimalCommand cmd)
+ {
+ Invariants.checkState(findAsDep == null);
+ return from(cmd.txnId, cmd.executeAt, cmd.saveStatus,
cmd.participants, null);
+ }
+
+ private Summary from(TxnId txnId, Timestamp executeAt, SaveStatus
saveStatus, StoreParticipants participants, @Nullable PartialDeps partialDeps)
+ {
+ if (participants == null)
+ return null;
+
+ Ranges keysOrRanges = participants.touches().toRanges();
Review Comment:
Because we rely on quorum intersections across topology changes. "Touches"
means we have processed a phase (e.g. a PreAccept, Accept, Commit, etc.) that
intersected with ranges we own _at some point_, and we did this because some
future epoch broadcast backwards to us, to ensure the commands we process
intersect these later commands' quorums. This means that, for both recovery and
dependency calculation, anyone asking for information about a command
intersecting those ranges needs to encounter this command.
Ownership means we're responsible _for that command_.
Touches means we're responsible for some _interactions_ with that command.
Touches is always bigger than owns, but still represents ranges we're
responsible for.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]