This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 593e042535d60e773cfa5f7c4b6a63e2fb6e5b30 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Tue Sep 24 14:59:45 2024 -0700 Prune older MaxConflicts entries patch by Blake Eggleston; reviewed by Aleksey Yeschenko for CASSANDRA-19952 --- accord-core/src/main/java/accord/api/Agent.java | 17 +++++++++ .../src/main/java/accord/local/CommandStore.java | 41 +++++++++++++++++++++- .../src/main/java/accord/local/MaxConflicts.java | 1 + .../src/test/java/accord/impl/TestAgent.java | 12 +++++++ .../src/test/java/accord/impl/list/ListAgent.java | 12 +++++++ .../java/accord/local/cfk/CommandsForKeyTest.java | 22 ++++++++++-- .../main/java/accord/maelstrom/MaelstromAgent.java | 12 +++++++ 7 files changed, 113 insertions(+), 4 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Agent.java b/accord-core/src/main/java/accord/api/Agent.java index 116c8e4a..f0589863 100644 --- a/accord-core/src/main/java/accord/api/Agent.java +++ b/accord-core/src/main/java/accord/api/Agent.java @@ -99,6 +99,23 @@ public interface Agent extends UncaughtExceptionListener */ int cfkPruneInterval(); + /** + * Controls pruning of MaxConflicts + * + * The timestamp delta between a timestamp being added to MaxConflicts and the minimum timestamp we + * want to maintain granular max conflict data for. A smaller value minimizes the amount of memory taken + * for granular maxConflicts data. A larger value minimizes the number of unnecessary fast path rejections, + * within the bounds of inter-node clock drift and messaging latencies. + */ + long maxConflictsHlcPruneDelta(); + + /** + * Controls pruning of MaxConflicts + * + * Every n updates, max conflicts is pruned to the delta, where n is the value returned by this method + */ + long maxConflictsPruneInterval(); + /** * Create an empty transaction that Accord can use for its own internal transactions.
*/ diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index f993974b..083a3efd 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -57,6 +57,9 @@ import java.util.function.Supplier; import com.google.common.collect.ImmutableSortedMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import accord.primitives.FullRoute; import accord.primitives.Participants; import accord.primitives.RangeDeps; @@ -82,6 +85,7 @@ import static accord.utils.Invariants.illegalState; */ public abstract class CommandStore implements AgentExecutor { + private static final Logger logger = LoggerFactory.getLogger(CommandStore.class); static class EpochUpdate { final RangesForEpoch newRangesForEpoch; @@ -157,6 +161,7 @@ public abstract class CommandStore implements AgentExecutor // TODO (expected): store this only once per node private DurableBefore durableBefore = DurableBefore.EMPTY; private MaxConflicts maxConflicts = MaxConflicts.EMPTY; + private int maxConflictsUpdates = 0; protected RangesForEpoch rangesForEpoch; /** @@ -282,6 +287,8 @@ public abstract class CommandStore implements AgentExecutor this.maxConflicts = maxConflicts; } + protected int dumpCounter = 0; + protected void updateMaxConflicts(Command prev, Command updated) { Timestamp executeAt = updated.executeAt(); @@ -290,7 +297,39 @@ public abstract class CommandStore implements AgentExecutor if (keysOrRanges == null) return; if (prev != null && prev.executeAt() != null && prev.executeAt().compareTo(executeAt) >= 0) return; - setMaxConflicts(maxConflicts.update(keysOrRanges, executeAt)); + + MaxConflicts updatedMaxConflicts = maxConflicts.update(keysOrRanges, executeAt); + if (++maxConflictsUpdates >= agent.maxConflictsPruneInterval()) + { + int initialSize = updatedMaxConflicts.size(); + MaxConflicts initialConflicts = updatedMaxConflicts; + long pruneHlc 
= executeAt.hlc() - agent.maxConflictsHlcPruneDelta(); + Timestamp pruneBefore = pruneHlc > 0 ? Timestamp.fromValues(executeAt.epoch(), pruneHlc, executeAt.node) : null; + Ranges ranges = rangesForEpoch.all(); + if (pruneBefore != null) + updatedMaxConflicts = updatedMaxConflicts.update(ranges, pruneBefore); + + int prunedSize = updatedMaxConflicts.size(); + if (initialSize > 100 && prunedSize == initialSize) + { + logger.info("Ineffective prune for {}. Initial size: {}, pruned size: {}, executeAt: {}, pruneBefore: {}", ranges, initialSize, prunedSize, executeAt, pruneBefore); + if (dumpCounter == 0) + { + logger.info("initial MaxConflicts dump: {}", initialConflicts); + logger.info("pruned MaxConflicts dump: {}", updatedMaxConflicts); + } + dumpCounter++; + dumpCounter %= 100; + } + else if (prunedSize != initialSize) + { + logger.info("Successfully pruned {} to {}", initialSize, prunedSize); + } + + + maxConflictsUpdates = 0; + } + setMaxConflicts(updatedMaxConflicts); } /** diff --git a/accord-core/src/main/java/accord/local/MaxConflicts.java b/accord-core/src/main/java/accord/local/MaxConflicts.java index 421c54f6..6d24c4a4 100644 --- a/accord-core/src/main/java/accord/local/MaxConflicts.java +++ b/accord-core/src/main/java/accord/local/MaxConflicts.java @@ -23,6 +23,7 @@ import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.utils.BTreeReducingRangeMap; + // TODO (expected): track read/write conflicts separately class MaxConflicts extends BTreeReducingRangeMap<Timestamp> { diff --git a/accord-core/src/test/java/accord/impl/TestAgent.java b/accord-core/src/test/java/accord/impl/TestAgent.java index 3b1ebd19..5f7c5240 100644 --- a/accord-core/src/test/java/accord/impl/TestAgent.java +++ b/accord-core/src/test/java/accord/impl/TestAgent.java @@ -136,6 +136,18 @@ public class TestAgent implements Agent return 1; } + @Override + public long maxConflictsHlcPruneDelta() + { + return 500; + } + + @Override + public long 
maxConflictsPruneInterval() + { + return 0; + } + @Override public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges) { diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 2e390931..abab22cf 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -125,12 +125,24 @@ public class ListAgent implements Agent return 100; } + @Override + public long maxConflictsPruneInterval() + { + return 0; + } + @Override public int cfkPruneInterval() { return 1; } + @Override + public long maxConflictsHlcPruneDelta() + { + return 50; + } + @Override public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges) { diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 271037ad..1da0474a 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -604,8 +604,10 @@ public class CommandsForKeyTest final float pruneChance = rnd.nextFloat() * (rnd.nextBoolean() ? 
0.1f : 0.01f); final int pruneHlcDelta = 1 << rnd.nextInt(10); final int pruneInterval = 1 << rnd.nextInt(5); + final int maxConflictsHlcDelta = 1 << rnd.nextInt(10); + final int maxConflictsPruneInterval = 1 << rnd.nextInt(5); final Canon canon = new Canon(rnd); - TestCommandStore commandStore = new TestCommandStore(pruneInterval, pruneHlcDelta); + TestCommandStore commandStore = new TestCommandStore(pruneInterval, pruneHlcDelta, maxConflictsHlcDelta, maxConflictsPruneInterval); TestSafeCommandsForKey safeCfk = new TestSafeCommandsForKey(new CommandsForKey(KEY)); TestSafeStore safeStore = new TestSafeStore(canon, commandStore, safeCfk); int c = 0; @@ -910,14 +912,16 @@ public class CommandsForKeyTest } } - final int pruneInterval, pruneHlcDelta; + final int pruneInterval, pruneHlcDelta, maxConflictsHlcDelta, maxConflictsPruneInterval; final ArrayDeque<Task> queue = new ArrayDeque<>(); - protected TestCommandStore(int pruneInterval, int pruneHlcDelta) + protected TestCommandStore(int pruneInterval, int pruneHlcDelta, int maxConflictsHlcDelta, int maxConflictsPruneInterval) { super(0, null, null, null, ignore -> new ProgressLog.NoOpProgressLog(), ignore -> new DefaultLocalListeners(new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE), new EpochUpdateHolder()); this.pruneInterval = pruneInterval; this.pruneHlcDelta = pruneHlcDelta; + this.maxConflictsHlcDelta = maxConflictsHlcDelta; + this.maxConflictsPruneInterval = maxConflictsPruneInterval; } @Override @@ -1021,6 +1025,18 @@ public class CommandsForKeyTest return pruneInterval; } + @Override + public long maxConflictsHlcPruneDelta() + { + return maxConflictsHlcDelta; + } + + @Override + public long maxConflictsPruneInterval() + { + return maxConflictsPruneInterval; + } + @Override public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges) { diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java index a48df717..0da3bf73 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromAgent.java @@ -96,12 +96,24 @@ public class MaelstromAgent implements Agent return 1000; } + @Override + public long maxConflictsPruneInterval() + { + return 0; + } + @Override public int cfkPruneInterval() { return 1; } + @Override + public long maxConflictsHlcPruneDelta() + { + return 500; + } + @Override public Txn emptySystemTxn(Txn.Kind kind, Seekables<?, ?> keysOrRanges) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org