This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 593e042535d60e773cfa5f7c4b6a63e2fb6e5b30
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Tue Sep 24 14:59:45 2024 -0700

    Prune older MaxConflicts entries
    
    patch by Blake Eggleston; reviewed by Aleksey Yeschenko for
    CASSANDRA-19952
---
 accord-core/src/main/java/accord/api/Agent.java    | 17 +++++++++
 .../src/main/java/accord/local/CommandStore.java   | 41 +++++++++++++++++++++-
 .../src/main/java/accord/local/MaxConflicts.java   |  1 +
 .../src/test/java/accord/impl/TestAgent.java       | 12 +++++++
 .../src/test/java/accord/impl/list/ListAgent.java  | 12 +++++++
 .../java/accord/local/cfk/CommandsForKeyTest.java  | 22 ++++++++++--
 .../main/java/accord/maelstrom/MaelstromAgent.java | 12 +++++++
 7 files changed, 113 insertions(+), 4 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Agent.java 
b/accord-core/src/main/java/accord/api/Agent.java
index 116c8e4a..f0589863 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -99,6 +99,23 @@ public interface Agent extends UncaughtExceptionListener
      */
     int cfkPruneInterval();
 
+    /**
+     * Controls pruning of MaxConflicts
+     *
+     * The timestamp delta between a timestamp being added to MaxConflicts and 
the minimum timestamp we
+     * want to maintain granular max conflict data for. A smaller value 
minimizes the amount of memory taken
+     * for granular maxConflicts data. A larger value minimizes the number of 
unnecessary fast path rejections,
+     * within the bounds of inter-node clock drift and messaging latencies.
+     */
+    long maxConflictsHlcPruneDelta();
+
+    /**
+     * Controls pruning of MaxConflicts
+     *
+     * Every n updates, max conflicts is pruned to the delta, where n is the 
value returned by this method
+     */
+    long maxConflictsPruneInterval();
+
     /**
      * Create an empty transaction that Accord can use for its own internal 
transactions.
      */
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index f993974b..083a3efd 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -57,6 +57,9 @@ import java.util.function.Supplier;
 
 import com.google.common.collect.ImmutableSortedMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.primitives.FullRoute;
 import accord.primitives.Participants;
 import accord.primitives.RangeDeps;
