dcapwell commented on code in PR #3679:
URL: https://github.com/apache/cassandra/pull/3679#discussion_r1847105488
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -160,33 +158,279 @@ private <P1, T> T mapReduce(@Nonnull Timestamp
testTimestamp, @Nullable TxnId te
Invariants.checkState(testTxnId.equals(summary.findAsDep));
}
- // TODO (required): ensure we are excluding any ranges that are
now shard-redundant (not sure if this is enforced yet)
for (Range range : summary.ranges)
{
- if (!this.ranges.intersects(range))
- continue;
- collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
+ if (keysOrRanges.intersects(range))
+ collect.computeIfAbsent(range, ignore -> new
ArrayList<>()).add(summary);
}
}));
- for (Map.Entry<Range, List<CommandsForRangesLoader.Summary>> e :
collect.entrySet())
+ for (Map.Entry<Range, List<Summary>> e : collect.entrySet())
{
- for (CommandsForRangesLoader.Summary command : e.getValue())
+ for (Summary command : e.getValue())
accumulate = map.apply(p1, e.getKey(), command.txnId,
command.executeAt, accumulate);
}
return accumulate;
}
- public CommandsForRanges slice(Ranges slice)
+ public static class Summary
+ {
+ public final @Nonnull TxnId txnId;
+ public final @Nonnull Timestamp executeAt;
+ public final @Nonnull SaveStatus saveStatus;
+ public final @Nonnull Ranges ranges;
+
+ public final TxnId findAsDep;
+ public final boolean hasAsDep;
+
+ @VisibleForTesting
+ Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt, @Nonnull
SaveStatus saveStatus, @Nonnull Ranges ranges, TxnId findAsDep, boolean
hasAsDep)
+ {
+ this.txnId = txnId;
+ this.executeAt = executeAt;
+ this.saveStatus = saveStatus;
+ this.ranges = ranges;
+ this.findAsDep = findAsDep;
+ this.hasAsDep = hasAsDep;
+ }
+
+ public Summary slice(Ranges slice)
+ {
+ return new Summary(txnId, executeAt, saveStatus,
ranges.slice(slice, Minimal), findAsDep, hasAsDep);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Summary{" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", ranges=" + ranges +
+ ", findAsDep=" + findAsDep +
+ ", hasAsDep=" + hasAsDep +
+ '}';
+ }
+ }
+
+ public static class Manager implements AccordCache.Listener<TxnId, Command>
+ {
+ private final AccordCommandStore commandStore;
+ private final RoutesSearcher searcher = new RoutesSearcher();
+ private final NavigableMap<TxnId, Ranges> transitive = new TreeMap<>();
+ private final ObjectHashSet<TxnId> cachedRangeTxns = new
ObjectHashSet<>();
+
+ public Manager(AccordCommandStore commandStore)
+ {
+ this.commandStore = commandStore;
+ try (AccordCommandStore.ExclusiveCaches caches =
commandStore.lockCaches())
+ {
+ caches.commands().register(this);
+ }
+ }
+
+ @Override
+ public void onAdd(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.add(txnId);
+ }
+
+ @Override
+ public void onEvict(AccordCacheEntry<TxnId, Command> state)
+ {
+ TxnId txnId = state.key();
+ if (txnId.is(Routable.Domain.Range))
+ cachedRangeTxns.remove(txnId);
+ }
+
+ public CommandsForRanges.Loader loader(@Nullable TxnId primaryTxnId,
KeyHistory keyHistory, Unseekables<?> keysOrRanges)
+ {
+ RedundantBefore redundantBefore =
commandStore.unsafeGetRedundantBefore();
+ TxnId minTxnId = redundantBefore.min(keysOrRanges, e ->
e.gcBefore);
+ Timestamp maxTxnId = primaryTxnId == null || keyHistory ==
KeyHistory.RECOVER || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX :
primaryTxnId;
+ TxnId findAsDep = primaryTxnId != null && keyHistory ==
KeyHistory.RECOVER ? primaryTxnId : null;
+ return new CommandsForRanges.Loader(this, keysOrRanges,
redundantBefore, minTxnId, maxTxnId, findAsDep);
+ }
+
+ public void mergeTransitive(TxnId txnId, Ranges ranges, BiFunction<?
super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+ {
+ transitive.merge(txnId, ranges, remappingFunction);
+ }
+
+ public void gcBefore(TxnId gcBefore, Ranges ranges)
+ {
+ Iterator<Map.Entry<TxnId, Ranges>> iterator =
transitive.headMap(gcBefore).entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<TxnId, Ranges> e = iterator.next();
+ Ranges newRanges = e.getValue().without(ranges);
+ if (newRanges.isEmpty())
+ iterator.remove();
+ e.setValue(newRanges);
+ }
+ }
+ }
+
+ public static class Loader
{
- Ranges ranges = this.ranges.slice(slice, Minimal);
- NavigableMap<Timestamp, CommandsForRangesLoader.Summary> copy = new
TreeMap<>();
- for (Map.Entry<Timestamp, CommandsForRangesLoader.Summary> e :
map.entrySet())
+ private final Manager manager;
+ final Unseekables<?> searchKeysOrRanges;
+ final RedundantBefore redundantBefore;
+ final TxnId minTxnId;
+ final Timestamp maxTxnId;
+ @Nullable final TxnId findAsDep;
+
+ public Loader(Manager manager, Unseekables<?> searchKeysOrRanges,
RedundantBefore redundantBefore, TxnId minTxnId, Timestamp maxTxnId, @Nullable
TxnId findAsDep)
+ {
+ this.manager = manager;
+ this.searchKeysOrRanges = searchKeysOrRanges;
+ this.redundantBefore = redundantBefore;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.findAsDep = findAsDep;
+ }
+
+ public void intersects(Consumer<TxnId> forEach)
{
- if (!e.getValue().ranges.intersects(slice)) continue;
- copy.put(e.getKey(), e.getValue().slice(slice));
+ switch (searchKeysOrRanges.domain())
+ {
+ case Range:
+ for (Unseekable range : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(TokenRange) range, minTxnId, maxTxnId, forEach);
+ break;
+ case Key:
+ for (Unseekable key : searchKeysOrRanges)
+ manager.searcher.intersects(manager.commandStore.id(),
(AccordRoutingKey) key, minTxnId, maxTxnId, forEach);
+ }
+
+ if (!manager.transitive.isEmpty())
+ {
+ for (var e : manager.transitive.tailMap(minTxnId,
true).entrySet())
+ {
+ if (e.getValue().intersects(searchKeysOrRanges))
+ forEach.accept(e.getKey());
+ }
+ }
+ }
+
+ public void forEachInCache(Consumer<Summary> forEach,
AccordCommandStore.Caches caches)
+ {
+ for (TxnId txnId : manager.cachedRangeTxns)
+ {
+ AccordCacheEntry<TxnId, Command> state =
caches.commands().getUnsafe(txnId);
+ Summary summary = from(state);
+ if (summary != null)
+ forEach.accept(summary);
+ }
+ }
+
+ public Summary load(TxnId txnId)
+ {
+ if (findAsDep == null)
+ {
+ SavedCommand.MinimalCommand cmd =
manager.commandStore.loadMinimal(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+ else
+ {
+ Command cmd = manager.commandStore.loadCommand(txnId);
+ if (cmd != null)
+ return from(cmd);
+ }
+
+ Ranges ranges = manager.transitive.get(txnId);
+ if (ranges == null)
+ return null;
+
+ ranges = ranges.intersecting(searchKeysOrRanges);
+ if (ranges.isEmpty())
+ return null;
+
+ return new Summary(txnId, txnId, SaveStatus.NotDefined, ranges,
null, false);
+ }
+
+ public Summary from(AccordCacheEntry<TxnId, Command> state)
+ {
+ if (state.key().domain() != Routable.Domain.Range)
+ return null;
+
+ switch (state.status())
+ {
+ default: throw new AssertionError("Unhandled status: " +
state.status());
+ case LOADING:
+ case WAITING_TO_LOAD:
+ case UNINITIALIZED:
+ return null;
+
+ case LOADED:
+ case MODIFIED:
+ case SAVING:
+ case FAILED_TO_SAVE:
+ }
+
+ TxnId txnId = state.key();
+ if (!txnId.isVisible() || txnId.compareTo(minTxnId) < 0 ||
txnId.compareTo(maxTxnId) >= 0)
+ return null;
+
+ Command command = state.getExclusive();
+ if (command == null)
+ return null;
+ return from(command);
+ }
+
+ public Summary from(Command cmd)
+ {
+ return from(cmd.txnId(), cmd.executeAt(), cmd.saveStatus(),
cmd.participants(), cmd.partialDeps());
+ }
+
+ public Summary from(SavedCommand.MinimalCommand cmd)
+ {
+ Invariants.checkState(findAsDep == null);
+ return from(cmd.txnId, cmd.executeAt, cmd.saveStatus,
cmd.participants, null);
+ }
+
+ private Summary from(TxnId txnId, Timestamp executeAt, SaveStatus
saveStatus, StoreParticipants participants, @Nullable PartialDeps partialDeps)
+ {
+ if (participants == null)
+ return null;
+
+ Ranges keysOrRanges = participants.touches().toRanges();
Review Comment:
previously I have seen `touches` have more ranges than `owns`, so wondering
why we do this rather than `owns`? previously we would load then `slice` to
what is logically `owns`, so wondering why this change? we added slice to CFR
as we were including ranges outside of owns and you previously said that was a
bug, so feels like we are allowing that to come back?
##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -341,6 +343,7 @@ public RangesForEpoch ranges()
private <O> O mapReduce(Routables<?> keysOrRanges,
BiFunction<CommandsSummary, O, O> map, O accumulate)
{
+ Invariants.checkState(context.keys().containsAll(keysOrRanges));
Review Comment:
```suggestion
Invariants.checkState(context.keys().containsAll(keysOrRanges),
"Attempted to access keysOrRanges outside of what was asked for; asked for %s,
accessed %s", context.keys(), keysOrRanges);
```
previous logic would tell you what ranges you were missing, we shouldn't
loose that detail
##########
src/java/org/apache/cassandra/service/accord/AccordTask.java:
##########
@@ -941,7 +944,35 @@ void revert()
commandsForKey.forEach((k, v) -> v.revert());
}
- public class RangeScanner implements Runnable
+ protected void addToQueue(TaskQueue queue)
+ {
+ Invariants.checkState(queue.kind == state || (queue.kind ==
State.WAITING_TO_LOAD && state == WAITING_TO_SCAN_RANGES), "Invalid queue type:
%s vs %s", queue.kind, this, AccordTask::toDescription);
+ Invariants.checkState(this.queued == null, "Already queued with state:
%s", this, AccordTask::toDescription);
+ queued = queue;
+ queue.append(this);
+ }
+
+ TaskQueue<?> queued()
Review Comment:
```suggestion
@Nullable
TaskQueue<?> queued()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]