[ https://issues.apache.org/jira/browse/RATIS-363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658542#comment-16658542 ]
Tsz Wo Nicholas Sze commented on RATIS-363: ------------------------------------------- Thanks for the update. bq. ... with a little change because the updater thread may be stuck in wait and therefore needs to be interrupted, before 'join'. Good point. I agree with [~msingh] that it is better to call notifyUpdater() than interrupt(). Also, we should check shouldStop() before wait() then. Some more comments: - stopIndex should be volatile. - We should call stop() at the end of the loop instead of changing isRunning(). Otherwise, the last snapshot may not be taken. - We may remove the currentAppliedIndex parameter from shouldTakeSnapshot and shouldStop. Just use lastAppliedIndex directly. - Similarly, we may call raftLog.getLastCommittedIndex() in stopAndJoin() instead of passing it as a parameter. The code becomes: {code} +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -55,6 +55,7 @@ class StateMachineUpdater implements Runnable { private final RaftLog raftLog; private volatile long lastAppliedIndex; + private volatile long stopIndex = -1; private final boolean autoSnapshotEnabled; private final long autoSnapshotThreshold; @@ -82,15 +83,25 @@ class StateMachineUpdater implements Runnable { updater.start(); } - void stop() { + private void stop() { state = State.STOP; - updater.interrupt(); try { stateMachine.close(); } catch (IOException ignored) { + LOG.warn(server.getId() + ": Failed to close " + + stateMachine.getClass().getSimpleName() + + " " + stateMachine, ignored); } } + + void stopAndJoin() throws InterruptedException { + Preconditions.assertTrue(this.stopIndex == -1, "stopIndex already set"); + this.stopIndex = raftLog.getLastCommittedIndex(); + notifyUpdater(); + updater.join(); + } + void reloadStateMachine() { state = State.RELOAD; notifyUpdater(); @@ -113,7 +124,7 @@ class StateMachineUpdater implements Runnable { // when the peers just start, the committedIndex is initialized as 0 // and will be updated only 
after the leader contacts other peers. // Thus initially lastAppliedIndex can be greater than lastCommitted. - while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) { + while (lastAppliedIndex >= raftLog.getLastCommittedIndex() && !shouldStop()) { wait(); } } @@ -158,7 +169,7 @@ class StateMachineUpdater implements Runnable { } // check if need to trigger a snapshot - if (shouldTakeSnapshot(lastAppliedIndex)) { + if (shouldTakeSnapshot()) { if (futures.isInitialized()) { JavaUtils.allOf(futures.get()).get(); } @@ -166,6 +177,9 @@ class StateMachineUpdater implements Runnable { // TODO purge logs, including log cache. but should keep log for leader's RPCSenders lastSnapshotIndex = lastAppliedIndex; } + if (shouldStop()) { + stop(); + } } catch (InterruptedException e) { if (!isRunning()) { LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this); @@ -184,9 +198,16 @@ class StateMachineUpdater implements Runnable { return state != State.STOP; } - private boolean shouldTakeSnapshot(long currentAppliedIndex) { - return autoSnapshotEnabled && (state != State.RELOAD) && - (currentAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold); + private boolean shouldStop() { + return stopIndex > -1 && lastAppliedIndex >= stopIndex; + } + + private boolean shouldTakeSnapshot() { + return autoSnapshotEnabled && + ( ((state != State.RELOAD) && + (lastAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold)) + || (shouldStop()) + ); } long getLastAppliedIndex() { {code} > StateMachineUpdater should wait for committed transactions to be applied > before shutdown. 
> ----------------------------------------------------------------------------------------- > > Key: RATIS-363 > URL: https://issues.apache.org/jira/browse/RATIS-363 > Project: Ratis > Issue Type: Improvement > Components: server > Reporter: Jitendra Nath Pandey > Assignee: Jitendra Nath Pandey > Priority: Blocker > Attachments: RATIS-363.3.patch > > > StateMachineUpdater should apply all the committed transactions before > shutdown; otherwise, committed data can be lost. -- This message was sent by Atlassian JIRA (v7.6.3#76005)