@@ -82,6 +85,7 @@ import static accord.utils.Invariants.illegalState;
  */
 public abstract class CommandStore implements AgentExecutor
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(CommandStore.class);
     static class EpochUpdate
     {
         final RangesForEpoch newRangesForEpoch;
@@ -157,6 +161,7 @@ public abstract class CommandStore implements AgentExecutor
     // TODO (expected): store this only once per node
     private DurableBefore durableBefore = DurableBefore.EMPTY;
     private MaxConflicts maxConflicts = MaxConflicts.EMPTY;
+    private int maxConflictsUpdates = 0;
     protected RangesForEpoch rangesForEpoch;
 
     /**
@@ -282,6 +287,8 @@ public abstract class CommandStore implements AgentExecutor
         this.maxConflicts = maxConflicts;
     }
 
+    protected int dumpCounter = 0;
+
     protected void updateMaxConflicts(Command prev, Command updated)
     {
         Timestamp executeAt = updated.executeAt();
@@ -290,7 +297,39 @@ public abstract class CommandStore implements AgentExecutor
         if (keysOrRanges == null) return;
         if (prev != null && prev.executeAt() != null && 
prev.executeAt().compareTo(executeAt) >= 0) return;
 
-        setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt));
+
+        MaxConflicts updatedMaxConflicts = maxConflicts.update(keysOrRanges, 
executeAt);
+        if (++maxConflictsUpdates >= agent.maxConflictsPruneInterval())
+        {
+            int initialSize = updatedMaxConflicts.size();
+            MaxConflicts initialConflicts = updatedMaxConflicts;
+            long pruneHlc = executeAt.hlc() - 
agent.maxConflictsHlcPruneDelta();
+            Timestamp pruneBefore = pruneHlc > 0 ? 
Timestamp.fromValues(executeAt.epoch(), pruneHlc, executeAt.node) : null;
+            Ranges ranges = rangesForEpoch.all();
+            if (pruneBefore != null)
+                updatedMaxConflicts = updatedMaxConflicts.update(ranges, 
pruneBefore);
+
+            int prunedSize = updatedMaxConflicts.size();
+            if (initialSize > 100 && prunedSize == initialSize)
+            {
+                logger.info("Ineffective prune for {}. Initial size: {}, 
pruned size: {}, executeAt: {}, pruneBefore: {}", ranges, initialSize, 
prunedSize, executeAt, pruneBefore);
+                if (dumpCounter == 0)
+                {
+                    logger.info("initial MaxConflicts dump: {}", 
initialConflicts);
+                    logger.info("pruned MaxConflicts dump: {}", 
updatedMaxConflicts);
+                }
+                dumpCounter++;
+                dumpCounter %= 100;
+            }
+            else if (prunedSize != initialSize)
+            {
+                logger.info("Successfully pruned {} to {}", initialSize, 
prunedSize);
+            }
+
+
+            maxConflictsUpdates = 0;
+        }
+        setMaxConflicts(updatedMaxConflicts);
     }
 
     /**
diff --git a/accord-core/src/main/java/accord/local/MaxConflicts.java 
b/accord-core/src/main/java/accord/local/MaxConflicts.java
index 421c54f6..6d24c4a4 100644
--- a/accord-core/src/main/java/accord/local/MaxConflicts.java
+++ b/accord-core/src/main/java/accord/local/MaxConflicts.java
@@ -23,6 +23,7 @@ import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.utils.BTreeReducingRangeMap;
 
+
 // TODO (expected): track read/write conflicts separately
 class MaxConflicts extends BTreeReducingRangeMap<Timestamp>
 {
diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java 
b/accord-core/src/test/java/accord/impl/TestAgent.java
index 3b1ebd19..5f7c5240 100644
--- a/accord-core/src/test/java/accord/impl/TestAgent.java
+++ b/accord-core/src/test/java/accord/impl/TestAgent.java
@@ -136,6 +136,18 @@ public class TestAgent implements Agent
         return 1;
     }
 
+    @Override
+    public long maxConflictsHlcPruneDelta()
+    {
+        return 500;
+    }
+
+    @Override
+    public long maxConflictsPruneInterval()
+    {
+        return 0;
+    }
+
     @Override
     public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges)
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java 
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 2e390931..abab22cf 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -125,12 +125,24 @@ public class ListAgent implements Agent
         return 100;
     }
 
+    @Override
+    public long maxConflictsPruneInterval()
+    {
+        return 0;
+    }
+
     @Override
     public int cfkPruneInterval()
     {
         return 1;
     }
 
+    @Override
+    public long maxConflictsHlcPruneDelta()
+    {
+        return 50;
+    }
+
     @Override
     public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges)
     {
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 271037ad..1da0474a 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -604,8 +604,10 @@ public class CommandsForKeyTest
             final float pruneChance = rnd.nextFloat() * (rnd.nextBoolean() ? 
0.1f : 0.01f);
             final int pruneHlcDelta = 1 << rnd.nextInt(10);
             final int pruneInterval = 1 << rnd.nextInt(5);
+            final int maxConflictsHlcDelta = 1 << rnd.nextInt(10);
+            final int maxConflictsPruneInterval = 1 << rnd.nextInt(5);
             final Canon canon = new Canon(rnd);
-            TestCommandStore commandStore = new 
TestCommandStore(pruneInterval, pruneHlcDelta);
+            TestCommandStore commandStore = new 
TestCommandStore(pruneInterval, pruneHlcDelta, maxConflictsHlcDelta, 
maxConflictsPruneInterval);
             TestSafeCommandsForKey safeCfk = new TestSafeCommandsForKey(new 
CommandsForKey(KEY));
             TestSafeStore safeStore = new TestSafeStore(canon, commandStore, 
safeCfk);
             int c = 0;
@@ -910,14 +912,16 @@ public class CommandsForKeyTest
             }
         }
 
-        final int pruneInterval, pruneHlcDelta;
+        final int pruneInterval, pruneHlcDelta, maxConflictsHlcDelta, 
maxConflictsPruneInterval;
         final ArrayDeque<Task> queue = new ArrayDeque<>();
 
-        protected TestCommandStore(int pruneInterval, int pruneHlcDelta)
+        protected TestCommandStore(int pruneInterval, int pruneHlcDelta, int 
maxConflictsHlcDelta, int maxConflictsPruneInterval)
         {
             super(0, null, null, null, ignore -> new 
ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), new 
EpochUpdateHolder());
             this.pruneInterval = pruneInterval;
             this.pruneHlcDelta = pruneHlcDelta;
+            this.maxConflictsHlcDelta = maxConflictsHlcDelta;
+            this.maxConflictsPruneInterval = maxConflictsPruneInterval;
         }
 
         @Override
@@ -1021,6 +1025,18 @@ public class CommandsForKeyTest
             return pruneInterval;
         }
 
+        @Override
+        public long maxConflictsHlcPruneDelta()
+        {
+            return maxConflictsHlcDelta;
+        }
+
+        @Override
+        public long maxConflictsPruneInterval()
+        {
+            return maxConflictsPruneInterval;
+        }
+
         @Override
         public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges)
         {
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
index a48df717..0da3bf73 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java
@@ -96,12 +96,24 @@ public class MaelstromAgent implements Agent
         return 1000;
     }
 
+    @Override
+    public long maxConflictsPruneInterval()
+    {
+        return 0;
+    }
+
     @Override
     public int cfkPruneInterval()
     {
         return 1;
     }
 
+    @Override
+    public long maxConflictsHlcPruneDelta()
+    {
+        return 500;
+    }
+
     @Override
     public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to