dcapwell commented on code in PR #3494:
URL: https://github.com/apache/cassandra/pull/3494#discussion_r1731820569
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -362,6 +363,31 @@ public List<V> readAll(K id)
return res;
}
+ public void readAll(K id, Reader reader)
Review Comment:
This is a copy/paste of `org.apache.cassandra.journal.Journal#readAll`,
changing just this one line:
`reader.read(in, segment.descriptor.userVersion);` rather than
`res.add(valueSerializer.deserialize(holder.key, in,
segment.descriptor.userVersion));`
Maybe we should refactor so we only do this once and just pass a lambda
around?
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -510,6 +536,25 @@ public RecordPointer asyncWrite(K id, V record,
Set<Integer> hosts)
return recordPointer;
}
+ public RecordPointer asyncWrite(K id, Writer writer, Set<Integer> hosts)
Review Comment:
This looks like a copy/paste of
`org.apache.cassandra.journal.Journal#asyncWrite`. Should we refactor?
##########
test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java:
##########
@@ -380,10 +380,10 @@ Consumer<List<Partition>> expectAccordCommandsNoChange()
};
}
-
private static RedundantBefore redundantBefore(TxnId txnId)
{
Ranges ranges = AccordTestUtils.fullRange(AccordTestUtils.keys(table,
42));
+ txnId = txnId.as(Kind.Read, Routable.Domain.Range);
Review Comment:
why is this change needed?
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -362,6 +363,31 @@ public List<V> readAll(K id)
return res;
}
+ public void readAll(K id, Reader reader)
+ {
+ EntrySerializer.EntryHolder<K> holder = new
EntrySerializer.EntryHolder<>();
+ try (ReferencedSegments<K, V> segments = selectAndReference(id))
+ {
+ for (Segment<K, V> segment : segments.all())
+ {
+ segment.readAll(id, holder, () -> {
+ try (DataInputBuffer in = new
DataInputBuffer(holder.value, false))
+ {
+ reader.read(in, segment.descriptor.userVersion);
+ Invariants.checkState(Objects.equals(holder.key, id),
Review Comment:
Shouldn't this `Invariants.checkState` key check happen before the `read`?
##########
src/java/org/apache/cassandra/service/accord/SavedCommand.java:
##########
@@ -73,540 +71,427 @@ private enum HasFields
LISTENERS
}
- public final TxnId txnId;
-
- public final Timestamp executeAt;
- public final SaveStatus saveStatus;
- public final Status.Durability durability;
-
- public final Ballot acceptedOrCommitted;
- public final Ballot promised;
-
- public final Route<?> route;
- public final PartialTxn partialTxn;
- public final PartialDeps partialDeps;
- public final Seekables<?, ?> additionalKeysOrRanges;
-
- public final Writes writes;
- public final Listeners.Immutable<Command.DurableAndIdempotentListener>
listeners;
+ public interface Writer<K> extends Journal.Writer
+ {
+ void write(DataOutputPlus out, int userVersion) throws IOException;
+ K key();
+ }
- public SavedCommand(TxnId txnId,
- Timestamp executeAt,
- SaveStatus saveStatus,
- Status.Durability durability,
+ public static class DiffWriter implements Writer<TxnId>
+ {
+ private final Command before;
+ private final Command after;
+ private final TxnId txnId;
- Ballot acceptedOrCommitted,
- Ballot promised,
+ public DiffWriter(Command before, Command after)
+ {
+ this(after.txnId(), before, after);
+ }
- Route<?> route,
- PartialTxn partialTxn,
- PartialDeps partialDeps,
- Seekables<?, ?> additionalKeysOrRanges,
+ public DiffWriter(TxnId txnId, Command before, Command after)
+ {
+ this.txnId = txnId;
+ this.before = before;
+ this.after = after;
+ }
- Writes writes,
-
Listeners.Immutable<Command.DurableAndIdempotentListener> listeners)
- {
- this.txnId = txnId;
- this.executeAt = executeAt;
- this.saveStatus = saveStatus;
- this.durability = durability;
+ @VisibleForTesting
+ public Command before()
+ {
+ return before;
+ }
- this.acceptedOrCommitted = acceptedOrCommitted;
- this.promised = promised;
+ @VisibleForTesting
+ public Command after()
+ {
+ return after;
+ }
- this.route = route;
- this.partialTxn = partialTxn;
- this.partialDeps = partialDeps;
- this.additionalKeysOrRanges = additionalKeysOrRanges;
+ public void write(DataOutputPlus out, int userVersion) throws
IOException
+ {
+ serialize(before, after, out, userVersion);
+ }
- this.writes = writes;
- this.listeners = listeners;
+ public TxnId key()
+ {
+ return txnId;
+ }
}
- public static SavedDiff diff(Command before, Command after)
- {
- if (before == after)
- return null;
- // TODO: we do not need to save `waitingOn` _every_ time.
- Command.WaitingOn waitingOn = getWaitingOn(after);
- return new SavedDiff(after.txnId(),
- ifNotEqual(before, after, Command::executeAt,
true),
- ifNotEqual(before, after, Command::saveStatus,
false),
- ifNotEqual(before, after, Command::durability,
false),
-
- ifNotEqual(before, after,
Command::acceptedOrCommitted, false),
- ifNotEqual(before, after, Command::promised,
false),
-
- ifNotEqual(before, after, Command::route, true),
- ifNotEqual(before, after, Command::partialTxn,
false),
- ifNotEqual(before, after, Command::partialDeps,
false),
- ifNotEqual(before, after,
Command::additionalKeysOrRanges, false),
-
- waitingOn,
- ifNotEqual(before, after, Command::writes, false),
- ifNotEqual(before, after,
Command::durableListeners, true));
- }
-
- static Command reconstructFromDiff(List<LoadedDiff> diffs)
+ public static Writer<TxnId> diffWriter(Command before, Command after)
{
- return reconstructFromDiff(diffs, CommandSerializers.APPLIED);
+ return new DiffWriter(before, after);
}
- /**
- * @param result is exposed because we are _not_ persisting result, since
during loading or replay
- * we do not expect we will have to send a result to the
client, and data results
- * can potentially contain a large number of entries, so
it's best if they are not
- * written into the log.
- */
- @VisibleForTesting
- static Command reconstructFromDiff(List<LoadedDiff> diffs, Result result)
+ public static void serialize(Command before, Command after, DataOutputPlus
out, int userVersion) throws IOException
{
- TxnId txnId = null;
-
- Timestamp executeAt = null;
- SaveStatus saveStatus = null;
- Status.Durability durability = null;
+ int flags = 0;
- Ballot acceptedOrCommitted = Ballot.ZERO;
- Ballot promised = null;
+ flags = collectFlags(before, after, Command::txnId, true,
Fields.TXN_ID, flags);
+ flags = collectFlags(before, after, Command::executeAt, true,
Fields.EXECUTE_AT, flags);
+ flags = collectFlags(before, after, Command::saveStatus, false,
Fields.SAVE_STATUS, flags);
+ flags = collectFlags(before, after, Command::durability, false,
Fields.DURABILITY, flags);
- Route<?> route = null;
- PartialTxn partialTxn = null;
- PartialDeps partialDeps = null;
- Seekables<?, ?> additionalKeysOrRanges = null;
+ flags = collectFlags(before, after, Command::acceptedOrCommitted,
false, Fields.ACCEPTED, flags);
+ flags = collectFlags(before, after, Command::promised, false,
Fields.PROMISED, flags);
- WaitingOnProvider waitingOnProvider = null;
- Writes writes = null;
- Listeners.Immutable listeners = null;
+ flags = collectFlags(before, after, Command::route, true,
Fields.ROUTE, flags);
+ flags = collectFlags(before, after, Command::partialTxn, false,
Fields.PARTIAL_TXN, flags);
+ flags = collectFlags(before, after, Command::partialDeps, false,
Fields.PARTIAL_DEPS, flags);
+ flags = collectFlags(before, after, Command::additionalKeysOrRanges,
false, Fields.ADDITIONAL_KEYS, flags);
- for (LoadedDiff diff : diffs)
+ Command.WaitingOn waitingOn = getWaitingOn(after);
+ if (waitingOn == null)
+ flags = setFieldIsNull(Fields.WAITING_ON, flags);
+ else
+ flags = setFieldChanged(Fields.WAITING_ON, flags);
+
+ flags = collectFlags(before, after, Command::writes, false,
Fields.WRITES, flags);
+ flags = collectFlags(before, after, Command::durableListeners, true,
Fields.LISTENERS, flags);
+
+ out.writeInt(flags);
+
+ // We encode all changed fields unless their value is null
+ if (getFieldChanged(Fields.TXN_ID, flags) && after.txnId() != null)
+ CommandSerializers.txnId.serialize(after.txnId(), out,
userVersion);
+ if (getFieldChanged(Fields.EXECUTE_AT, flags) && after.executeAt() !=
null)
+ CommandSerializers.timestamp.serialize(after.executeAt(), out,
userVersion);
+ if (getFieldChanged(Fields.SAVE_STATUS, flags))
+ out.writeInt(after.saveStatus().ordinal());
+ if (getFieldChanged(Fields.DURABILITY, flags) && after.durability() !=
null)
+ out.writeInt(after.durability().ordinal());
+
+ if (getFieldChanged(Fields.ACCEPTED, flags) &&
after.acceptedOrCommitted() != null)
+ CommandSerializers.ballot.serialize(after.acceptedOrCommitted(),
out, userVersion);
+ if (getFieldChanged(Fields.PROMISED, flags) && after.promised() !=
null)
+ CommandSerializers.ballot.serialize(after.promised(), out,
userVersion);
+
+ if (getFieldChanged(Fields.ROUTE, flags) && after.route() != null)
+
AccordKeyspace.LocalVersionedSerializers.route.serialize(after.route(), out);
// TODO (required): user version
+ if (getFieldChanged(Fields.PARTIAL_TXN, flags) && after.partialTxn()
!= null)
+ CommandSerializers.partialTxn.serialize(after.partialTxn(), out,
userVersion);
+ if (getFieldChanged(Fields.PARTIAL_DEPS, flags) && after.partialDeps()
!= null)
+ DepsSerializer.partialDeps.serialize(after.partialDeps(), out,
userVersion);
+ if (getFieldChanged(Fields.ADDITIONAL_KEYS, flags) &&
after.additionalKeysOrRanges() != null)
+ KeySerializers.seekables.serialize(after.additionalKeysOrRanges(),
out, userVersion);
+
+ if (getFieldChanged(Fields.WAITING_ON, flags) && waitingOn != null)
{
- if (diff.txnId != null)
- txnId = diff.txnId;
- if (diff.executeAt != null)
- executeAt = diff.executeAt;
- if (diff.saveStatus != null)
- saveStatus = diff.saveStatus;
- if (diff.durability != null)
- durability = diff.durability;
-
- if (diff.acceptedOrCommitted != null)
- acceptedOrCommitted = diff.acceptedOrCommitted;
- if (diff.promised != null)
- promised = diff.promised;
-
- if (diff.route != null)
- route = diff.route;
- if (diff.partialTxn != null)
- partialTxn = diff.partialTxn;
- if (diff.partialDeps != null)
- partialDeps = diff.partialDeps;
- if (diff.additionalKeysOrRanges != null)
- additionalKeysOrRanges = diff.additionalKeysOrRanges;
-
- if (diff.waitingOn != null)
- waitingOnProvider = diff.waitingOn;
- if (diff.writes != null)
- writes = diff.writes;
- if (diff.listeners != null)
- listeners = diff.listeners;
+ long size = WaitingOnSerializer.serializedSize(waitingOn);
+ ByteBuffer serialized =
WaitingOnSerializer.serialize(after.txnId(), waitingOn);
+ out.writeInt((int) size);
+ out.write(serialized);
}
- CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
- if (partialTxn != null)
- attrs.partialTxn(partialTxn);
- if (durability != null)
- attrs.durability(durability);
- if (route != null)
- attrs.route(route);
- if (partialDeps != null &&
- (saveStatus.known.deps != Status.KnownDeps.NoDeps &&
- saveStatus.known.deps != Status.KnownDeps.DepsErased &&
- saveStatus.known.deps != Status.KnownDeps.DepsUnknown))
- attrs.partialDeps(partialDeps);
- if (additionalKeysOrRanges != null)
- attrs.additionalKeysOrRanges(additionalKeysOrRanges);
- if (listeners != null && !listeners.isEmpty())
- attrs.setListeners(listeners);
-
- Command.WaitingOn waitingOn = null;
- if (waitingOnProvider != null)
- waitingOn = waitingOnProvider.provide(txnId, partialDeps);
-
- Invariants.checkState(saveStatus != null,
- "Save status is null after applying %s", diffs);
- switch (saveStatus.status)
+ if (getFieldChanged(Fields.WRITES, flags) && after.writes() != null)
+ CommandSerializers.writes.serialize(after.writes(), out,
userVersion);
+
+ if (getFieldChanged(Fields.LISTENERS, flags) &&
after.durableListeners() != null)
{
- case NotDefined:
- return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
- :
Command.NotDefined.notDefined(attrs, promised);
- case PreAccepted:
- return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
- case AcceptedInvalidate:
- case Accepted:
- case PreCommitted:
- return Command.Accepted.accepted(attrs, saveStatus, executeAt,
promised, acceptedOrCommitted);
- case Committed:
- case Stable:
- return Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
- case PreApplied:
- case Applied:
- return Command.Executed.executed(attrs, saveStatus, executeAt,
promised, acceptedOrCommitted, waitingOn, writes, result);
- case Truncated:
- case Invalidated:
- default:
- throw new IllegalStateException();
+ out.writeByte(after.durableListeners().size());
+ for (Command.DurableAndIdempotentListener listener :
after.durableListeners())
+
AccordKeyspace.LocalVersionedSerializers.listeners.serialize(listener, out);
}
}
- // TODO (required): this convert function was added only because
AsyncOperationTest was failing without it; maybe after switching to loading
from the log we can just pass l and r directly or remove != null checks.
- private static <OBJ, VAL> VAL ifNotEqual(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, boolean allowClassMismatch)
+ static Command.WaitingOn getWaitingOn(Command command)
+ {
+ if (command instanceof Command.Committed)
+ return command.asCommitted().waitingOn();
+
+ return null;
+ }
+
+ private static <OBJ, VAL> int collectFlags(OBJ lo, OBJ ro, Function<OBJ,
VAL> convert, boolean allowClassMismatch, Fields field, int oldFlags)
{
VAL l = null;
VAL r = null;
if (lo != null) l = convert.apply(lo);
if (ro != null) r = convert.apply(ro);
+ if (r == null)
+ oldFlags = setFieldIsNull(field, oldFlags);
+
if (l == r)
- return null;
+ return oldFlags; // no change
+
if (l == null || r == null)
- return r;
+ return setFieldChanged(field, oldFlags);
+
assert allowClassMismatch || l.getClass() == r.getClass() :
String.format("%s != %s", l.getClass(), r.getClass());
if (l.equals(r))
- return null;
+ return oldFlags; // no change
- return r;
+ return setFieldChanged(field, oldFlags);
}
- static Command.WaitingOn getWaitingOn(Command command)
+ private static int setFieldChanged(Fields field, int oldFlags)
{
- if (command instanceof Command.Committed)
- return command.asCommitted().waitingOn();
-
- return null;
+ return oldFlags | (1 << (field.ordinal() + Short.SIZE));
}
- public static class SavedDiff extends SavedCommand
+ private static boolean getFieldChanged(Fields field, int oldFlags)
{
- public final Command.WaitingOn waitingOn;
+ return (oldFlags & (1 << (field.ordinal() + Short.SIZE))) != 0;
+ }
- public SavedDiff(TxnId txnId,
- Timestamp executeAt,
- SaveStatus saveStatus,
- Status.Durability durability,
+ private static boolean getFieldIsNull(Fields field, int oldFlags)
+ {
+ return (oldFlags & (1 << field.ordinal())) != 0;
+ }
- Ballot acceptedOrCommitted,
- Ballot promised,
+ private static int setFieldIsNull(Fields field, int oldFlags)
+ {
+ return oldFlags | (1 << field.ordinal());
+ }
- Route<?> route,
- PartialTxn partialTxn,
- PartialDeps partialDeps,
- Seekables<?, ?> additionalKeysOrRanges,
- Command.WaitingOn waitingOn,
- Writes writes,
-
Listeners.Immutable<Command.DurableAndIdempotentListener> listeners)
- {
- super(txnId, executeAt, saveStatus, durability,
acceptedOrCommitted, promised, route, partialTxn, partialDeps,
additionalKeysOrRanges, writes, listeners);
- this.waitingOn = waitingOn;
- }
+ public static class Builder
+ {
+ TxnId txnId = null;
- @Override
- public String toString()
- {
- return "SavedDiff{" +
- " txnId=" + txnId +
- ", executeAt=" + executeAt +
- ", saveStatus=" + saveStatus +
- ", durability=" + durability +
- ", acceptedOrCommitted=" + acceptedOrCommitted +
- ", promised=" + promised +
- ", route=" + route +
- ", partialTxn=" + partialTxn +
- ", partialDeps=" + partialDeps +
- ", writes=" + writes +
- ", waitingOn=" + waitingOn +
- '}';
- }
- }
+ Timestamp executeAt = null;
+ SaveStatus saveStatus = null;
+ Status.Durability durability = null;
- public static class LoadedDiff extends SavedCommand
- {
- public final WaitingOnProvider waitingOn;
+ Ballot acceptedOrCommitted = Ballot.ZERO;
+ Ballot promised = null;
- public LoadedDiff(TxnId txnId,
- Timestamp executeAt,
- SaveStatus saveStatus,
- Status.Durability durability,
+ Route<?> route = null;
+ PartialTxn partialTxn = null;
+ PartialDeps partialDeps = null;
+ Seekables<?, ?> additionalKeysOrRanges = null;
- Ballot acceptedOrCommitted,
- Ballot promised,
+ SavedCommand.WaitingOnProvider waitingOn = (txn, deps) -> null;
+ Writes writes = null;
+ Listeners.Immutable<?> listeners = null;
+ Result result = CommandSerializers.APPLIED;
- Route<?> route,
- PartialTxn partialTxn,
- PartialDeps partialDeps,
- Seekables<?, ?> additionalKeysOrRanges,
+ boolean nextCalled = false;
+ int count = 0;
- WaitingOnProvider waitingOn,
- Writes writes,
-
Listeners.Immutable<Command.DurableAndIdempotentListener> listeners)
+ public int count()
{
- super(txnId, executeAt, saveStatus, durability,
acceptedOrCommitted, promised, route, partialTxn, partialDeps,
additionalKeysOrRanges, writes, listeners);
- this.waitingOn = waitingOn;
+ return count;
}
- public String toString()
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void deserializeNext(DataInputPlus in, int userVersion) throws
IOException
{
- return "LoadedDiff{" +
- "waitingOn=" + waitingOn +
- '}';
- }
- }
-
- final static class SavedCommandSerializer implements
ValueSerializer<JournalKey, Object>
- {
- @Override
- public int serializedSize(JournalKey key, Object value, int
userVersion)
- {
- SavedDiff diff = (SavedDiff) value;
- long size = 0;
- size += SHORT_SIZE; // flags
-
- if (diff.txnId != null)
- size += CommandSerializers.txnId.serializedSize(diff.txnId,
userVersion);
- if (diff.executeAt != null)
- size +=
CommandSerializers.timestamp.serializedSize(diff.executeAt, userVersion);
- if (diff.saveStatus != null)
- size += Integer.BYTES;
- if (diff.durability != null)
- size += Integer.BYTES;
-
- if (diff.acceptedOrCommitted != null)
- size +=
CommandSerializers.ballot.serializedSize(diff.acceptedOrCommitted, userVersion);
- if (diff.promised != null)
- size +=
CommandSerializers.ballot.serializedSize(diff.promised, userVersion);
-
- if (diff.route != null)
- size +=
AccordKeyspace.LocalVersionedSerializers.route.serializedSize(diff.route);
- if (diff.partialTxn != null)
- CommandSerializers.partialTxn.serializedSize(diff.partialTxn,
userVersion);
- if (diff.partialDeps != null)
- DepsSerializer.partialDeps.serializedSize(diff.partialDeps,
userVersion);
- if (diff.additionalKeysOrRanges != null)
-
KeySerializers.seekables.serializedSize(diff.additionalKeysOrRanges,
userVersion);
-
- if (diff.waitingOn != null)
+ nextCalled = true;
+ count++;
+
+ final int flags = in.readInt();
+
+ if (getFieldChanged(Fields.TXN_ID, flags))
{
- size += Integer.BYTES;
- size += WaitingOnSerializer.serializedSize(diff.waitingOn);
+ if (getFieldIsNull(Fields.TXN_ID, flags))
+ txnId = null;
+ else
+ txnId = CommandSerializers.txnId.deserialize(in,
userVersion);
}
- if (diff.writes != null)
- CommandSerializers.writes.serializedSize(diff.writes,
userVersion);
-
- if (diff.listeners != null && !diff.listeners.isEmpty())
+ if (getFieldChanged(Fields.EXECUTE_AT, flags))
{
- size += Byte.BYTES;
- for (Command.DurableAndIdempotentListener listener :
diff.listeners)
- size +=
AccordKeyspace.LocalVersionedSerializers.listeners.serializedSize(listener);
+ if (getFieldIsNull(Fields.EXECUTE_AT, flags))
+ executeAt = null;
+ else
+ executeAt = CommandSerializers.timestamp.deserialize(in,
userVersion);
}
- return (int) size;
- }
- @Override
- public void serialize(JournalKey key, Object value, DataOutputPlus
out, int userVersion) throws IOException
- {
- SavedDiff diff = (SavedDiff) value;
- int flags = getFlags(diff);
-
- out.writeShort(flags);
-
- if (diff.txnId != null)
- CommandSerializers.txnId.serialize(diff.txnId, out,
userVersion);
- if (diff.executeAt != null)
- CommandSerializers.timestamp.serialize(diff.executeAt, out,
userVersion);
- if (diff.saveStatus != null)
- out.writeInt(diff.saveStatus.ordinal());
- if (diff.durability != null)
- out.writeInt(diff.durability.ordinal());
-
- if (diff.acceptedOrCommitted != null)
- CommandSerializers.ballot.serialize(diff.acceptedOrCommitted,
out, userVersion);
- if (diff.promised != null)
- CommandSerializers.ballot.serialize(diff.promised, out,
userVersion);
-
- if (diff.route != null)
-
AccordKeyspace.LocalVersionedSerializers.route.serialize(diff.route, out); //
TODO (required): user version
- if (diff.partialTxn != null)
- CommandSerializers.partialTxn.serialize(diff.partialTxn, out,
userVersion);
- if (diff.partialDeps != null)
- DepsSerializer.partialDeps.serialize(diff.partialDeps, out,
userVersion);
- if (diff.additionalKeysOrRanges != null)
-
KeySerializers.seekables.serialize(diff.additionalKeysOrRanges, out,
userVersion);
-
- if (diff.waitingOn != null)
+ if (getFieldChanged(Fields.SAVE_STATUS, flags))
{
- long size = WaitingOnSerializer.serializedSize(diff.waitingOn);
- ByteBuffer serialized =
WaitingOnSerializer.serialize(diff.txnId, diff.waitingOn);
- out.writeInt((int) size);
- out.write(serialized);
+ if (getFieldIsNull(Fields.SAVE_STATUS, flags))
+ saveStatus = null;
+ else
+ saveStatus = SaveStatus.values()[in.readInt()];
+ }
+ if (getFieldChanged(Fields.DURABILITY, flags))
+ {
+ if (getFieldIsNull(Fields.DURABILITY, flags))
+ durability = null;
+ else
+ durability = Status.Durability.values()[in.readInt()];
}
- if (diff.writes != null)
- CommandSerializers.writes.serialize(diff.writes, out,
userVersion);
+ if (getFieldChanged(Fields.ACCEPTED, flags))
+ {
+ if (getFieldIsNull(Fields.ACCEPTED, flags))
+ acceptedOrCommitted = null;
+ else
+ acceptedOrCommitted =
CommandSerializers.ballot.deserialize(in, userVersion);
+ }
- if (diff.listeners != null && !diff.listeners.isEmpty())
+ if (getFieldChanged(Fields.PROMISED, flags))
{
- out.writeByte(diff.listeners.size());
- for (Command.DurableAndIdempotentListener listener :
diff.listeners)
-
AccordKeyspace.LocalVersionedSerializers.listeners.serialize(listener, out);
+ if (getFieldIsNull(Fields.PROMISED, flags))
+ promised = null;
+ else
+ promised = CommandSerializers.ballot.deserialize(in,
userVersion);
}
- }
+ if (getFieldChanged(Fields.ROUTE, flags))
+ {
+ if (getFieldIsNull(Fields.ROUTE, flags))
+ route = null;
+ else
+ route =
AccordKeyspace.LocalVersionedSerializers.route.deserialize(in);
+ }
- private static int getFlags(SavedDiff diff)
- {
- int flags = 0;
-
- if (diff.txnId != null)
- flags = setBit(flags, HasFields.TXN_ID.ordinal());
- if (diff.executeAt != null)
- flags = setBit(flags, HasFields.EXECUTE_AT.ordinal());
- if (diff.saveStatus != null)
- flags = setBit(flags, HasFields.SAVE_STATUS.ordinal());
- if (diff.durability != null)
- flags = setBit(flags, HasFields.DURABILITY.ordinal());
-
- if (diff.acceptedOrCommitted != null)
- flags = setBit(flags, HasFields.ACCEPTED.ordinal());
- if (diff.promised != null)
- flags = setBit(flags, HasFields.PROMISED.ordinal());
-
- if (diff.route != null)
- flags = setBit(flags, HasFields.ROUTE.ordinal());
- if (diff.partialTxn != null)
- flags = setBit(flags, HasFields.PARTIAL_TXN.ordinal());
- if (diff.partialDeps != null)
- flags = setBit(flags, HasFields.PARTIAL_DEPS.ordinal());
- if (diff.additionalKeysOrRanges != null)
- flags = setBit(flags, HasFields.ADDITIONAL_KEYS.ordinal());
-
- if (diff.waitingOn != null)
- flags = setBit(flags, HasFields.WAITING_ON.ordinal());
- if (diff.writes != null)
- flags = setBit(flags, HasFields.WRITES.ordinal());
- if (diff.listeners != null && !diff.listeners.isEmpty())
- flags = setBit(flags, HasFields.LISTENERS.ordinal());
- return flags;
- }
+ if (getFieldChanged(Fields.PARTIAL_TXN, flags))
+ {
+ if (getFieldIsNull(Fields.PARTIAL_TXN, flags))
+ partialTxn = null;
+ else
+ partialTxn = CommandSerializers.partialTxn.deserialize(in,
userVersion);
+ }
- @Override
- public Object deserialize(JournalKey key, DataInputPlus in, int
userVersion) throws IOException
- {
- int flags = in.readShort();
-
- TxnId txnId = null;
- Timestamp executedAt = null;
- SaveStatus saveStatus = null;
- Status.Durability durability = null;
-
- Ballot acceptedOrCommitted = null;
- Ballot promised = null;
- Route<?> route = null;
-
- PartialTxn partialTxn = null;
- PartialDeps partialDeps = null;
- Seekables<?, ?> additionalKeysOrRanges = null;
-
- WaitingOnProvider waitingOn = (txn, deps) -> null;
- Writes writes = null;
- Listeners.Immutable listeners = null;
-
- if (isSet(flags, HasFields.TXN_ID.ordinal()))
- txnId = CommandSerializers.txnId.deserialize(in, userVersion);
- if (isSet(flags, HasFields.EXECUTE_AT.ordinal()))
- executedAt = CommandSerializers.timestamp.deserialize(in,
userVersion);
- if (isSet(flags, HasFields.SAVE_STATUS.ordinal()))
- saveStatus = SaveStatus.values()[in.readInt()];
- if (isSet(flags, HasFields.DURABILITY.ordinal()))
- durability = Status.Durability.values()[in.readInt()];
-
- if (isSet(flags, HasFields.ACCEPTED.ordinal()))
- acceptedOrCommitted =
CommandSerializers.ballot.deserialize(in, userVersion);
- if (isSet(flags, HasFields.PROMISED.ordinal()))
- promised = CommandSerializers.ballot.deserialize(in,
userVersion);
-
- if (isSet(flags, HasFields.ROUTE.ordinal()))
- route =
AccordKeyspace.LocalVersionedSerializers.route.deserialize(in);
- if (isSet(flags, HasFields.PARTIAL_TXN.ordinal()))
- partialTxn = CommandSerializers.partialTxn.deserialize(in,
userVersion);
- if (isSet(flags, HasFields.PARTIAL_DEPS.ordinal()))
- partialDeps = DepsSerializer.partialDeps.deserialize(in,
userVersion);
- if (isSet(flags, HasFields.ADDITIONAL_KEYS.ordinal()))
- additionalKeysOrRanges =
KeySerializers.seekables.deserialize(in, userVersion);
-
- if (isSet(flags, HasFields.WAITING_ON.ordinal()))
+ if (getFieldChanged(Fields.PARTIAL_DEPS, flags))
{
- int size = in.readInt();
- byte[] bytes = new byte[size];
- in.readFully(bytes);
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
- waitingOn = (localTxnId, deps) -> {
- try
- {
- return WaitingOnSerializer.deserialize(localTxnId,
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
- }
- catch (IOException e)
- {
- throw Throwables.unchecked(e);
- }
- };
+ if (getFieldIsNull(Fields.PARTIAL_DEPS, flags))
+ partialDeps = null;
+ else
+ partialDeps = DepsSerializer.partialDeps.deserialize(in,
userVersion);
}
- if (isSet(flags, HasFields.WRITES.ordinal()))
- writes = CommandSerializers.writes.deserialize(in,
userVersion);
- if (isSet(flags, HasFields.LISTENERS.ordinal()))
+ if (getFieldChanged(Fields.ADDITIONAL_KEYS, flags))
{
- Listeners builder = Listeners.Immutable.EMPTY.mutable();
- int cnt = in.readByte();
- for (int i = 0; i < cnt; i++)
-
builder.add(AccordKeyspace.LocalVersionedSerializers.listeners.deserialize(in));
- listeners = new Listeners.Immutable(builder);
+ if (getFieldIsNull(Fields.ADDITIONAL_KEYS, flags))
+ additionalKeysOrRanges = null;
+ else
+ additionalKeysOrRanges =
KeySerializers.seekables.deserialize(in, userVersion);
}
- return new LoadedDiff(txnId,
- executedAt,
- saveStatus,
- durability,
+ if (getFieldChanged(Fields.WAITING_ON, flags))
+ {
+ if (getFieldIsNull(Fields.WAITING_ON, flags))
+ {
+ waitingOn = null;
+ }
+ else
+ {
+ int size = in.readInt();
+ byte[] bytes = new byte[size];
+ in.readFully(bytes);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ waitingOn = (localTxnId, deps) -> {
+ try
+ {
+ return WaitingOnSerializer.deserialize(localTxnId,
deps.keyDeps.keys(), deps.rangeDeps, deps.directKeyDeps, buffer);
+ }
+ catch (IOException e)
+ {
+ throw Throwables.unchecked(e);
+ }
+ };
+ }
+ }
- acceptedOrCommitted,
- promised,
+ if (getFieldChanged(Fields.WRITES, flags))
+ {
+ if (getFieldIsNull(Fields.WRITES, flags))
+ writes = null;
+ else
+ writes = CommandSerializers.writes.deserialize(in,
userVersion);
+ }
- route,
- partialTxn,
- partialDeps,
- additionalKeysOrRanges,
+ if (getFieldChanged(Fields.LISTENERS, flags))
+ {
+ if (getFieldIsNull(Fields.LISTENERS, flags))
+ {
+ listeners = null;
+ }
+ else
+ {
+ Listeners builder = Listeners.Immutable.EMPTY.mutable();
+ int cnt = in.readByte();
+ for (int i = 0; i < cnt; i++)
+
builder.add(AccordKeyspace.LocalVersionedSerializers.listeners.deserialize(in));
+ listeners = new Listeners.Immutable(builder);
+ }
+ }
+ }
- waitingOn,
- writes,
- listeners);
+ public void forceResult(Result newValue)
+ {
+ this.result = newValue;
}
- }
- static int setBit(int value, int bit)
- {
- return value | (1 << bit);
- }
+ public Command construct() throws IOException
+ {
+ if (!nextCalled)
+ return null;
+
+ CommonAttributes.Mutable attrs = new
CommonAttributes.Mutable(txnId);
+ if (partialTxn != null)
+ attrs.partialTxn(partialTxn);
+ if (durability != null)
+ attrs.durability(durability);
+ if (route != null)
+ attrs.route(route);
+ if (partialDeps != null &&
+ (saveStatus.known.deps != Status.KnownDeps.NoDeps &&
+ saveStatus.known.deps != Status.KnownDeps.DepsErased &&
+ saveStatus.known.deps != Status.KnownDeps.DepsUnknown))
+ attrs.partialDeps(partialDeps);
+ if (additionalKeysOrRanges != null)
+ attrs.additionalKeysOrRanges(additionalKeysOrRanges);
+ if (listeners != null && !listeners.isEmpty())
+ attrs.setListeners(listeners);
+
+ Command.WaitingOn waitingOn = null;
+ if (this.waitingOn != null)
+ waitingOn = this.waitingOn.provide(txnId, partialDeps);
+
+ switch (saveStatus.status)
+ {
+ case NotDefined:
+ return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())
+ :
Command.NotDefined.notDefined(attrs, promised);
+ case PreAccepted:
+ return Command.PreAccepted.preAccepted(attrs, executeAt,
promised);
+ case AcceptedInvalidate:
+ case Accepted:
+ case PreCommitted:
+ return Command.Accepted.accepted(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted);
+ case Committed:
+ case Stable:
+ return Command.Committed.committed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn);
+ case PreApplied:
+ case Applied:
+ return Command.Executed.executed(attrs, saveStatus,
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+ case Truncated:
+ case Invalidated:
+ default:
+ throw new IllegalStateException();
+ }
+ }
- static boolean isSet(int value, int bit)
- {
- return (value & (1 << bit)) != 0;
+ public String toString()
+ {
+ return "Diff {" +
+ "txnId=" + txnId +
+ ", executeAt=" + executeAt +
+ ", saveStatus=" + saveStatus +
+ ", durability=" + durability +
+ ", acceptedOrCommitted=" + acceptedOrCommitted +
+ ", promised=" + promised +
+ ", route=" + route +
+ ", partialTxn=" + partialTxn +
+ ", partialDeps=" + partialDeps +
+ ", additionalKeysOrRanges=" + additionalKeysOrRanges +
+ ", waitingOn=" + waitingOn +
+ ", writes=" + writes +
+ ", listeners=" + listeners +
+ '}';
+ }
}
public interface WaitingOnProvider
{
Command.WaitingOn provide(TxnId txnId, PartialDeps deps);
}
-}
\ No newline at end of file
+
+}//import java.io.IOException;//import java.nio.ByteBuffer;//import
java.util.List;//import java.util.function.Function;////import
com.google.common.annotations.VisibleForTesting;////import
accord.api.Result;//import accord.local.Command;//import
accord.local.CommonAttributes;//import accord.local.Listeners;//import
accord.local.SaveStatus;//import accord.local.Status;//import
accord.primitives.Ballot;//import accord.primitives.PartialDeps;//import
accord.primitives.PartialTxn;//import accord.primitives.Route;//import
accord.primitives.Seekables;//import accord.primitives.Timestamp;//import
accord.primitives.TxnId;//import accord.primitives.Writes;//import
accord.utils.Invariants;//import
org.apache.cassandra.io.util.DataInputPlus;//import
org.apache.cassandra.io.util.DataOutputPlus;//import
org.apache.cassandra.journal.ValueSerializer;//import
org.apache.cassandra.service.accord.serializers.CommandSerializers;//import
org.apache.cassandra.service.accord.serializers.DepsSerializer;/
/import
org.apache.cassandra.service.accord.serializers.KeySerializers;//import
org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;//import
org.apache.cassandra.utils.Throwables;////public class SavedCommand//{//
public static final ValueSerializer<JournalKey, Object> serializer = new
SavedCommandSerializer();//// // This enum is order-dependent// private
enum Fields// {// TXN_ID,// EXECUTE_AT,//
SAVE_STATUS,// DURABILITY,// ACCEPTED,// PROMISED,//
ROUTE,// PARTIAL_TXN,// PARTIAL_DEPS,//
ADDITIONAL_KEYS,// WAITING_ON,// WRITES,// LISTENERS//
}//// public final TxnId txnId;//// public final NewValue<Timestamp>
executeAt;// public final NewValue<SaveStatus> saveStatus;// public final
NewValue<Status.Durability> durability;//// public final NewValue<Ballot>
acceptedOrCommitted;// public final NewValue<Ballot> promised;//// public
final NewValue
<Route<?>> route;// public final NewValue<PartialTxn> partialTxn;//
public final NewValue<PartialDeps> partialDeps;// public final
NewValue<Seekables<?, ?>> additionalKeysOrRanges;//// public final
NewValue<Writes> writes;// public final
NewValue<Listeners.Immutable<Command.DurableAndIdempotentListener>>
listeners;//// public SavedCommand(TxnId txnId,//
NewValue<Timestamp> executeAt,// NewValue<SaveStatus>
saveStatus,// NewValue<Status.Durability>
durability,//// NewValue<Ballot> acceptedOrCommitted,//
NewValue<Ballot> promised,////
NewValue<Route<?>> route,// NewValue<PartialTxn>
partialTxn,// NewValue<PartialDeps> partialDeps,//
NewValue<Seekables<?, ?>> additionalKeysOrRanges,////
NewValue<Writes> writes,// NewVal
ue<Listeners.Immutable<Command.DurableAndIdempotentListener>> listeners)//
{// this.txnId = txnId;// this.executeAt = executeAt;//
this.saveStatus = saveStatus;// this.durability = durability;////
this.acceptedOrCommitted = acceptedOrCommitted;// this.promised =
promised;//// this.route = route;// this.partialTxn =
partialTxn;// this.partialDeps = partialDeps;//
this.additionalKeysOrRanges = additionalKeysOrRanges;//// this.writes =
writes;// this.listeners = listeners;// }//// // Instead of saved
diff, just encode/serialize right away.// // Loaded diff will have both a
method to check if the field exists _and_ a getter// public static SavedDiff
diff(Command before, Command after)// {// if (before == after)//
return null;//// // TODO: we do not need to save `waitingOn`
_every_ time.// Command.WaitingOn waitingOn = getWaitingOn(after);//
return
new SavedDiff(after.txnId(),// ifNotEqual(before,
after, Command::executeAt, true),//
ifNotEqual(before, after, Command::saveStatus, false),//
ifNotEqual(before, after, Command::durability, false),////
ifNotEqual(before, after, Command::acceptedOrCommitted, false),//
ifNotEqual(before, after, Command::promised,
false),//// ifNotEqual(before, after,
Command::route, true),// ifNotEqual(before, after,
Command::partialTxn, false),// ifNotEqual(before,
after, Command::partialDeps, false),//
ifNotEqual(before, after, Command::additionalKeysOrRanges, false),////
new NewValue<>(waitingOn),//
ifNotEqual(before, after, Command::writes, false),//
ifNotEqual(before
, after, Command::durableListeners, true));// }//// static Command
reconstructFromDiff(List<LoadedDiff> diffs)// {// return
reconstructFromDiff(diffs, CommandSerializers.APPLIED);// }//// /**//
* @param result is exposed because we are _not_ persisting result, since during
loading or replay// * we do not expect we will have to send a
result to the client, and data results// * can potentially
contain a large number of entries, so it's best if they are not// *
written into the log.// */// @VisibleForTesting// static
Command reconstructFromDiff(List<LoadedDiff> diffs, Result result)// {//
TxnId txnId = null;//// Timestamp executeAt = null;//
SaveStatus saveStatus = null;// Status.Durability durability = null;////
Ballot acceptedOrCommitted = Ballot.ZERO;// Ballot promised =
null;//// Route<?> route = null;// PartialTxn partialTxn = null;//
PartialDeps partialDeps = null;// Seekables<?, ?>
additionalKeysOrRanges = null;//// WaitingOnProvider waitingOnProvider =
null;// Writes writes = null;// Listeners.Immutable listeners =
null;//// for (LoadedDiff diff : diffs)// {// if
(diff.txnId != null)// txnId = diff.txnId;// if
(diff.executeAt != null)// executeAt = diff.executeAt.get();//
if (diff.saveStatus != null)// saveStatus =
diff.saveStatus.get();// if (diff.durability != null)//
durability = diff.durability.get();//// if
(diff.acceptedOrCommitted != null)// acceptedOrCommitted =
diff.acceptedOrCommitted.get();// if (diff.promised != null)//
promised = diff.promised.get();//// if (diff.route !=
null)// route = diff.route.get();// if
(diff.partialTxn != null)// partia
lTxn = diff.partialTxn.get();// if (diff.partialDeps != null)//
partialDeps = diff.partialDeps.get();// if
(diff.additionalKeysOrRanges != null)// additionalKeysOrRanges =
diff.additionalKeysOrRanges.get();//// if (diff.waitingOn != null)//
waitingOnProvider = diff.waitingOn.get();// if
(diff.writes != null)// writes = diff.writes.get();//
if (diff.listeners != null)// listeners =
diff.listeners.get();// }//// CommonAttributes.Mutable attrs =
new CommonAttributes.Mutable(txnId);// if (partialTxn != null)//
attrs.partialTxn(partialTxn);// if (durability != null)//
attrs.durability(durability);// if (route != null)//
attrs.route(route);// if (partialDeps != null &&//
(saveStatus.known.deps != Status.KnownDeps.NoDeps &&//
saveStatus.known.deps != Status.KnownDep
s.DepsErased &&// saveStatus.known.deps !=
Status.KnownDeps.DepsUnknown))// attrs.partialDeps(partialDeps);//
if (additionalKeysOrRanges != null)//
attrs.additionalKeysOrRanges(additionalKeysOrRanges);// if (listeners !=
null && !listeners.isEmpty())// attrs.setListeners(listeners);////
Command.WaitingOn waitingOn = null;// if (waitingOnProvider !=
null)// waitingOn = waitingOnProvider.provide(txnId,
partialDeps);//// Invariants.checkState(saveStatus != null,//
"Save status is null after applying %s", diffs);//
switch (saveStatus.status)// {// case NotDefined://
return saveStatus == SaveStatus.Uninitialised ?
Command.NotDefined.uninitialised(attrs.txnId())//
: Command.NotDefined.notDefined(attrs,
promised);// case PreAccepted:// return Comm
and.PreAccepted.preAccepted(attrs, executeAt, promised);// case
AcceptedInvalidate:// case Accepted:// case
PreCommitted:// return Command.Accepted.accepted(attrs,
saveStatus, executeAt, promised, acceptedOrCommitted);// case
Committed:// case Stable:// return
Command.Committed.committed(attrs, saveStatus, executeAt, promised,
acceptedOrCommitted, waitingOn);// case PreApplied://
case Applied:// return Command.Executed.executed(attrs,
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes,
result);// case Truncated:// case Invalidated://
default:// throw new IllegalStateException();// }//
}//// // TODO (required): this convert function was added only because
AsyncOperationTest was failing without it;// // maybe after switching to
loading from the log we can just pass l and r directl
y or remove != null checks.// private static <OBJ, VAL> NewValue<VAL>
ifNotEqual(OBJ lo, OBJ ro, Function<OBJ, VAL> convert, boolean
allowClassMismatch)// {// VAL l = null;// VAL r = null;//
if (lo != null) l = convert.apply(lo);// if (ro != null) r =
convert.apply(ro);//// if (l == r)// return null; // null
here means there was no change//// if (l == null || r == null)//
return NewValue.of(r);//// assert allowClassMismatch || l.getClass()
== r.getClass() : String.format("%s != %s", l.getClass(), r.getClass());////
if (l.equals(r))// return null;//// return
NewValue.of(r);// }////// public static class NewValue<T>// {//
final T value;//// private NewValue(T value)// {//
this.value = value;// }//// public boolean isNull()// {//
return value == null;// }//// public T get()//
{// re
turn value;// }//// public static <T> NewValue<T> of(T value)//
{// return new NewValue<>(value);// }//// public
String toString()// {// return "" + value;// }//
}//// static Command.WaitingOn getWaitingOn(Command command)// {//
if (command instanceof Command.Committed)// return
command.asCommitted().waitingOn();//// return null;// }//// public
static class SavedDiff extends SavedCommand// {// public final
NewValue<Command.WaitingOn> waitingOn;//// public SavedDiff(TxnId
txnId,// NewValue<Timestamp> executeAt,//
NewValue<SaveStatus> saveStatus,//
NewValue<Status.Durability> durability,////
NewValue<Ballot> acceptedOrCommitted,//
NewValue<Ballot> promised,//// NewValue<Route<?>>
route,// NewVa
lue<PartialTxn> partialTxn,// NewValue<PartialDeps>
partialDeps,// NewValue<Seekables<?, ?>>
additionalKeysOrRanges,//// NewValue<Command.WaitingOn>
waitingOn,// NewValue<Writes> writes,//
NewValue<Listeners.Immutable<Command.DurableAndIdempotentListener>>
listeners)// {// super(txnId, executeAt, saveStatus,
durability, acceptedOrCommitted, promised, route, partialTxn, partialDeps,
additionalKeysOrRanges, writes, listeners);// this.waitingOn =
waitingOn;// }//// @Override// public String toString()//
{// return "SavedDiff{" +// " txnId=" +
txnId +// ", executeAt=" + executeAt +// ",
saveStatus=" + saveStatus +// ", durability=" + durability
+// ", acceptedOrCommitted=" + acceptedOrCommitted +//
", promised=" + promised +// ", route=" + route +//
", partialTxn=" + partialTxn +// ", partialDeps="
+ partialDeps +// ", writes=" + writes +//
", waitingOn=" + waitingOn +// '}';// }// }////
public static class LoadedDiff extends SavedCommand// {// public
final NewValue<WaitingOnProvider> waitingOn;//// public LoadedDiff(TxnId
txnId,// NewValue<Timestamp> executeAt,//
NewValue<SaveStatus> saveStatus,//
NewValue<Status.Durability> durability,////
NewValue<Ballot> acceptedOrCommitted,//
NewValue<Ballot> promised,//// NewValue<Route<?>>
route,// NewValue<PartialTxn> partialTxn,//
NewValue<PartialDeps> partialDeps,//
NewValue
<Seekables<?, ?>> additionalKeysOrRanges,////
NewValue<WaitingOnProvider> waitingOn,//
NewValue<Writes> writes,//
NewValue<Listeners.Immutable<Command.DurableAndIdempotentListener>>
listeners)// {// super(txnId, executeAt, saveStatus,
durability, acceptedOrCommitted, promised, route, partialTxn, partialDeps,
additionalKeysOrRanges, writes, listeners);// this.waitingOn =
waitingOn;// }//// public String toString()// {//
return "LoadedDiff{" +// "waitingOn=" + waitingOn +//
'}';// }// }//// final static class
SavedCommandSerializer implements ValueSerializer<JournalKey, Object>// {//
@Override// public int serializedSize(JournalKey key, Object
value, int userVersion)// {// SavedDiff diff = (SavedDiff)
value;// long size = 0;//// size += SHORT_SIZE
; // flags//////// if (diff.txnId != null)//// size
+= CommandSerializers.txnId.serializedSize(diff.txnId, userVersion);////
if (diff.executeAt != null)//// size +=
CommandSerializers.timestamp.serializedSize(diff.executeAt, userVersion);////
if (diff.saveStatus != null)//// size +=
Integer.BYTES;//// if (diff.durability != null)////
size += Integer.BYTES;//////// if (diff.acceptedOrCommitted !=
null)//// size +=
CommandSerializers.ballot.serializedSize(diff.acceptedOrCommitted,
userVersion);//// if (diff.promised != null)//// size
+= CommandSerializers.ballot.serializedSize(diff.promised,
userVersion);//////// if (diff.route != null)////
size +=
AccordKeyspace.LocalVersionedSerializers.route.serializedSize(diff.route);////
if (diff.partialTxn != null)//// CommandSerializers.par
tialTxn.serializedSize(diff.partialTxn, userVersion);//// if
(diff.partialDeps != null)////
DepsSerializer.partialDeps.serializedSize(diff.partialDeps, userVersion);////
if (diff.additionalKeysOrRanges != null)////
KeySerializers.seekables.serializedSize(diff.additionalKeysOrRanges,
userVersion);//////// if (diff.waitingOn != null)////
{//// size += Integer.BYTES;//// size +=
WaitingOnSerializer.serializedSize(diff.waitingOn);//// }////////
if (diff.writes != null)////
CommandSerializers.writes.serializedSize(diff.writes, userVersion);////////
if (diff.listeners != null && !diff.listeners.isEmpty())////
{//// size += Byte.BYTES;//// for
(Command.DurableAndIdempotentListener listener : diff.listeners)////
size += AccordKeyspace.LocalVersionedSerializers.listeners.serializedSi
ze(listener);//// }// return (int) size;// }////
@Override// public void serialize(JournalKey key, Object value,
DataOutputPlus out, int userVersion) throws IOException// {//
SavedDiff diff = (SavedDiff) value;////
out.writeShort(getSerializedFieldsFlags(diff));//
out.writeShort(getNullFieldFlags(diff));//// if (diff.txnId !=
null)// CommandSerializers.txnId.serialize(diff.txnId, out,
userVersion);// if (diff.executeAt != null &&
!diff.executeAt.isNull())//
CommandSerializers.timestamp.serialize(diff.executeAt.get(), out,
userVersion);// if (diff.saveStatus != null &&
!diff.saveStatus.isNull())//
out.writeInt(diff.saveStatus.get().ordinal());// if (diff.durability
!= null && !diff.durability.isNull())//
out.writeInt(diff.durability.get().ordinal());//// if
(diff.acceptedOrCommitted != nul
l && !diff.acceptedOrCommitted.isNull())//
CommandSerializers.ballot.serialize(diff.acceptedOrCommitted.get(), out,
userVersion);// if (diff.promised != null &&
!diff.promised.isNull())//
CommandSerializers.ballot.serialize(diff.promised.get(), out, userVersion);////
if (diff.route != null && !diff.route.isNull())//
AccordKeyspace.LocalVersionedSerializers.route.serialize(diff.route.get(),
out); // TODO (required): user version// if (diff.partialTxn != null
&& !diff.partialTxn.isNull())//
CommandSerializers.partialTxn.serialize(diff.partialTxn.get(), out,
userVersion);// if (diff.partialDeps != null &&
!diff.partialDeps.isNull())//
DepsSerializer.partialDeps.serialize(diff.partialDeps.get(), out,
userVersion);// if (diff.additionalKeysOrRanges != null &&
!diff.additionalKeysOrRanges.isNull())//
KeySerializers.seekables.serialize(diff.additi
onalKeysOrRanges.get(), out, userVersion);//// if (diff.waitingOn
!= null && !diff.waitingOn.isNull())// {// long size
= WaitingOnSerializer.serializedSize(diff.waitingOn.get());//
ByteBuffer serialized = WaitingOnSerializer.serialize(diff.txnId,
diff.waitingOn.get());// out.writeInt((int) size);//
out.write(serialized);// }//// if (diff.writes != null
&& !diff.writes.isNull())//
CommandSerializers.writes.serialize(diff.writes.get(), out, userVersion);////
if (diff.listeners != null && !diff.listeners.isNull())//
{// out.writeByte(diff.listeners.get().size());//
for (Command.DurableAndIdempotentListener listener : diff.listeners.get())//
AccordKeyspace.LocalVersionedSerializers.listeners.serialize(listener, out);//
}// }//// private static int
getSerializedFieldsFlags(Sav
edDiff diff)// {// int flags = 0;//// if
(diff.txnId != null)// flags = setBit(flags,
Fields.TXN_ID.ordinal());// if (diff.executeAt != null)//
flags = setBit(flags, Fields.EXECUTE_AT.ordinal());// if
(diff.saveStatus != null)// flags = setBit(flags,
Fields.SAVE_STATUS.ordinal());// if (diff.durability != null)//
flags = setBit(flags, Fields.DURABILITY.ordinal());//// if
(diff.acceptedOrCommitted != null)// flags = setBit(flags,
Fields.ACCEPTED.ordinal());// if (diff.promised != null)//
flags = setBit(flags, Fields.PROMISED.ordinal());//// if
(diff.route != null)// flags = setBit(flags,
Fields.ROUTE.ordinal());// if (diff.partialTxn != null)//
flags = setBit(flags, Fields.PARTIAL_TXN.ordinal());// if
(diff.partialDeps != null)// flags
= setBit(flags, Fields.PARTIAL_DEPS.ordinal());// if
(diff.additionalKeysOrRanges != null)// flags = setBit(flags,
Fields.ADDITIONAL_KEYS.ordinal());//// if (diff.waitingOn != null)//
flags = setBit(flags, Fields.WAITING_ON.ordinal());//
if (diff.writes != null)// flags = setBit(flags,
Fields.WRITES.ordinal());// if (diff.listeners != null)//
flags = setBit(flags, Fields.LISTENERS.ordinal());// return
flags;// }//// private static int getNullFieldFlags(SavedDiff
diff)// {// int flags = 0;//// if (diff.executeAt
!= null && diff.executeAt.isNull())// flags = setBit(flags,
Fields.EXECUTE_AT.ordinal());// if (diff.saveStatus != null &&
diff.saveStatus.isNull())// flags = setBit(flags,
Fields.SAVE_STATUS.ordinal());// if (diff.durability != null &&
diff.durability.isNull())//
flags = setBit(flags, Fields.DURABILITY.ordinal());////
if (diff.acceptedOrCommitted != null && diff.acceptedOrCommitted.isNull())//
flags = setBit(flags, Fields.ACCEPTED.ordinal());// if
(diff.promised != null && diff.promised.isNull())// flags =
setBit(flags, Fields.PROMISED.ordinal());//// if (diff.route != null
&& diff.route.isNull())// flags = setBit(flags,
Fields.ROUTE.ordinal());// if (diff.partialTxn != null &&
diff.partialTxn.isNull())// flags = setBit(flags,
Fields.PARTIAL_TXN.ordinal());// if (diff.partialDeps != null &&
diff.partialDeps.isNull())// flags = setBit(flags,
Fields.PARTIAL_DEPS.ordinal());// if (diff.additionalKeysOrRanges !=
null && diff.additionalKeysOrRanges.isNull())// flags =
setBit(flags, Fields.ADDITIONAL_KEYS.ordinal());//// if
(diff.waitingOn != null && diff.waitingOn.isNull()
)// flags = setBit(flags, Fields.WAITING_ON.ordinal());//
if (diff.writes != null && diff.writes.isNull())// flags =
setBit(flags, Fields.WRITES.ordinal());// if (diff.listeners != null
&& diff.listeners.isNull())// flags = setBit(flags,
Fields.LISTENERS.ordinal());// return flags;// }//////
@Override// public Object deserialize(JournalKey key, DataInputPlus in,
int userVersion) throws IOException// {// int hasFieldFlags =
in.readShort();// int nullFields = in.readShort();////
TxnId txnId = null;// Timestamp executedAt = null;//
SaveStatus saveStatus = null;// Status.Durability durability =
null;//// Ballot acceptedOrCommitted = null;// Ballot
promised = null;// Route<?> route = null;//// PartialTxn
partialTxn = null;// PartialDeps partialDeps = null;//
Seekables<?, ?> additionalKeysOrRanges = null;////
WaitingOnProvider waitingOn = (txn, deps) -> null;// Writes writes =
null;// Listeners.Immutable listeners = null;//// if
(isSet(hasFieldFlags, Fields.TXN_ID.ordinal()) &&//
!isSet(nullFields, Fields.TXN_ID.ordinal()))// txnId =
CommandSerializers.txnId.deserialize(in, userVersion);// if
(isSet(hasFieldFlags, Fields.EXECUTE_AT.ordinal()) &&//
!isSet(nullFields, Fields.EXECUTE_AT.ordinal()))// executedAt =
CommandSerializers.timestamp.deserialize(in, userVersion);// if
(isSet(hasFieldFlags, Fields.SAVE_STATUS.ordinal()) &&//
!isSet(nullFields, Fields.SAVE_STATUS.ordinal()))// saveStatus =
SaveStatus.values()[in.readInt()];// if (isSet(hasFieldFlags,
Fields.DURABILITY.ordinal()) &&// !isSet(nullFields,
Fields.DURABILITY.ordinal()))// du
rability = Status.Durability.values()[in.readInt()];//// if
(isSet(hasFieldFlags, Fields.ACCEPTED.ordinal()) &&//
!isSet(nullFields, Fields.ACCEPTED.ordinal()))//
acceptedOrCommitted = CommandSerializers.ballot.deserialize(in, userVersion);//
if (isSet(hasFieldFlags, Fields.PROMISED.ordinal()) &&//
!isSet(nullFields, Fields.PROMISED.ordinal()))// promised =
CommandSerializers.ballot.deserialize(in, userVersion);//// if
(isSet(hasFieldFlags, Fields.ROUTE.ordinal()) &&//
!isSet(nullFields, Fields.ROUTE.ordinal()))// route =
AccordKeyspace.LocalVersionedSerializers.route.deserialize(in);// if
(isSet(hasFieldFlags, Fields.PARTIAL_TXN.ordinal()) &&//
!isSet(nullFields, Fields.PARTIAL_TXN.ordinal()))// partialTxn =
CommandSerializers.partialTxn.deserialize(in, userVersion);// if
(isSet(hasFieldFlags, Fields.PARTIAL_D
EPS.ordinal()) &&// !isSet(nullFields,
Fields.PARTIAL_DEPS.ordinal()))// partialDeps =
DepsSerializer.partialDeps.deserialize(in, userVersion);// if
(isSet(hasFieldFlags, Fields.ADDITIONAL_KEYS.ordinal()) &&//
!isSet(nullFields, Fields.ADDITIONAL_KEYS.ordinal()))//
additionalKeysOrRanges = KeySerializers.seekables.deserialize(in,
userVersion);//// if (isSet(hasFieldFlags,
Fields.WAITING_ON.ordinal()) &&// !isSet(nullFields,
Fields.WAITING_ON.ordinal()))// {// int size =
in.readInt();// byte[] bytes = new byte[size];//
in.readFully(bytes);// ByteBuffer buffer =
ByteBuffer.wrap(bytes);// waitingOn = (localTxnId, deps) -> {//
try// {// return
WaitingOnSerializer.deserialize(localTxnId, deps.keyDeps.keys(),
deps.rangeDeps, deps.directKeyDeps, buff
er);// }// catch (IOException e)//
{// throw Throwables.unchecked(e);//
}// };// }// if
(isSet(hasFieldFlags, Fields.WRITES.ordinal()) &&//
!isSet(nullFields, Fields.WRITES.ordinal()))// writes =
CommandSerializers.writes.deserialize(in, userVersion);//// if
(isSet(hasFieldFlags, Fields.LISTENERS.ordinal()) &&//
!isSet(nullFields, Fields.LISTENERS.ordinal()))// {//
Listeners builder = Listeners.Immutable.EMPTY.mutable();// int
cnt = in.readByte();// for (int i = 0; i < cnt; i++)//
builder.add(AccordKeyspace.LocalVersionedSerializers.listeners.deserialize(in));//
listeners = new Listeners.Immutable(builder);//
}//// return new LoadedDiff(txnId,//
execut
edAt == null && !isSet(nullFields, Fields.EXECUTE_AT.ordinal()) ? null :
NewValue.of(executedAt),// saveStatus == null
&& !isSet(nullFields, Fields.SAVE_STATUS.ordinal()) ? null :
NewValue.of(saveStatus),// durability == null
&& !isSet(nullFields, Fields.DURABILITY.ordinal()) ? null :
NewValue.of(durability),////
acceptedOrCommitted == null && !isSet(nullFields, Fields.ACCEPTED.ordinal()) ?
null : NewValue.of(acceptedOrCommitted),//
promised == null && !isSet(nullFields, Fields.PROMISED.ordinal()) ? null :
NewValue.of(promised),//// route == null &&
!isSet(nullFields, Fields.ROUTE.ordinal()) ? null : NewValue.of(route),//
partialTxn == null && !isSet(nullFields,
Fields.PARTIAL_TXN.ordinal()) ? null : NewValue.of(partialTxn),//
partialDeps == null && !isSet
(nullFields, Fields.PARTIAL_DEPS.ordinal()) ? null :
NewValue.of(partialDeps),//
additionalKeysOrRanges == null && !isSet(nullFields,
Fields.ADDITIONAL_KEYS.ordinal()) ? null :
NewValue.of(additionalKeysOrRanges),////
waitingOn == null && !isSet(nullFields, Fields.WAITING_ON.ordinal()) ? null :
NewValue.of(waitingOn),// writes == null &&
!isSet(nullFields, Fields.WRITES.ordinal()) ? null : NewValue.of(writes),//
listeners == null && !isSet(nullFields,
Fields.LISTENERS.ordinal()) ? null : NewValue.of(listeners));// }//
}//// static int setBit(int value, int bit)// {// return value |
(1 << bit);// }//// static boolean isSet(int value, int bit)// {//
return (value & (1 << bit)) != 0;// }//// public interface
WaitingOnProvider// {// Command.WaitingOn provide(TxnId txnId,
PartialDeps deps);// }//}
Review Comment:
I think you want to remove this: the previous `SavedCommand` implementation is left at the end of the file as commented-out code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]