This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7c7a3  RATIS-573. Handle Raft Log Append Failure.  Contributed by Supratim Deka
9d7c7a3 is described below

commit 9d7c7a38a97b4a93704259d4e837cf6fed74c856
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jun 24 13:19:25 2019 +0800

    RATIS-573. Handle Raft Log Append Failure.  Contributed by Supratim Deka
---
 .../server/raftlog/segmented/SegmentedRaftLog.java |  4 +++
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 33 ++++++++++++++++++++--
 .../apache/ratis/statemachine/StateMachine.java    | 10 +++++++
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 19 +++++++++++--
 4 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index a602bdc..7ea75ce 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -83,9 +83,13 @@ public class SegmentedRaftLog extends RaftLog {
     }
 
     void done() {
+      Preconditions.assertTrue(!future.isDone());
       future.complete(getEndIndex());
     }
 
+    void failed(IOException e) {
+      this.getFuture().completeExceptionally(e);
+    }
 
     abstract void execute() throws IOException;
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index a29a727..c4f6aa9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -30,6 +30,7 @@ import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.SegmentFileInfo;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
@@ -232,20 +233,33 @@ class SegmentedRaftLogWorker implements Runnable {
 
   @Override
   public void run() {
+
+    // if and when a log task encounters an exception
+    RaftLogIOException logIOException = null;
+
     while (running) {
       try {
         Task task = queue.poll(ONE_SECOND);
         if (task != null) {
           try {
-            task.execute();
+            if (logIOException != null) {
+              throw logIOException;
+            } else {
+              task.execute();
+            }
           } catch (IOException e) {
             if (task.getEndIndex() < lastWrittenIndex) {
               LOG.info("Ignore IOException when handling task " + task
                   + " which is smaller than the lastWrittenIndex."
                   + " There should be a snapshot installed.", e);
             } else {
-              task.getFuture().completeExceptionally(e);
-              throw e;
+              task.failed(e);
+              if (logIOException == null) {
+                logIOException = new RaftLogIOException("Log already failed"
+                    + " at index " + task.getEndIndex()
+                    + " for task " + task, e);
+              }
+              continue;
             }
           }
           task.done();
@@ -391,6 +405,12 @@ class SegmentedRaftLogWorker implements Runnable {
     }
 
     @Override
+    void failed(IOException e) {
+      stateMachine.notifyLogFailed(e, entry);
+      super.failed(e);
+    }
+
+    @Override
     int getSerializedSize() {
       return ServerProtoUtils.getSerializedSize(entry);
     }
@@ -461,6 +481,13 @@ class SegmentedRaftLogWorker implements Runnable {
     }
 
     @Override
+    void failed(IOException e) {
+      // not failed for a specific log entry, but an entire segment
+      stateMachine.notifyLogFailed(e, null);
+      super.failed(e);
+    }
+
+    @Override
     long getEndIndex() {
       return endIndex;
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 19bd9e6..5ce8e96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -221,6 +221,16 @@ public interface StateMachine extends Closeable {
   }
 
   /**
+   * Notify the state machine that the pipeline has failed.
+   * This notification is triggered when a log operation throws an Exception.
+   * @param t Exception which was caught, indicates possible cause.
+   * @param failedEntry if append failed for a specific entry, null otherwise.
+   */
+  default void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
+
+  }
+
+  /**
    * Notify the Leader's state machine that a leader has not been elected for a long time
    * this notification is based on "raft.server.leader.election.timeout"
    *
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index dcbe013..cda1043 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -36,6 +36,7 @@ import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
@@ -62,8 +63,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestSegmentedRaftLog extends BaseTest {
@@ -523,8 +522,22 @@ public class TestSegmentedRaftLog extends BaseTest {
     final StateMachine sm = new BaseStateMachine() {
       @Override
       public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
+        lifeCycle.transition(LifeCycle.State.STARTING);
+        lifeCycle.transition(LifeCycle.State.RUNNING);
+
         return new CompletableFuture<>(); // the future never completes
       }
+
+      @Override
+      public void notifyLogFailed(Throwable t, LogEntryProto entry) {
+        LOG.info("Test StateMachine : Ratis log failed notification received, "
+            + "as expected. Transition to PAUSED state.");
+
+        Assert.assertNotNull(entry);
+
+        lifeCycle.transition(LifeCycle.State.PAUSING);
+        lifeCycle.transition(LifeCycle.State.PAUSED);
+      }
     };
 
     RaftServerImpl server = mock(RaftServerImpl.class);
@@ -542,7 +555,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       }
     }
     Assert.assertNotNull(ex);
-    verify(server, times(1)).shutdown(false);
+    Assert.assertSame(LifeCycle.State.PAUSED, sm.getLifeCycleState());
     throw ex;
   }
 

Reply via email to