ifesdjeen commented on code in PR #233:
URL: https://github.com/apache/cassandra-accord/pull/233#discussion_r2247620833
##########
accord-core/src/main/java/accord/local/cfk/Serialize.java:
##########
@@ -77,6 +77,7 @@ public class Serialize
private static final int HAS_BALLOT_HEADER_BIT_SHIFT = 2;
private static final int HAS_STATUS_OVERRIDES_HEADER_BIT = 0x8;
private static final int HAS_STATUS_OVERRIDES_HEADER_BIT_SHIFT = 3;
+ private static final int HAS_MISSING_DEPS_FLAGS_HEADER_BIT = 0x10;
Review Comment:
nit: maybe add something like "bit 4 is set if there are any queries with
missing deps flags" to the preamble header description
##########
accord-core/src/main/java/accord/local/CommandSummaries.java:
##########
@@ -60,185 +62,279 @@ enum SummaryStatus
INVALIDATED;
public static final SummaryStatus NONE = null;
+
+ private static final SummaryStatus[] SUMMARY_STATUSES = values();
}
- enum IsDep { IS_COORD_DEP, IS_NOT_COORD_DEP, NOT_ELIGIBLE, IS_STABLE_DEP,
IS_NOT_STABLE_DEP }
+ enum IsDep
+ {
+ IS_COORD_DEP, IS_NOT_COORD_DEP, NOT_ELIGIBLE, IS_STABLE_DEP,
IS_NOT_STABLE_DEP;
+ private static final IsDep[] IS_DEPS = values();
+ }
- class Summary
+ class Summary extends TxnId
{
- public final @Nonnull TxnId txnId;
- public final @Nonnull Timestamp executeAt;
- public final @Nonnull SummaryStatus status;
- public final @Nonnull Unseekables<?> participants;
+ private static final int SUMMARY_STATUS_MASK = 0x7;
+ private static final int IS_DEP_SHIFT = 3;
+ final @Nonnull Timestamp executeAt;
+ final int encoded;
+ final Unseekables<?> participants;
- public final IsDep dep;
- public final TxnId findAsDep;
+ public Summary slice(Ranges ranges)
+ {
+ return new Summary(this, this.executeAt, encoded,
participants.slice(ranges, Minimal));
+ }
@VisibleForTesting
- public Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
@Nonnull SummaryStatus status, @Nonnull Unseekables<?> participants, IsDep dep,
TxnId findAsDep)
+ public Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
@Nonnull SummaryStatus status, IsDep dep, Unseekables<?> participants)
{
- this.txnId = txnId;
- this.executeAt = executeAt;
- this.status = status;
+ super(txnId);
this.participants = participants;
- this.findAsDep = findAsDep;
- this.dep = dep;
+ this.executeAt = executeAt.equals(txnId) ? this : executeAt;
+ this.encoded = status.ordinal() | (dep == null ? Integer.MIN_VALUE
: (dep.ordinal() << IS_DEP_SHIFT));
+ }
+
+ private Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
int encoded, Unseekables<?> participants)
+ {
+ super(txnId);
+ this.participants = participants;
+ this.executeAt = executeAt == txnId || executeAt.equals(txnId) ?
this : executeAt;
+ this.encoded = encoded;
+ }
+
+ public boolean is(IsDep isDep)
+ {
+ return (encoded >> IS_DEP_SHIFT) == isDep.ordinal();
}
- public Summary slice(Ranges slice)
+ public IsDep isDep()
{
- return new Summary(txnId, executeAt, status,
participants.slice(slice, Minimal), dep, findAsDep);
+ if (encoded < 0)
+ return null;
+ return IsDep.IS_DEPS[encoded >> IS_DEP_SHIFT];
+ }
+
+ public boolean is(SummaryStatus summaryStatus)
+ {
+ return (encoded & SUMMARY_STATUS_MASK) == summaryStatus.ordinal();
+ }
+
+ public SummaryStatus status()
+ {
+ int ordinal = encoded & SUMMARY_STATUS_MASK;
+ return SummaryStatus.SUMMARY_STATUSES[ordinal];
+ }
+
+ public TxnId plainTxnId()
+ {
+ return new TxnId(this);
+ }
+
+ public Timestamp plainExecuteAt()
+ {
+ return executeAt == this ? new Timestamp(this) : executeAt;
}
@Override
public String toString()
{
return "Summary{" +
- "txnId=" + txnId +
- ", executeAt=" + executeAt +
- ", saveStatus=" + status +
- ", participants=" + participants +
- ", maybeDep=" + dep +
- ", findAsDep=" + findAsDep +
+ "txnId=" + plainTxnId() +
+ ", executeAt=" + plainExecuteAt() +
+ ", saveStatus=" + status() +
+ ", isDep=" + isDep() +
'}';
}
+ }
- public static class Loader
+ class SummaryLoader
+ {
+ public interface Factory<L extends SummaryLoader>
{
- public interface Factory<L extends Loader>
- {
- L create(@Nullable TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep);
- }
+ L create(RedundantBefore redundantBefore, @Nullable MaxDecidedRX
maxDecidedRX, TxnId primaryTxnId, Unseekables<?> searchKeysOrRanges, Kinds
testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep);
+ }
- protected final Unseekables<?> searchKeysOrRanges;
- protected final RedundantBefore redundantBefore;
- // TODO (expected): separate out Kinds we need before/after
primaryTxnId/executeAt
- protected final Kinds testKind;
- protected final TxnId minTxnId;
- protected final Timestamp maxTxnId;
- @Nullable protected final TxnId findAsDep;
+ protected final RedundantBefore redundantBefore;
+ protected final MaxDecidedRX maxDecidedRX;
+ protected final Unseekables<?> searchKeysOrRanges;
+ // TODO (expected): separate out Kinds we need before/after
primaryTxnId/executeAt
+ protected final Kinds testKind;
+ protected final TxnId primaryTxnId, findAsDep, minTxnId, minDecidedId;
+ protected final Timestamp maxTxnId;
- // TODO (expected): provide executeAt to PreLoadContext so we can
more aggressively filter what we load, esp. by Kind
- public static Loader loader(RedundantBefore redundantBefore,
PreLoadContext context)
- {
- return loader(redundantBefore, context.primaryTxnId(),
context.loadKeysFor(), context.keys());
- }
+ // TODO (expected): provide executeAt to PreLoadContext so we can more
aggressively filter what we load, esp. by Kind
+ public static SummaryLoader loader(RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, PreLoadContext context)
+ {
+ return loader(redundantBefore, maxDecidedRX,
context.primaryTxnId(), context.loadKeysFor(), context.keys());
+ }
- public static Loader loader(RedundantBefore redundantBefore,
@Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor, Unseekables<?>
keysOrRanges)
- {
- return loader(redundantBefore, primaryTxnId, loadKeysFor,
keysOrRanges, Loader::new);
- }
+ public static SummaryLoader loader(RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, LoadKeysFor loadKeysFor,
Unseekables<?> keysOrRanges)
+ {
+ return loader(redundantBefore, maxDecidedRX, primaryTxnId,
loadKeysFor, keysOrRanges, SummaryLoader::new);
+ }
- public static <L extends Loader> L loader(RedundantBefore
redundantBefore, @Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor,
Unseekables<?> keysOrRanges, Factory<L> factory)
- {
- TxnId minTxnId = redundantBefore.min(keysOrRanges,
Bounds::gcBefore);
- Timestamp maxTxnId = primaryTxnId == null || loadKeysFor ==
RECOVERY || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId;
- TxnId findAsDep = primaryTxnId != null && loadKeysFor ==
RECOVERY ? primaryTxnId : null;
- Kinds kinds = primaryTxnId == null ? AnyGloballyVisible :
primaryTxnId.witnesses().or(loadKeysFor == RECOVERY ?
primaryTxnId.witnessedBy() : Nothing);
- return factory.create(primaryTxnId, keysOrRanges,
redundantBefore, kinds, minTxnId, maxTxnId, findAsDep);
- }
+ public static <L extends SummaryLoader> L loader(RedundantBefore
redundantBefore, MaxDecidedRX maxDecidedRX, PreLoadContext context, Factory<L>
factory)
+ {
+ return loader(redundantBefore, maxDecidedRX,
context.primaryTxnId(), context.loadKeysFor(), context.keys(), factory);
+ }
- public Loader(@Nullable TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep)
- {
- this.searchKeysOrRanges = searchKeysOrRanges;
- this.redundantBefore = redundantBefore;
- this.testKind = testKind;
- this.minTxnId = minTxnId;
- this.maxTxnId = maxTxnId;
- this.findAsDep = findAsDep;
- }
+ public static <L extends SummaryLoader> L loader(RedundantBefore
redundantBefore, MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, LoadKeysFor
loadKeysFor, Unseekables<?> keysOrRanges, Factory<L> factory)
+ {
+ Invariants.require(primaryTxnId != null);
+ TxnId minTxnId = redundantBefore.min(keysOrRanges,
Bounds::gcBefore);
+ Timestamp maxTxnId = loadKeysFor == RECOVERY ||
!primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId;
Review Comment:
nit: maybe best to remove the `!` and flip the two branches of the ternary?
##########
accord-core/src/main/java/accord/local/CommandSummaries.java:
##########
@@ -60,185 +62,279 @@ enum SummaryStatus
INVALIDATED;
public static final SummaryStatus NONE = null;
+
+ private static final SummaryStatus[] SUMMARY_STATUSES = values();
}
- enum IsDep { IS_COORD_DEP, IS_NOT_COORD_DEP, NOT_ELIGIBLE, IS_STABLE_DEP,
IS_NOT_STABLE_DEP }
+ enum IsDep
+ {
+ IS_COORD_DEP, IS_NOT_COORD_DEP, NOT_ELIGIBLE, IS_STABLE_DEP,
IS_NOT_STABLE_DEP;
+ private static final IsDep[] IS_DEPS = values();
+ }
- class Summary
+ class Summary extends TxnId
{
- public final @Nonnull TxnId txnId;
- public final @Nonnull Timestamp executeAt;
- public final @Nonnull SummaryStatus status;
- public final @Nonnull Unseekables<?> participants;
+ private static final int SUMMARY_STATUS_MASK = 0x7;
+ private static final int IS_DEP_SHIFT = 3;
+ final @Nonnull Timestamp executeAt;
+ final int encoded;
+ final Unseekables<?> participants;
- public final IsDep dep;
- public final TxnId findAsDep;
+ public Summary slice(Ranges ranges)
+ {
+ return new Summary(this, this.executeAt, encoded,
participants.slice(ranges, Minimal));
+ }
@VisibleForTesting
- public Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
@Nonnull SummaryStatus status, @Nonnull Unseekables<?> participants, IsDep dep,
TxnId findAsDep)
+ public Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
@Nonnull SummaryStatus status, IsDep dep, Unseekables<?> participants)
{
- this.txnId = txnId;
- this.executeAt = executeAt;
- this.status = status;
+ super(txnId);
this.participants = participants;
- this.findAsDep = findAsDep;
- this.dep = dep;
+ this.executeAt = executeAt.equals(txnId) ? this : executeAt;
+ this.encoded = status.ordinal() | (dep == null ? Integer.MIN_VALUE
: (dep.ordinal() << IS_DEP_SHIFT));
+ }
+
+ private Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
int encoded, Unseekables<?> participants)
+ {
+ super(txnId);
+ this.participants = participants;
+ this.executeAt = executeAt == txnId || executeAt.equals(txnId) ?
this : executeAt;
+ this.encoded = encoded;
+ }
+
+ public boolean is(IsDep isDep)
+ {
+ return (encoded >> IS_DEP_SHIFT) == isDep.ordinal();
}
- public Summary slice(Ranges slice)
+ public IsDep isDep()
{
- return new Summary(txnId, executeAt, status,
participants.slice(slice, Minimal), dep, findAsDep);
+ int ordinal = encoded >> IS_DEP_SHIFT;
+ return ordinal < 0 ? null : IsDep.IS_DEPS[ordinal];
+ }
+
+ public boolean is(SummaryStatus summaryStatus)
+ {
+ return (encoded & SUMMARY_STATUS_MASK) == summaryStatus.ordinal();
+ }
+
+ public SummaryStatus status()
+ {
+ int ordinal = encoded & SUMMARY_STATUS_MASK;
+ return SummaryStatus.SUMMARY_STATUSES[ordinal];
+ }
+
+ public TxnId plainTxnId()
+ {
+ return new TxnId(this);
+ }
+
+ public Timestamp plainExecuteAt()
+ {
+ return executeAt == this ? new Timestamp(this) : executeAt;
}
@Override
public String toString()
{
return "Summary{" +
- "txnId=" + txnId +
- ", executeAt=" + executeAt +
- ", saveStatus=" + status +
- ", participants=" + participants +
- ", maybeDep=" + dep +
- ", findAsDep=" + findAsDep +
+ "txnId=" + plainTxnId() +
+ ", executeAt=" + plainExecuteAt() +
+ ", saveStatus=" + status() +
+ ", isDep=" + isDep() +
'}';
}
+ }
- public static class Loader
+ class SummaryLoader
+ {
+ public interface Factory<L extends SummaryLoader, P>
{
- public interface Factory<L extends Loader>
- {
- L create(@Nullable TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep);
- }
+ L create(P param, RedundantBefore redundantBefore, @Nullable
MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId,
@Nullable TxnId findAsDep);
+ }
- protected final Unseekables<?> searchKeysOrRanges;
- protected final RedundantBefore redundantBefore;
- // TODO (expected): separate out Kinds we need before/after
primaryTxnId/executeAt
- protected final Kinds testKind;
- protected final TxnId minTxnId;
- protected final Timestamp maxTxnId;
- @Nullable protected final TxnId findAsDep;
+ protected final RedundantBefore redundantBefore;
+ protected final MaxDecidedRX maxDecidedRX;
+ protected final Unseekables<?> searchKeysOrRanges;
+ // TODO (expected): separate out Kinds we need before/after
primaryTxnId/executeAt
+ protected final Kinds testKind;
+ protected final TxnId primaryTxnId, findAsDep, minTxnId, minDecidedId;
+ protected final Timestamp maxTxnId;
+// protected final TxnId minDecidedId;
+
+ // TODO (expected): provide executeAt to PreLoadContext so we can more
aggressively filter what we load, esp. by Kind
+ public static SummaryLoader loader(RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, PreLoadContext context)
+ {
+ return loader(redundantBefore, maxDecidedRX,
context.primaryTxnId(), context.loadKeysFor(), context.keys());
+ }
- // TODO (expected): provide executeAt to PreLoadContext so we can
more aggressively filter what we load, esp. by Kind
- public static Loader loader(RedundantBefore redundantBefore,
PreLoadContext context)
- {
- return loader(redundantBefore, context.primaryTxnId(),
context.loadKeysFor(), context.keys());
- }
+ public static SummaryLoader loader(RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, LoadKeysFor loadKeysFor,
Unseekables<?> keysOrRanges)
+ {
+ return loader(null, redundantBefore, maxDecidedRX, primaryTxnId,
loadKeysFor, keysOrRanges, SummaryLoader::new);
+ }
- public static Loader loader(RedundantBefore redundantBefore,
@Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor, Unseekables<?>
keysOrRanges)
- {
- return loader(redundantBefore, primaryTxnId, loadKeysFor,
keysOrRanges, Loader::new);
- }
+ public static <L extends SummaryLoader, P> L loader(P param,
RedundantBefore redundantBefore, MaxDecidedRX maxDecidedRX, PreLoadContext
context, Factory<L, P> factory)
+ {
+ return loader(param, redundantBefore, maxDecidedRX,
context.primaryTxnId(), context.loadKeysFor(), context.keys(), factory);
+ }
- public static <L extends Loader> L loader(RedundantBefore
redundantBefore, @Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor,
Unseekables<?> keysOrRanges, Factory<L> factory)
- {
- TxnId minTxnId = redundantBefore.min(keysOrRanges,
Bounds::gcBefore);
- Timestamp maxTxnId = primaryTxnId == null || loadKeysFor ==
RECOVERY || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId;
- TxnId findAsDep = primaryTxnId != null && loadKeysFor ==
RECOVERY ? primaryTxnId : null;
- Kinds kinds = primaryTxnId == null ? AnyGloballyVisible :
primaryTxnId.witnesses().or(loadKeysFor == RECOVERY ?
primaryTxnId.witnessedBy() : Nothing);
- return factory.create(primaryTxnId, keysOrRanges,
redundantBefore, kinds, minTxnId, maxTxnId, findAsDep);
- }
+ public static <L extends SummaryLoader, P> L loader(P param,
RedundantBefore redundantBefore, MaxDecidedRX maxDecidedRX, TxnId primaryTxnId,
LoadKeysFor loadKeysFor, Unseekables<?> keysOrRanges, Factory<L, P> factory)
+ {
+ Invariants.require(primaryTxnId != null);
+ TxnId minTxnId = redundantBefore.min(keysOrRanges,
Bounds::gcBefore);
+ Timestamp maxTxnId = loadKeysFor == RECOVERY ||
!primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId;
+ TxnId findAsDep = loadKeysFor == RECOVERY ? primaryTxnId : null;
+ Kinds kinds = primaryTxnId.witnesses().or(loadKeysFor == RECOVERY
? primaryTxnId.witnessedBy() : Nothing);
+ if (!primaryTxnId.is(Txn.Kind.ExclusiveSyncPoint))
+ maxDecidedRX = null;
+ return factory.create(param, redundantBefore, maxDecidedRX,
primaryTxnId, keysOrRanges, kinds, minTxnId, maxTxnId, findAsDep);
+ }
- public Loader(@Nullable TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep)
- {
- this.searchKeysOrRanges = searchKeysOrRanges;
- this.redundantBefore = redundantBefore;
- this.testKind = testKind;
- this.minTxnId = minTxnId;
- this.maxTxnId = maxTxnId;
- this.findAsDep = findAsDep;
- }
+ public SummaryLoader(Object ignore, RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, Kinds testKind, TxnId minTxnId, Timestamp maxTxnId,
@Nullable TxnId findAsDep)
+ {
+ this.redundantBefore = redundantBefore;
+ this.maxDecidedRX = maxDecidedRX;
+ this.primaryTxnId = primaryTxnId;
+ this.searchKeysOrRanges = searchKeysOrRanges;
+ this.testKind = testKind;
+ this.minTxnId = minTxnId;
+ this.maxTxnId = maxTxnId;
+ this.findAsDep = findAsDep;
+ this.minDecidedId = minDecidedDependencyId(maxDecidedRX,
searchKeysOrRanges, primaryTxnId);
+ }
- public Summary ifRelevant(Command cmd)
- {
- return ifRelevant(cmd.txnId(), cmd.executeAtOrTxnId(),
cmd.saveStatus(), cmd.participants(), cmd.partialDeps());
- }
+ public boolean isRelevant(CommandsForKey cfk)
+ {
+ if (cfk == null || cfk.size() == 0)
+ return false;
- private boolean isEligibleDep(SummaryStatus status, TxnId
findAsDep, TxnId txnId, Timestamp executeAt)
+ // NOTE: we CANNOT safely filter on first element, as we may have
pruned dependencies we need to witness
+ // and that will be populated on the receiving replicas as
necessary - that is,
+ // we must permit adopting future dependencies
+ CommandsForKey.TxnInfo last = cfk.get(cfk.size() - 1);
+ if (last.compareTo(minTxnId) < 0)
+ return false;
+
+ if (maxDecidedRX == null)
+ return true;
+
+ CommandsForKey.TxnInfo minUndecided = cfk.minUndecided();
+ if (minUndecided != null)
+ return true;
+
+ if (minDecidedId != null && last.compareTo(minDecidedId) < 0)
+ return false;
+
+ TxnId minDecidedId =
maxDecidedRX.minDecidedDependencyId(cfk.key(), primaryTxnId);
+ return minDecidedId == null || last.compareTo(minDecidedId) >= 0;
+ }
+
+ public final Summary ifRelevant(Command cmd)
+ {
+ return ifRelevant(cmd.txnId(), cmd.executeAtOrTxnId(),
cmd.saveStatus(), cmd.participants(), cmd.partialDeps());
+ }
+
+ final boolean isEligibleDep(SummaryStatus status, TxnId findAsDep,
TxnId txnId, Timestamp executeAt)
+ {
+ switch (status)
{
- switch (status)
- {
- default: throw new UnhandledEnum(status);
- case NOT_DIRECTLY_WITNESSED:
- case INVALIDATED:
+ default: throw new UnhandledEnum(status);
+ case NOT_DIRECTLY_WITNESSED:
+ case INVALIDATED:
+ return false;
+ case NOTACCEPTED:
+ case PREACCEPTED:
+ if
(!txnId.is(TxnId.FastPath.PrivilegedCoordinatorWithDeps))
return false;
- case NOTACCEPTED:
- case PREACCEPTED:
- if
(!txnId.is(TxnId.FastPath.PrivilegedCoordinatorWithDeps))
- return false;
- case ACCEPTED:
- return txnId.compareTo(findAsDep) > 0;
- case COMMITTED:
- case APPLIED:
- case STABLE:
- return executeAt.compareTo(findAsDep) > 0;
- }
+ case ACCEPTED:
+ return txnId.compareTo(findAsDep) > 0;
+ case COMMITTED:
+ case APPLIED:
+ case STABLE:
+ return executeAt.compareTo(findAsDep) > 0;
}
+ }
- public Summary ifRelevant(TxnId txnId, Timestamp executeAt,
SaveStatus saveStatus, StoreParticipants participants, @Nullable PartialDeps
partialDeps)
- {
- if (participants == null)
- return null;
+ public final Summary ifRelevant(TxnId txnId, Timestamp executeAt,
SaveStatus saveStatus, StoreParticipants participants, @Nullable PartialDeps
partialDeps)
+ {
+ if (participants == null)
+ return null;
- return ifRelevant(txnId, executeAt, saveStatus,
participants.touches(), partialDeps);
- }
+ return ifRelevant(txnId, executeAt, saveStatus,
participants.touches(), partialDeps);
+ }
- public Summary ifRelevant(TxnId txnId, Timestamp executeAt,
SaveStatus saveStatus, Participants<?> touches, @Nullable PartialDeps
partialDeps)
+ public final Summary ifRelevant(TxnId txnId, Timestamp executeAt,
SaveStatus saveStatus, Participants<?> touches, @Nullable PartialDeps
partialDeps)
+ {
+ SummaryStatus summaryStatus = saveStatus.summary;
+ if (summaryStatus == null)
+ return null;
+
+ if (!txnId.is(testKind))
+ return null;
+
+ boolean mayFilterAsDecided = maxDecidedRX != null &&
(saveStatus.compareTo(SaveStatus.PreCommitted) >= 0 ||
txnId.is(ExclusiveSyncPoint));
+ if (mayFilterAsDecided && minDecidedId != null &&
txnId.compareTo(minDecidedId) < 0)
+ return null;
+
+ // start in search key domain, since this is what we consult to
decide if can be recovered
+ Unseekables<?> intersecting =
searchKeysOrRanges.intersecting(touches, Minimal);
+ if (intersecting.isEmpty())
+ return null;
+
+ if (redundantBefore != null)
{
- SummaryStatus summaryStatus = saveStatus.summary;
- if (summaryStatus == null)
+ // TODO (expected): consider whether this is necessary (and
document it).
+ Unseekables<?> newIntersecting =
redundantBefore.foldlWithBounds(intersecting, (e, accum, start, end) -> {
+ if (e.gcBefore.compareTo(txnId) <= 0)
+ return accum;
+ return
accum.without(Ranges.of(start.rangeFactory().newRange(start, end)));
+ }, intersecting, ignore -> false);
+
+ if (newIntersecting.isEmpty())
return null;
- if (!txnId.is(testKind))
- return null;
+ intersecting = newIntersecting;
+ }
- // start in search key domain, since this is what we consult
to decide if can be recovered
- Unseekables<?> intersecting =
searchKeysOrRanges.intersecting(touches, Minimal);
- if (intersecting.isEmpty())
+ if (mayFilterAsDecided)
+ {
+ TxnId minDecidedId = minDecidedDependencyId(maxDecidedRX,
intersecting, primaryTxnId);
Review Comment:
Variable shadowing looks intentional in both cases, but maybe rename to
include "intersecting" or something similar?
##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -666,7 +663,62 @@ protected void updatedRedundantBefore(SafeCommandStore
safeStore, RedundantBefor
listeners.clearBefore(this, clearWaitingBefore);
}
- protected void markSynced(SafeCommandStore safeStore, TxnId syncId, Ranges
ranges)
+ protected final Ranges isWaitingOnSync(TxnId syncId, Ranges ranges)
+ {
+ if (waitingOnSync.isEmpty())
+ return Ranges.EMPTY;
+
+ Ranges waitingOn = Ranges.EMPTY;
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges remaining = e.getValue().waitingOn;
+ Ranges intersecting = remaining.slice(ranges, Minimal);
+ if (!intersecting.isEmpty())
+ {
+ ranges = ranges.without(intersecting);
+ waitingOn = waitingOn.with(intersecting);
+ }
+ }
+
+ return waitingOn;
+ }
+
+ protected final void markSyncing(TxnId syncId, Ranges ranges)
+ {
+ if (waitingOnSync.isEmpty())
+ return;
+
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges remaining = e.getValue().waitingOn.without(ranges);
+ if (e.getValue().waitingOn != remaining)
+ e.getValue().waitingOn = remaining;
+ }
+ }
+
+ protected final void unmarkSyncing(TxnId syncId, Ranges ranges)
+ {
+ if (waitingOnSync.isEmpty())
+ return;
+
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges unmark = e.getValue().waitingOnDurable.slice(ranges,
Minimal);
+ if (!unmark.isEmpty())
Review Comment:
should we disassociate the key if empty?
##########
accord-core/src/main/java/accord/impl/InMemoryCommandStore.java:
##########
@@ -420,30 +429,32 @@ protected final InMemorySafeStore
createSafeStore(PreLoadContext context, Ranges
Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey = new
HashMap<>();
context.forEachId(txnId -> commands.put(txnId, lazyReference(txnId)));
-
- for (Unseekable unseekable : context.keys())
+ if (context.loadKeys() != NONE)
{
- switch (unseekable.domain())
+ Unseekables unseekables = context.keys();
+ if (unseekables.domain() == Key)
{
- case Key:
- RoutableKey key = (RoutableKey) unseekable;
- switch (context.loadKeys())
- {
- case NONE:
- continue;
- case INCR:
- case SYNC:
- case ASYNC:
- commandsForKey.put(key,
commandsForKey((RoutingKey) key).createSafeReference());
- break;
- default: throw new
UnsupportedOperationException("Unknown key history: " + context.loadKeys());
- }
- break;
- case Range:
- // load range cfks here
- break;
+ for (RoutingKey key : (AbstractUnseekableKeys)unseekables)
+ commandsForKey.put(key,
commandsForKey(key).createSafeReference());
+ }
+ else
+ {
+ CommandSummaries.SummaryLoader loader =
CommandSummaries.SummaryLoader.loader(unsafeGetRedundantBefore(),
unsafeGetMaxDecidedRX(), context);
+ for (GlobalCommandsForKey global :
this.commandsForKey.values())
Review Comment:
There should be a way to avoid scanning over all keys linearly here, maybe
with a range multi-map or something? Not saying that we should do this in this
patch, but maybe worth adding a TODO?
##########
accord-core/src/main/java/accord/local/CommandStore.java:
##########
@@ -666,7 +663,62 @@ protected void updatedRedundantBefore(SafeCommandStore
safeStore, RedundantBefor
listeners.clearBefore(this, clearWaitingBefore);
}
- protected void markSynced(SafeCommandStore safeStore, TxnId syncId, Ranges
ranges)
+ protected final Ranges isWaitingOnSync(TxnId syncId, Ranges ranges)
+ {
+ if (waitingOnSync.isEmpty())
+ return Ranges.EMPTY;
+
+ Ranges waitingOn = Ranges.EMPTY;
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges remaining = e.getValue().waitingOn;
+ Ranges intersecting = remaining.slice(ranges, Minimal);
+ if (!intersecting.isEmpty())
+ {
+ ranges = ranges.without(intersecting);
+ waitingOn = waitingOn.with(intersecting);
+ }
+ }
+
+ return waitingOn;
+ }
+
+ protected final void markSyncing(TxnId syncId, Ranges ranges)
+ {
+ if (waitingOnSync.isEmpty())
+ return;
+
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges remaining = e.getValue().waitingOn.without(ranges);
+ if (e.getValue().waitingOn != remaining)
Review Comment:
nit: we can just always assign: if it's equal, it will remain unchanged, and
the new value will be assigned otherwise
##########
accord-core/src/main/java/accord/local/CommandSummaries.java:
##########
@@ -60,185 +62,279 @@ enum SummaryStatus
INVALIDATED;
public static final SummaryStatus NONE = null;
+
+ private static final SummaryStatus[] SUMMARY_STATUSES = values();
}
- enum IsDep { IS_COORD_DEP, IS_NOT_COORD_DEP, NOT_ELIGIBLE, IS_STABLE_DEP,
IS_NOT_STABLE_DEP }
+ enum IsDep
+ {
+ IS_COORD_DEP, IS_NOT_COORD_DEP, NOT_ELIGIBLE, IS_STABLE_DEP,
IS_NOT_STABLE_DEP;
+ private static final IsDep[] IS_DEPS = values();
+ }
- class Summary
+ class Summary extends TxnId
{
- public final @Nonnull TxnId txnId;
- public final @Nonnull Timestamp executeAt;
- public final @Nonnull SummaryStatus status;
- public final @Nonnull Unseekables<?> participants;
+ private static final int SUMMARY_STATUS_MASK = 0x7;
+ private static final int IS_DEP_SHIFT = 3;
+ final @Nonnull Timestamp executeAt;
+ final int encoded;
+ final Unseekables<?> participants;
- public final IsDep dep;
- public final TxnId findAsDep;
+ public Summary slice(Ranges ranges)
+ {
+ return new Summary(this, this.executeAt, encoded,
participants.slice(ranges, Minimal));
+ }
@VisibleForTesting
- public Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
@Nonnull SummaryStatus status, @Nonnull Unseekables<?> participants, IsDep dep,
TxnId findAsDep)
+ public Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
@Nonnull SummaryStatus status, IsDep dep, Unseekables<?> participants)
{
- this.txnId = txnId;
- this.executeAt = executeAt;
- this.status = status;
+ super(txnId);
this.participants = participants;
- this.findAsDep = findAsDep;
- this.dep = dep;
+ this.executeAt = executeAt.equals(txnId) ? this : executeAt;
+ this.encoded = status.ordinal() | (dep == null ? Integer.MIN_VALUE
: (dep.ordinal() << IS_DEP_SHIFT));
+ }
+
+ private Summary(@Nonnull TxnId txnId, @Nonnull Timestamp executeAt,
int encoded, Unseekables<?> participants)
+ {
+ super(txnId);
+ this.participants = participants;
+ this.executeAt = executeAt == txnId || executeAt.equals(txnId) ?
this : executeAt;
+ this.encoded = encoded;
+ }
+
+ public boolean is(IsDep isDep)
+ {
+ return (encoded >> IS_DEP_SHIFT) == isDep.ordinal();
}
- public Summary slice(Ranges slice)
+ public IsDep isDep()
{
- return new Summary(txnId, executeAt, status,
participants.slice(slice, Minimal), dep, findAsDep);
+ if (encoded < 0)
+ return null;
+ return IsDep.IS_DEPS[encoded >> IS_DEP_SHIFT];
+ }
+
+ public boolean is(SummaryStatus summaryStatus)
+ {
+ return (encoded & SUMMARY_STATUS_MASK) == summaryStatus.ordinal();
+ }
+
+ public SummaryStatus status()
+ {
+ int ordinal = encoded & SUMMARY_STATUS_MASK;
+ return SummaryStatus.SUMMARY_STATUSES[ordinal];
+ }
+
+ public TxnId plainTxnId()
+ {
+ return new TxnId(this);
+ }
+
+ public Timestamp plainExecuteAt()
+ {
+ return executeAt == this ? new Timestamp(this) : executeAt;
}
@Override
public String toString()
{
return "Summary{" +
- "txnId=" + txnId +
- ", executeAt=" + executeAt +
- ", saveStatus=" + status +
- ", participants=" + participants +
- ", maybeDep=" + dep +
- ", findAsDep=" + findAsDep +
+ "txnId=" + plainTxnId() +
+ ", executeAt=" + plainExecuteAt() +
+ ", saveStatus=" + status() +
+ ", isDep=" + isDep() +
'}';
}
+ }
- public static class Loader
+ class SummaryLoader
+ {
+ public interface Factory<L extends SummaryLoader>
{
- public interface Factory<L extends Loader>
- {
- L create(@Nullable TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep);
- }
+ L create(RedundantBefore redundantBefore, @Nullable MaxDecidedRX
maxDecidedRX, TxnId primaryTxnId, Unseekables<?> searchKeysOrRanges, Kinds
testKind, TxnId minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep);
+ }
- protected final Unseekables<?> searchKeysOrRanges;
- protected final RedundantBefore redundantBefore;
- // TODO (expected): separate out Kinds we need before/after
primaryTxnId/executeAt
- protected final Kinds testKind;
- protected final TxnId minTxnId;
- protected final Timestamp maxTxnId;
- @Nullable protected final TxnId findAsDep;
+ protected final RedundantBefore redundantBefore;
+ protected final MaxDecidedRX maxDecidedRX;
+ protected final Unseekables<?> searchKeysOrRanges;
+ // TODO (expected): separate out Kinds we need before/after
primaryTxnId/executeAt
+ protected final Kinds testKind;
+ protected final TxnId primaryTxnId, findAsDep, minTxnId, minDecidedId;
+ protected final Timestamp maxTxnId;
- // TODO (expected): provide executeAt to PreLoadContext so we can
more aggressively filter what we load, esp. by Kind
- public static Loader loader(RedundantBefore redundantBefore,
PreLoadContext context)
- {
- return loader(redundantBefore, context.primaryTxnId(),
context.loadKeysFor(), context.keys());
- }
+ // TODO (expected): provide executeAt to PreLoadContext so we can more
aggressively filter what we load, esp. by Kind
+ public static SummaryLoader loader(RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, PreLoadContext context)
+ {
+ return loader(redundantBefore, maxDecidedRX,
context.primaryTxnId(), context.loadKeysFor(), context.keys());
+ }
- public static Loader loader(RedundantBefore redundantBefore,
@Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor, Unseekables<?>
keysOrRanges)
- {
- return loader(redundantBefore, primaryTxnId, loadKeysFor,
keysOrRanges, Loader::new);
- }
+ public static SummaryLoader loader(RedundantBefore redundantBefore,
MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, LoadKeysFor loadKeysFor,
Unseekables<?> keysOrRanges)
+ {
+ return loader(redundantBefore, maxDecidedRX, primaryTxnId,
loadKeysFor, keysOrRanges, SummaryLoader::new);
+ }
- public static <L extends Loader> L loader(RedundantBefore
redundantBefore, @Nullable TxnId primaryTxnId, LoadKeysFor loadKeysFor,
Unseekables<?> keysOrRanges, Factory<L> factory)
- {
- TxnId minTxnId = redundantBefore.min(keysOrRanges,
Bounds::gcBefore);
- Timestamp maxTxnId = primaryTxnId == null || loadKeysFor ==
RECOVERY || !primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId;
- TxnId findAsDep = primaryTxnId != null && loadKeysFor ==
RECOVERY ? primaryTxnId : null;
- Kinds kinds = primaryTxnId == null ? AnyGloballyVisible :
primaryTxnId.witnesses().or(loadKeysFor == RECOVERY ?
primaryTxnId.witnessedBy() : Nothing);
- return factory.create(primaryTxnId, keysOrRanges,
redundantBefore, kinds, minTxnId, maxTxnId, findAsDep);
- }
+ public static <L extends SummaryLoader> L loader(RedundantBefore
redundantBefore, MaxDecidedRX maxDecidedRX, PreLoadContext context, Factory<L>
factory)
+ {
+ return loader(redundantBefore, maxDecidedRX,
context.primaryTxnId(), context.loadKeysFor(), context.keys(), factory);
+ }
- public Loader(@Nullable TxnId primaryTxnId, Unseekables<?>
searchKeysOrRanges, RedundantBefore redundantBefore, Kinds testKind, TxnId
minTxnId, Timestamp maxTxnId, @Nullable TxnId findAsDep)
- {
- this.searchKeysOrRanges = searchKeysOrRanges;
- this.redundantBefore = redundantBefore;
- this.testKind = testKind;
- this.minTxnId = minTxnId;
- this.maxTxnId = maxTxnId;
- this.findAsDep = findAsDep;
- }
+ public static <L extends SummaryLoader> L loader(RedundantBefore
redundantBefore, MaxDecidedRX maxDecidedRX, TxnId primaryTxnId, LoadKeysFor
loadKeysFor, Unseekables<?> keysOrRanges, Factory<L> factory)
+ {
+ Invariants.require(primaryTxnId != null);
+ TxnId minTxnId = redundantBefore.min(keysOrRanges,
Bounds::gcBefore);
+ Timestamp maxTxnId = loadKeysFor == RECOVERY ||
!primaryTxnId.is(ExclusiveSyncPoint) ? Timestamp.MAX : primaryTxnId;
Review Comment:
or perhaps:
```
185 + Timestamp maxTxnId;
186 + TxnId findAsDep;
187 + Kinds kinds;
188 + if (loadKeysFor == RECOVERY)
189 + {
190 + maxTxnId = Timestamp.MAX;
191 + findAsDep = primaryTxnId;
192 + kinds =
primaryTxnId.witnesses().or(primaryTxnId.witnessedBy());
193 + }
194 + else
195 + {
196 + maxTxnId = primaryTxnId.is(ExclusiveSyncPoint)
? primaryTxnId : Timestamp.MAX;
197 + findAsDep = null;
198 + kinds = primaryTxnId.witnesses().or(Nothing);
199 + }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]