[ 
https://issues.apache.org/jira/browse/RATIS-363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658542#comment-16658542
 ] 

Tsz Wo Nicholas Sze commented on RATIS-363:
-------------------------------------------

Thanks for the update.  

bq. ... with a little change because the updater thread may be stuck in wait 
and therefore needs to be interrupted, before 'join'. 

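Good point.  I agree with [~msingh] that it is better to call notifyUpdater() 
than interrupt().  In that case, the wait loop should also check shouldStop() 
before calling wait(), so that the updater does not go back to waiting after 
it has been notified to stop.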

Some more comments:
- stopIndex should be volatile.
- We should call stop() at the end of the loop instead of changing isRunning(). 
 Otherwise, the last snapshot may not be taken.
- We may remove the currentAppliedIndex parameter from shouldTakeSnapshot and 
shouldStop.  Just use lastAppliedIndex directly.
- Similarly, we may call raftLog.getLastCommittedIndex() in stopAndJoin() 
instead of passing it as a parameter.

The code becomes:
{code}
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -55,6 +55,7 @@ class StateMachineUpdater implements Runnable {
   private final RaftLog raftLog;
 
   private volatile long lastAppliedIndex;
+  private volatile long stopIndex = -1;
 
   private final boolean autoSnapshotEnabled;
   private final long autoSnapshotThreshold;
@@ -82,15 +83,25 @@ class StateMachineUpdater implements Runnable {
     updater.start();
   }
 
-  void stop() {
+  private void stop() {
     state = State.STOP;
-    updater.interrupt();
     try {
       stateMachine.close();
     } catch (IOException ignored) {
+      LOG.warn(server.getId() + ": Failed to close "
+          + stateMachine.getClass().getSimpleName()
+          + " " + stateMachine, ignored);
     }
   }
 
+
+  void stopAndJoin() throws InterruptedException {
+    Preconditions.assertTrue(this.stopIndex == -1, "stopIndex already set");
+    this.stopIndex = raftLog.getLastCommittedIndex();
+    notifyUpdater();
+    updater.join();
+  }
+
   void reloadStateMachine() {
     state = State.RELOAD;
     notifyUpdater();
@@ -113,7 +124,7 @@ class StateMachineUpdater implements Runnable {
           // when the peers just start, the committedIndex is initialized as 0
           // and will be updated only after the leader contacts other peers.
           // Thus initially lastAppliedIndex can be greater than lastCommitted.
-          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
+          while (lastAppliedIndex >= raftLog.getLastCommittedIndex() && !shouldStop()) {
             wait();
           }
         }
@@ -158,7 +169,7 @@ class StateMachineUpdater implements Runnable {
         }
 
         // check if need to trigger a snapshot
-        if (shouldTakeSnapshot(lastAppliedIndex)) {
+        if (shouldTakeSnapshot()) {
           if (futures.isInitialized()) {
             JavaUtils.allOf(futures.get()).get();
           }
@@ -166,6 +177,9 @@ class StateMachineUpdater implements Runnable {
           // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
           lastSnapshotIndex = lastAppliedIndex;
         }
+        if (shouldStop()) {
+          stop();
+        }
       } catch (InterruptedException e) {
         if (!isRunning()) {
           LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this);
@@ -184,9 +198,16 @@ class StateMachineUpdater implements Runnable {
     return state != State.STOP;
   }
 
-  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
-    return autoSnapshotEnabled && (state != State.RELOAD) &&
-        (currentAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold);
+  private boolean shouldStop() {
+    return stopIndex > -1 && lastAppliedIndex >= stopIndex;
+  }
+
+  private boolean shouldTakeSnapshot() {
+    return autoSnapshotEnabled &&
+        ( ((state != State.RELOAD) &&
+            (lastAppliedIndex - lastSnapshotIndex >= autoSnapshotThreshold))
+          || (shouldStop())
+        );
   }
 
   long getLastAppliedIndex() {
{code}
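
For illustration only, the stop protocol in the patch above can be sketched outside Ratis as a small self-contained class.  This is not Ratis code: UpdaterSketch, commit() and demo() are hypothetical names, and "applying" an entry is reduced to advancing lastAppliedIndex.  As in the patch, a stopIndex of -1 means "not set", so the sketch assumes at least one entry has been committed before stopAndJoin() is called.

```java
/**
 * Hypothetical stand-in for StateMachineUpdater; not Ratis code.
 * Shows stopping via a stop index plus notify, without interrupt().
 */
class UpdaterSketch implements Runnable {
  private final Thread updater = new Thread(this, "updater-sketch");
  private long committedIndex = -1;            // guarded by "this"
  private volatile long lastAppliedIndex = -1; // written only by the updater thread
  private volatile long stopIndex = -1;        // -1 means no stop was requested
  private volatile boolean running = true;

  void start() {
    updater.start();
  }

  /** Simulates the log committing all entries up to the given index. */
  synchronized void commit(long index) {
    committedIndex = Math.max(committedIndex, index);
    notifyAll();
  }

  /** Requests a stop once everything committed so far is applied, then waits. */
  void stopAndJoin() throws InterruptedException {
    synchronized (this) {
      if (stopIndex != -1) {
        throw new IllegalStateException("stopIndex already set");
      }
      // Assumption of this sketch: committedIndex >= 0 at this point.
      stopIndex = committedIndex;
      notifyAll(); // wake the updater instead of interrupting it
    }
    updater.join();
  }

  private boolean shouldStop() {
    return stopIndex > -1 && lastAppliedIndex >= stopIndex;
  }

  long getLastAppliedIndex() {
    return lastAppliedIndex;
  }

  @Override
  public void run() {
    while (running) {
      final long committed;
      synchronized (this) {
        // Checking shouldStop() here keeps the thread from waiting again
        // after stopAndJoin() has notified it.
        while (lastAppliedIndex >= committedIndex && !shouldStop()) {
          try {
            wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        committed = committedIndex;
      }
      for (long i = lastAppliedIndex + 1; i <= committed; i++) {
        lastAppliedIndex = i; // "apply" entry i
      }
      // Stop at the end of the loop, after the committed entries are applied.
      if (shouldStop()) {
        running = false;
      }
    }
  }

  /** Commits entries 0..entries-1, stops, and returns the last applied index. */
  static long demo(int entries) {
    UpdaterSketch u = new UpdaterSketch();
    u.start();
    for (long i = 0; i < entries; i++) {
      u.commit(i);
    }
    try {
      u.stopAndJoin();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return u.getLastAppliedIndex();
  }

  public static void main(String[] args) {
    System.out.println("last applied index: " + demo(100));
  }
}
```

Because stopIndex is set and the updater is notified under the same lock that guards the wait loop, the notification cannot be missed, and the thread exits only after lastAppliedIndex has reached the index that was committed when the stop was requested.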

> StateMachineUpdater should wait for committed transactions to be applied 
> before shutdown.
> -----------------------------------------------------------------------------------------
>
>                 Key: RATIS-363
>                 URL: https://issues.apache.org/jira/browse/RATIS-363
>             Project: Ratis
>          Issue Type: Improvement
>          Components: server
>            Reporter: Jitendra Nath Pandey
>            Assignee: Jitendra Nath Pandey
>            Priority: Blocker
>         Attachments: RATIS-363.3.patch
>
>
> StateMachineUpdater should apply all the committed transactions before 
> shutdown, otherwise committed data can be lost.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
