ifesdjeen commented on code in PR #168:
URL: https://github.com/apache/cassandra-accord/pull/168#discussion_r1954210674


##########
accord-core/src/main/java/accord/impl/CommandChange.java:
##########
@@ -264,84 +271,53 @@ public Cleanup shouldCleanup(Input input, Agent agent, 
RedundantBefore redundant
             return cleanup;
         }
 
-        public Builder maybeCleanup(Cleanup cleanup)
+        public boolean maybeCleanup(Input input, Agent agent, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
         {
-            if (saveStatus == null)
-                return this;
-
-            switch (cleanup)
-            {
-                case EXPUNGE:
-                case ERASE:
-                    return null;
-
-                case VESTIGIAL:
-                case INVALIDATE:
-                    return saveStatusOnly();
-
-                case TRUNCATE_WITH_OUTCOME:
-                case TRUNCATE:
-                    return expungePartial(cleanup, cleanup.appliesIfNot, 
cleanup == TRUNCATE_WITH_OUTCOME);
-
-                case NO:
-                    return this;
-                default:
-                    throw new UnhandledEnum(cleanup);
-            }
+            Cleanup cleanup = shouldCleanup(input, agent, redundantBefore, 
durableBefore);
+            return maybeCleanup(cleanup);
         }
 
-        public Builder expungePartial(Cleanup cleanup, SaveStatus saveStatus, 
boolean includeOutcome)
+        public boolean maybeCleanup(Cleanup cleanup)
         {
-            Invariants.require(txnId != null);
-            Builder builder = new Builder(txnId, ALL);
+            if (saveStatus == null)
+                return false;
 
-            builder.count++;
-            builder.nextCalled = true;
+            cleanup = cleanup.filter(saveStatus);
+            if (cleanup == NO)
+                return false;
 
-            Invariants.require(saveStatus != null);
-            builder.flags = setChanged(SAVE_STATUS, builder.flags);
-            builder.saveStatus = saveStatus;
-            builder.flags = setChanged(CLEANUP, builder.flags);
-            builder.cleanup = cleanup;
-            if (executeAt != null)
-            {
-                builder.flags = setChanged(EXECUTE_AT, builder.flags);
-                builder.executeAt = executeAt;
-            }
-            if (durability != null)
-            {
-                builder.flags = setChanged(DURABILITY, builder.flags);
-                builder.durability = durability;
-            }
-            if (participants != null)
-            {
-                builder.flags = setChanged(PARTICIPANTS, builder.flags);
-                builder.participants = participants;
-            }
-            if (includeOutcome && builder.writes != null)
-            {
-                builder.flags = setChanged(WRITES, builder.flags);
-                builder.writes = writes;
-            }
-
-            return builder;
+            truncate(cleanup.appliesIfNot);
+            return true;
         }
 
-        public Builder saveStatusOnly()
+        protected void truncate(SaveStatus newSaveStatus)
         {
-            Invariants.require(txnId != null);
-            Builder builder = new Builder(txnId, ALL);
-
-            builder.count++;
-            builder.nextCalled = true;
-
-            if (saveStatus != null)
+            int mask = saveStatusMasks[newSaveStatus.ordinal()];
+            // low flag bits represent fields already nulled out, so no need 
to visit them again
+            int iterable = toIterableSetFields(mask) & ~flags;
+            for (Field next = nextSetField(iterable); next != null; iterable = 
unsetIterable(next, iterable), next = nextSetField(iterable))
             {
-                builder.flags = setChanged(SAVE_STATUS, builder.flags);
-                builder.saveStatus = saveStatus;
+                switch (next)
+                {
+                    default: throw new UnhandledEnum(next);
+                    case PARTICIPANTS:      participants = null;               
      break;

Review Comment:
   Definitely a nit, and it can be done in a separate patch, but would it make 
sense to make `clear` use this `switch` with an `all fields` mask, and then simply 
unset `flags`? That way we would only have one place to update when adding a new 
field.



##########
accord-core/src/main/java/accord/topology/Topologies.java:
##########
@@ -39,6 +39,8 @@
 //  (e.g. at least implementing Topologies by Topology)
 public interface Topologies extends TopologySorter
 {
+    enum SelectNodeOwnership { RESELECT, SHARE }

Review Comment:
   Could you add one-line comments to both cases? They are obvious in the context 
of the patch, but might be hard to understand after a while.
   
   Also, maybe `RECOMPUTE` or `SLICE` or `RESLICE`?



##########
accord-core/src/main/java/accord/primitives/LatestDeps.java:
##########
@@ -60,29 +63,110 @@ public static LatestDeps create(boolean inclusiveEnds, 
RoutingKey[] starts, Late
         }
     }
 
-    public static void withCommitted(Node node, TxnId txnId, Timestamp 
executeAt, Deps mergeDeps, FullRoute<?> route, Unseekables<?> missing, 
BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps)
+    public static void withCommitted(CoordinationAdapter<?> adapter, Node 
node, Merge merge, FullRoute<?> route, Ballot ballot, TxnId txnId, Timestamp 
executeAt, Txn txn, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> 
withDeps)
     {
-        node.withEpoch(executeAt.epoch(), failureCallback, () -> {
-            if (missing.isEmpty())
+        if (!node.topology().hasEpoch(executeAt.epoch()))
+        {
+            node.withEpoch(executeAt.epoch(), failureCallback, () -> 
withCommitted(adapter, node, merge, route, ballot, txnId, executeAt, txn, 
failureCallback, withDeps));
+            return;
+        }
+
+        LatestDeps.MergedCommitResult merged = merge.mergeCommitOrStable(null, 
DepsCommitted);
+        Route<?> missing = route.without(merged.sufficientFor);
+        Deps committed = merged.deps;
+        if (missing.isEmpty()) withDeps.accept(merged.deps);
+        else
+        {
+            // we include the committed deps in our proposal so that if we 
contact a replica that participates in one of the committed shards we include 
any deps it should see
+            Deps propose = merge.mergeProposal(missing).with(committed);
+            adapter.proposeOnly(node, missing, missing, SHARE, route, SLOW, 
ballot, txnId, txn, executeAt, propose, (success, fail) -> {
+                if (fail != null) failureCallback.accept(null, fail);
+                else
+                {
+                    success = success.intersecting(missing).asFullUnsafe();
+                    withDeps.accept(success.with(committed));
+                }
+            });
+        }
+    }
+
+    public static void withStable(CoordinationAdapter<?> adapter, Node node, 
Merge merge, Route<?> require, @Nullable Route<?> sendTo, @Nullable 
SelectNodeOwnership selectSendTo, FullRoute<?> route, Ballot ballot, TxnId 
txnId, Timestamp executeAt, Txn txn, BiConsumer<?, ? super Throwable> 
failureCallback, Consumer<Deps> withDeps)
+    {
+        Invariants.require(sendTo == null || selectSendTo != null);
+        if (!node.topology().hasEpoch(executeAt.epoch()))
+        {
+            node.withEpoch(executeAt.epoch(), failureCallback, () -> 
withStable(adapter, node, merge, require, sendTo, selectSendTo, route, ballot, 
txnId, executeAt, txn, failureCallback, withDeps));
+            return;
+        }
+
+        LatestDeps.MergedCommitResult mergedStable = 
merge.mergeCommitOrStable(null, DepsKnown);
+        Deps stable = mergedStable.deps;
+        Route<?> stabilise = require.without(mergedStable.sufficientFor);
+        if (stabilise.isEmpty()) withDeps.accept(stable);
+        else
+        {
+            LatestDeps.MergedCommitResult mergedCommitted = 
merge.mergeCommitOrStable(stabilise, DepsCommitted);
+            Route<?> propose = 
stabilise.without(mergedCommitted.sufficientFor);
+            // we merge with stable to make sure we can send a full Commit to 
any replica that overlaps the stable and unstable ranges
+            Deps committed = stable.with(mergedCommitted.deps);
+            if (propose.isEmpty())
             {
-                withDeps.accept(mergeDeps);
+                stabilise(adapter, node, committed, stabilise, sendTo, 
selectSendTo, route, ballot, txnId, executeAt, txn, failureCallback, withDeps);
             }
             else
             {
-                CollectLatestDeps.withLatestDeps(node, txnId, route, missing, 
executeAt, (extraDeps, fail) -> {
-                    if (fail != null)
-                    {
-                        failureCallback.accept(null, fail);
-                    }
+                // we merge with committed to make sure we can send a full 
Commit to any replica that overlaps the stable and unstable ranges
+                Deps notaccepted = 
committed.with(merge.mergeProposal(propose));

Review Comment:
   nit: `notAccepted` for consistency with other places?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to