This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit 48239a7a2df06af3c3b4fb61932b83a8c205d7fd Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Aug 30 14:04:29 2019 -0700 RATIS-659. StateMachineUpdater#stopAndJoin might not take snapshot due to race condition. Contributed by Lokesh Jain --- .../ratis/server/impl/StateMachineUpdater.java | 26 +++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index d17f10b..8553a01 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.LongStream; @@ -151,17 +152,10 @@ class StateMachineUpdater implements Runnable { } final MemoizedSupplier<List<CompletableFuture<Message>>> futures = applyLog(); - - // check if need to trigger a snapshot - if (shouldTakeSnapshot()) { - if (futures.isInitialized()) { - JavaUtils.allOf(futures.get()).get(); - } - - takeSnapshot(); - } + checkAndTakeSnapshot(futures); if (shouldStop()) { + checkAndTakeSnapshot(futures); stop(); } } catch (InterruptedException e) { @@ -231,6 +225,18 @@ class StateMachineUpdater implements Runnable { return futures; } + private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Message>>> futures) + throws ExecutionException, InterruptedException { + // check if need to trigger a snapshot + if (shouldTakeSnapshot()) { + if (futures.isInitialized()) { + JavaUtils.allOf(futures.get()).get(); + } + + takeSnapshot(); + } + } + private void takeSnapshot() { final long i; try { @@ -266,7 +272,7 @@ class StateMachineUpdater implements Runnable { if (autoSnapshotThreshold == null) { return false; } else if (shouldStop()) { - return true; + return getLastAppliedIndex() - snapshotIndex.get() > 0; } return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold; }
