Repository: incubator-ratis Updated Branches: refs/heads/master cb68b56c2 -> 76e89911a
RATIS-326. Introduce truncateStateMachineData API in StateMachine interface. Contributed by Shashikant Banerjee Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/76e89911 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/76e89911 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/76e89911 Branch: refs/heads/master Commit: 76e89911ad8591cd16a460095cbb22a8a5625347 Parents: cb68b56 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Dec 14 10:14:14 2018 +0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Dec 14 10:14:14 2018 +0800 ---------------------------------------------------------------------- .../ratis/server/storage/RaftLogWorker.java | 19 +++++++++++++++---- .../ratis/server/storage/SegmentedRaftLog.java | 2 +- .../apache/ratis/statemachine/StateMachine.java | 12 ++++++++++++ 3 files changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/76e89911/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index ef5611c..345baed 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -310,9 +310,9 @@ class RaftLogWorker implements Runnable { return addIOTask(new WriteLog(entry)); } - Task truncate(TruncationSegments ts) { - LOG.info("{}: Truncating segments {}", name, ts); - return addIOTask(new TruncateLog(ts)); + Task truncate(TruncationSegments ts, long index) { + LOG.info("{}: Truncating segments {}, start index {}", name, ts, index); + return addIOTask(new TruncateLog(ts, index)); } private class WriteLog extends Task { @@ -447,15 +447,23 @@ class RaftLogWorker implements Runnable { private class TruncateLog extends Task { private final TruncationSegments segments; + private final long truncateIndex; - TruncateLog(TruncationSegments ts) { + TruncateLog(TruncationSegments ts, long index) { this.segments = ts; + this.truncateIndex = index; + } @Override void execute() throws IOException { IOUtils.cleanup(null, out); out = null; + CompletableFuture<Void> stateMachineFuture = null; + if (stateMachine != null) { + stateMachineFuture = stateMachine.truncateStateMachineData(truncateIndex); + } + if (segments.toTruncate != null) { File fileToTruncate = segments.toTruncate.isOpen ? storage.getStorageDir().getOpenLogFile( @@ -499,6 +507,9 @@ class RaftLogWorker implements Runnable { lastWrittenIndex = minStart - 1; } } + if (stateMachineFuture != null) { + IOUtils.getFromFuture(stateMachineFuture, () -> this + "-truncateStateMachineData"); + } updateFlushedIndex(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/76e89911/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index d2b579d..588b819 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -259,7 +259,7 @@ public class SegmentedRaftLog extends RaftLog { try(AutoCloseableLock writeLock = writeLock()) { RaftLogCache.TruncationSegments ts = cache.truncate(index); if (ts != null) { - Task task = fileLogWorker.truncate(ts); + Task task = fileLogWorker.truncate(ts, index); return task.getFuture(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/76e89911/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index bb40c3c..c52bed1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -230,4 +230,16 @@ public interface StateMachine extends Closeable { default CompletableFuture<Void> flushStateMachineData(long index) { return CompletableFuture.completedFuture(null); } + + /** + * Truncates asynchronously the associated state machine data starting from the given log + * index from the state machine. It will be a no op if the corresponding log entry does not + * have associated stateMachineData. + * @param index log Index starting from which the stateMachineData will be truncated. + * @return a combined future for the remove task for all the log entries starting from + * given logIndex, null otherwise + */ + default CompletableFuture<Void> truncateStateMachineData(long index) { + return CompletableFuture.completedFuture(null); + } }
