This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new a5fd6b3bbb Invalidation fixes/improvements - Integrate accord-core changes for CASSANDRA-18057 a5fd6b3bbb is described below commit a5fd6b3bbb83661c12e9c08a16ba3601e2302c70 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Mon Nov 21 12:22:38 2022 +0000 Invalidation fixes/improvements - Integrate accord-core changes for CASSANDRA-18057 patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18057 --- .build/include-accord.sh | 4 +- .../cassandra/service/accord/AccordCommand.java | 31 +++++++-- .../service/accord/AccordCommandStore.java | 6 ++ .../cassandra/service/accord/AccordKeyspace.java | 11 ++- .../service/accord/AccordPartialCommand.java | 4 +- .../service/accord/async/AsyncWriter.java | 5 +- .../accord/serializers/AcceptSerializers.java | 32 +++++---- .../serializers/BeginInvalidationSerializers.java | 81 ++++++---------------- .../service/accord/AccordCommandTest.java | 3 +- .../service/accord/async/AsyncWriterTest.java | 6 +- 10 files changed, 96 insertions(+), 87 deletions(-) diff --git a/.build/include-accord.sh b/.build/include-accord.sh index 0dc01bf2ba..ea1e0544df 100755 --- a/.build/include-accord.sh +++ b/.build/include-accord.sh @@ -24,8 +24,8 @@ set -o nounset bin="$(cd "$(dirname "$0")" > /dev/null; pwd)" -accord_repo='https://github.com/apache/cassandra-accord.git' -accord_branch='trunk' +accord_repo='https://github.com/belliottsmith/cassandra-accord.git' +accord_branch='bugfix-invalidation' accord_src="$bin/cassandra-accord" checkout() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java index e7e526495f..45885f657d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java @@ -18,11 +18,14 @@ package org.apache.cassandra.service.accord; +import java.util.Map; import java.util.Objects; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; + import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -115,6 +118,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> public final StoredValue<RoutingKey> homeKey; public final StoredValue<RoutingKey> progressKey; public final StoredValue<PartialTxn> partialTxn; + public final StoredValue<Txn.Kind> kind; // TODO: store this in TxnId public final StoredValue<Ballot> promised; public final StoredValue<Ballot> accepted; public final StoredValue<Timestamp> executeAt; @@ -141,6 +145,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> progressKey = new StoredValue<>(rw()); route = new StoredValue<>(rw()); partialTxn = new StoredValue<>(rw()); + kind = new StoredValue<>(rw()); promised = new StoredValue<>(rw()); accepted = new StoredValue<>(rw()); executeAt = new StoredValue<>(rw()); @@ -235,6 +240,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> progressKey.set(null); route.set(null); partialTxn.set(null); + kind.set(null); executeAt.load(null); promised.set(Ballot.ZERO); accepted.set(Ballot.ZERO); @@ -525,7 +531,13 @@ public class AccordCommand extends Command implements AccordState<TxnId> @Override public Txn.Kind kind() { - return partialTxn.get().kind(); + return kind.get(); + } + + @Override + public void setKind(Txn.Kind kind) + { + this.kind.set(kind); } @Override @@ -762,7 +774,6 @@ public class AccordCommand extends Command implements AccordState<TxnId> waitingOnCommit.blindAdd(txnId); } - @Override public boolean isWaitingOnCommit() { return !waitingOnCommit.getView().isEmpty(); @@ -788,7 +799,6 @@ public class AccordCommand extends Command implements AccordState<TxnId> waitingOnApply.blindPut(executeAt, txnId); } - @Override public boolean isWaitingOnApply() { return !waitingOnApply.getView().isEmpty(); @@ -802,10 +812,21 @@ public class AccordCommand extends Command implements AccordState<TxnId> } @Override - public TxnId firstWaitingOnApply() + public boolean isWaitingOnDependency() + { + return isWaitingOnCommit() || isWaitingOnApply(); + } + + @Override + public TxnId firstWaitingOnApply(@Nullable TxnId ifExecutesBefore) { if (!isWaitingOnApply()) return null; - return waitingOnApply.getView().firstEntry().getValue(); + + Map.Entry<Timestamp, TxnId> first = waitingOnApply.getView().firstEntry(); + if (ifExecutesBefore == null || first.getKey().compareTo(ifExecutesBefore) < 0) + return first.getValue(); + + return null; } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 12b84a40c4..c7348a149f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -325,6 +325,12 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore return time.uniqueNow(max); } + @Override + public NodeTimeService time() + { + return time; + } + public Timestamp maxConflict(Keys keys) { // TODO: efficiency diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index e17e7c01e8..8677dd8fea 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -46,6 +46,7 @@ import accord.primitives.Ballot; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Timestamp; +import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.DeterministicIdentitySet; @@ -142,6 +143,7 @@ public class AccordKeyspace + "route blob," + "durability int," + "txn blob," + + "kind int," + format("execute_at %s,", TIMESTAMP_TUPLE) + format("promised_ballot %s,", TIMESTAMP_TUPLE) + format("accepted_ballot %s,", TIMESTAMP_TUPLE) @@ -189,6 +191,7 @@ public class AccordKeyspace static final ColumnMetadata route = getColumn(Commands, "route"); static final ColumnMetadata durability = getColumn(Commands, "durability"); static final ColumnMetadata txn = getColumn(Commands, "txn"); + static final ColumnMetadata kind = getColumn(Commands, "kind"); static final ColumnMetadata execute_at = getColumn(Commands, "execute_at"); static final ColumnMetadata promised_ballot = getColumn(Commands, "promised_ballot"); static final ColumnMetadata accepted_ballot = getColumn(Commands, "accepted_ballot"); @@ -447,6 +450,9 @@ public class AccordKeyspace if (command.partialTxn.hasModifications()) builder.addCell(live(CommandsColumns.txn, timestampMicros, serializeOrNull(command.partialTxn.get(), CommandsSerializers.partialTxn))); + if (command.kind.hasModifications()) + builder.addCell(live(CommandsColumns.kind, timestampMicros, accessor.valueOf(command.kind.get().ordinal()))); + if (command.executeAt.hasModifications()) builder.addCell(live(CommandsColumns.execute_at, timestampMicros, serializeTimestamp(command.executeAt.get()))); @@ -457,7 +463,7 @@ public class AccordKeyspace builder.addCell(live(CommandsColumns.accepted_ballot, timestampMicros, serializeTimestamp(command.accepted.get()))); if (command.partialDeps.hasModifications()) - builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serialize(command.partialDeps.get(), CommandsSerializers.partialDeps))); + builder.addCell(live(CommandsColumns.dependencies, timestampMicros, serializeOrNull(command.partialDeps.get(), CommandsSerializers.partialDeps))); if (command.writes.hasModifications()) builder.addCell(live(CommandsColumns.writes, timestampMicros, serialize(command.writes.get(), CommandsSerializers.writes))); @@ -593,10 +599,11 @@ public class AccordKeyspace // TODO: something less brittle than ordinal, more efficient than values() command.durability.load(Status.Durability.values()[row.getInt("durability", 0)]); command.partialTxn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn)); + command.kind.load(row.has("kind") ? Txn.Kind.values()[row.getInt("kind")] : null); command.executeAt.load(deserializeTimestampOrNull(row, "execute_at", Timestamp::new)); command.promised.load(deserializeTimestampOrNull(row, "promised_ballot", Ballot::new)); command.accepted.load(deserializeTimestampOrNull(row, "accepted_ballot", Ballot::new)); - command.partialDeps.load(deserializeWithVersionOr(row, "dependencies", CommandsSerializers.partialDeps, () -> PartialDeps.NONE)); + command.partialDeps.load(deserializeOrNull(row.getBlob("dependencies"), CommandsSerializers.partialDeps)); command.writes.load(deserializeWithVersionOr(row, "writes", CommandsSerializers.writes, () -> null)); command.result.load(deserializeWithVersionOr(row, "result", CommandsSerializers.result, () -> null)); command.waitingOnCommit.load(deserializeTxnIdNavigableSet(row, "waiting_on_commit")); diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java index e0384489c2..779ce66f19 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java @@ -64,7 +64,9 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt public AccordPartialCommand(Key key, Command command) { - this(command.txnId(), command.executeAt(), command.partialDeps().txnIds(key), command.status(), command.kind()); + this(command.txnId(), command.executeAt(), + command.partialDeps() == null ? Collections.emptyList() : command.partialDeps().txnIds(key), + command.status(), command.kind()); } public TxnId txnId() diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java index 8b20347d77..64f6bdeab0 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java @@ -254,7 +254,10 @@ public class AsyncWriter } // There won't be a txn to denormalize against until the command has been preaccepted - if (command.known().hasTxn && AccordPartialCommand.serializer.needsUpdate(command)) + // TODO (now): this maybe insufficient for correctness? on Accept we use the explicitly provided keys to register + // the transaction here. It's possible a sequence of two Accept, with second taking a higher timestamp + // might not reflect the update timestamp in the map? Probably best addressed following Blake's refactor. + if (command.known().isDefinitionKnown() && AccordPartialCommand.serializer.needsUpdate(command)) { for (Key key : command.partialTxn().keys()) { diff --git a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java index c2d5d88928..a635ccc671 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java @@ -21,8 +21,6 @@ package org.apache.cassandra.service.accord.serializers; import java.io.IOException; import accord.messages.Accept; -import accord.messages.Accept.AcceptNack; -import accord.messages.Accept.AcceptOk; import accord.messages.Accept.AcceptReply; import accord.primitives.PartialRoute; import accord.primitives.TxnId; @@ -108,15 +106,22 @@ public class AcceptSerializers { default: throw new AssertionError(); case Success: - out.writeByte(1); - DepsSerializer.partialDeps.serialize(((AcceptOk)reply).deps, out, version); + if (reply.deps != null) + { + out.writeByte(1); + DepsSerializer.partialDeps.serialize(reply.deps, out, version); + } + else + { + out.writeByte(2); + } break; case Redundant: - out.writeByte(2); + out.writeByte(3); break; case RejectedBallot: - out.writeByte(3); - CommandSerializers.ballot.serialize(((AcceptNack) reply).supersededBy, out, version); + out.writeByte(4); + CommandSerializers.ballot.serialize(reply.supersededBy, out, version); } } @@ -128,11 +133,13 @@ public class AcceptSerializers { default: throw new IllegalStateException("Unexpected AcceptNack type: " + type); case 1: - return new AcceptOk(DepsSerializer.partialDeps.deserialize(in, version)); + return new AcceptReply(DepsSerializer.partialDeps.deserialize(in, version)); case 2: - return AcceptNack.REDUNDANT; + return AcceptReply.ACCEPT_INVALIDATE; case 3: - return new AcceptNack(RejectedBallot, CommandSerializers.ballot.deserialize(in, version)); + return AcceptReply.REDUNDANT; + case 4: + return new AcceptReply(CommandSerializers.ballot.deserialize(in, version)); } } @@ -144,12 +151,13 @@ public class AcceptSerializers { default: throw new AssertionError(); case Success: - size += DepsSerializer.partialDeps.serializedSize(((AcceptOk)reply).deps, version); + if (reply.deps != null) + size += DepsSerializer.partialDeps.serializedSize(reply.deps, version); break; case Redundant: break; case RejectedBallot: - size += CommandSerializers.ballot.serializedSize(((AcceptNack) reply).supersededBy, version); + size += CommandSerializers.ballot.serializedSize(reply.supersededBy, version); } return size; } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java index d4ebe727f9..79d9b1b2b0 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java @@ -24,8 +24,6 @@ import accord.api.RoutingKey; import accord.local.SaveStatus; import accord.local.Status; import accord.messages.BeginInvalidation; -import accord.messages.BeginInvalidation.InvalidateNack; -import accord.messages.BeginInvalidation.InvalidateOk; import accord.messages.BeginInvalidation.InvalidateReply; import accord.primitives.AbstractRoute; import accord.primitives.Ballot; @@ -46,7 +44,7 @@ public class BeginInvalidationSerializers public void serialize(BeginInvalidation begin, DataOutputPlus out, int version) throws IOException { CommandSerializers.txnId.serialize(begin.txnId, out, version); - KeySerializers.routingKey.serialize(begin.someKey, out, version); + KeySerializers.routingKeys.serialize(begin.someKeys, out, version); CommandSerializers.ballot.serialize(begin.ballot, out, version); } @@ -54,7 +52,7 @@ public class BeginInvalidationSerializers public BeginInvalidation deserialize(DataInputPlus in, int version) throws IOException { return new BeginInvalidation(CommandSerializers.txnId.deserialize(in, version), - KeySerializers.routingKey.deserialize(in, version), + KeySerializers.routingKeys.deserialize(in, version), CommandSerializers.ballot.deserialize(in, version)); } @@ -62,80 +60,45 @@ public class BeginInvalidationSerializers public long serializedSize(BeginInvalidation begin, int version) { return CommandSerializers.txnId.serializedSize(begin.txnId, version) - + KeySerializers.routingKey.serializedSize(begin.someKey, version) + + KeySerializers.routingKeys.serializedSize(begin.someKeys, version) + CommandSerializers.ballot.serializedSize(begin.ballot, version); } }; public static final IVersionedSerializer<InvalidateReply> reply = new IVersionedSerializer<InvalidateReply>() { - void serializeOk(InvalidateOk ok, DataOutputPlus out, int version) throws IOException - { - CommandSerializers.saveStatus.serialize(ok.status, out, version); - serializeNullable(KeySerializers.abstractRoute, ok.route, out, version); - serializeNullable(KeySerializers.routingKey, ok.homeKey, out, version); - } - - InvalidateOk deserializeOk(DataInputPlus in, int version) throws IOException - { - SaveStatus status = CommandSerializers.saveStatus.deserialize(in, version); - AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version); - RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version); - return new InvalidateOk(status, route, homeKey); - } - - long serializedOkSize(InvalidateOk ok, int version) - { - return CommandSerializers.saveStatus.serializedSize(ok.status, version) - + serializedSizeNullable(KeySerializers.abstractRoute, ok.route, version) - + serializedSizeNullable(KeySerializers.routingKey, ok.homeKey, version); - } - - void serializeNack(InvalidateNack nack, DataOutputPlus out, int version) throws IOException - { - CommandSerializers.ballot.serialize(nack.supersededBy, out, version); - serializeNullable(KeySerializers.routingKey, nack.homeKey, out, version); - } - - InvalidateNack deserializeNack(DataInputPlus in, int version) throws IOException - { - Ballot supersededBy = CommandSerializers.ballot.deserialize(in, version); - RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version); - return new InvalidateNack(supersededBy, homeKey); - } - - long serializedNackSize(InvalidateNack nack, int version) - { - return CommandSerializers.ballot.serializedSize(nack.supersededBy, version) - + serializedSizeNullable(KeySerializers.routingKey, nack.homeKey, version); - } - @Override public void serialize(InvalidateReply reply, DataOutputPlus out, int version) throws IOException { - out.writeBoolean(reply.isOk()); - if (!reply.isOk()) - serializeNack((InvalidateNack) reply, out, version); - else - serializeOk((InvalidateOk) reply, out, version); + serializeNullable(CommandSerializers.ballot, reply.supersededBy, out, version); + CommandSerializers.ballot.serialize(reply.accepted, out, version); + CommandSerializers.status.serialize(reply.status, out, version); + out.writeBoolean(reply.acceptedFastPath); + serializeNullable(KeySerializers.abstractRoute, reply.route, out, version); + serializeNullable(KeySerializers.routingKey, reply.homeKey, out, version); } @Override public InvalidateReply deserialize(DataInputPlus in, int version) throws IOException { - boolean isOk = in.readBoolean(); - if (!isOk) - return deserializeNack(in, version); - - return deserializeOk(in, version); + Ballot supersededBy = deserializeNullable(CommandSerializers.ballot, in, version); + Ballot accepted = CommandSerializers.ballot.deserialize(in, version); + Status status = CommandSerializers.status.deserialize(in, version); + boolean acceptedFastPath = in.readBoolean(); + AbstractRoute route = deserializeNullable(KeySerializers.abstractRoute, in, version); + RoutingKey homeKey = deserializeNullable(KeySerializers.routingKey, in, version); + return new InvalidateReply(supersededBy, accepted, status, acceptedFastPath, route, homeKey); } @Override public long serializedSize(InvalidateReply reply, int version) { - return TypeSizes.sizeof(reply.isOk()) - + (reply.isOk() ? serializedOkSize((InvalidateOk) reply, version) - : serializedNackSize((InvalidateNack) reply, version)); + return serializedSizeNullable(CommandSerializers.ballot, reply.supersededBy, version) + + CommandSerializers.ballot.serializedSize(reply.accepted, version) + + CommandSerializers.status.serializedSize(reply.status, version) + + TypeSizes.BOOL_SIZE + + serializedSizeNullable(KeySerializers.abstractRoute, reply.route, version) + + serializedSizeNullable(KeySerializers.routingKey, reply.homeKey, version); } }; } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index e48cda6e84..54ac4736ac 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -130,8 +130,7 @@ public class AccordCommandTest commandStore.execute(accept, instance -> { Accept.AcceptReply reply = accept.apply(instance); Assert.assertTrue(reply.isOk()); - Accept.AcceptOk ok = (Accept.AcceptOk) reply; - Assert.assertTrue(ok.deps.isEmpty()); + Assert.assertTrue(reply.deps.isEmpty()); }).get(); commandStore.execute(accept, instance -> { diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java index 28c2508491..37d1b5f764 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java @@ -109,7 +109,7 @@ public class AsyncWriterTest blocking = AccordKeyspace.loadCommand(commandStore, blockingId); Assert.assertTrue(blocking.blockingApplyOn.getView().contains(waitingId)); - // now change the blocking command and check it's changes are reflected in the waiting command + // now change the blocking command and check its changes are reflected in the waiting command context = new AsyncContext(); blocking.setStatus(Status.ReadyToExecute); context.commands.add(blocking); @@ -120,7 +120,7 @@ public class AsyncWriterTest execute(commandStore, () -> { AsyncContext ctx = new AsyncContext(); commandStore.setContext(ctx); - TxnId blockingSummary = waitingFinal.firstWaitingOnApply(); + TxnId blockingSummary = waitingFinal.firstWaitingOnApply(null); Assert.assertEquals(blockingId, blockingSummary); commandStore.unsetContext(ctx); }); @@ -231,7 +231,7 @@ public class AsyncWriterTest // remove listener from PartialCommand commandStore.execute(contextFor(waitingId), cs -> { Command waiting = cs.command(waitingId); - TxnId blocking = ((AccordCommand)waiting).firstWaitingOnApply(); + TxnId blocking = ((AccordCommand)waiting).firstWaitingOnApply(null); Assert.assertNotNull(blocking); Assert.assertEquals(blockingId, blocking); }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org