Repository: incubator-ratis
Updated Branches:
  refs/heads/master da8a0b494 -> 1d07b18ed


RATIS-396. Support retry if writeStateMachineData throws TimeoutIOException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1d07b18e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1d07b18e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1d07b18e

Branch: refs/heads/master
Commit: 1d07b18eda61318c14661691051079f943a8ee29
Parents: da8a0b4
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Wed Nov 7 11:34:28 2018 -0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Wed Nov 7 11:34:28 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/IOUtils.java     |  2 +-
 .../ratis/server/RaftServerConfigKeys.java      | 24 ++++++++++
 .../ratis/server/storage/RaftLogWorker.java     | 50 +++++++++++++++-----
 3 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d07b18e/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index dbb8d20..1f81170 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -84,7 +84,7 @@ public interface IOUtils {
     } catch (CompletionException e) {
       throw asIOException(JavaUtils.unwrapCompletionException(e));
     } catch(TimeoutException e) {
-      throw new TimeoutIOException("Timeout: " + name.get(), e);
+      throw new TimeoutIOException("Timeout " + timeout + ": " + name.get(), 
e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d07b18e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 22799f2..5e0017c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -91,6 +91,15 @@ public interface RaftServerConfigKeys {
       setBoolean(properties::setBoolean, USE_MEMORY_KEY, useMemory);
     }
 
+    /**
+     * Property key and default for the log task queue size.
+     * The value is used as the capacity of the bounded task queue created by RaftLogWorker.
+     */
+    String QUEUE_SIZE_KEY = PREFIX + ".queue.size";
+    int QUEUE_SIZE_DEFAULT = 4096;
+    /**
+     * Reads the queue size from the given properties, falling back to
+     * {@link #QUEUE_SIZE_DEFAULT} when the key is not set.
+     * The value is checked with requireMin(1) -- presumably values below 1 are rejected; confirm in ConfUtils.
+     */
+    static int queueSize(RaftProperties properties) {
+      return getInt(properties::getInt, QUEUE_SIZE_KEY, QUEUE_SIZE_DEFAULT, 
getDefaultLog(), requireMin(1));
+    }
+    /**
+     * Stores the queue size under {@link #QUEUE_SIZE_KEY}, applying the same requireMin(1) check
+     * as {@link #queueSize(RaftProperties)}.
+     */
+    static void setQueueSize(RaftProperties properties, int queueSize) {
+      setInt(properties::setInt, QUEUE_SIZE_KEY, queueSize, requireMin(1));
+    }
+
     String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max";
     SizeInBytes SEGMENT_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("8MB");
     static SizeInBytes segmentSizeMax(RaftProperties properties) {
@@ -166,6 +175,21 @@ public interface RaftServerConfigKeys {
       static void setSyncTimeout(RaftProperties properties, TimeDuration 
syncTimeout) {
         setTimeDuration(properties::setTimeDuration, SYNC_TIMEOUT_KEY, 
syncTimeout);
       }
+
+      /**
+       * Number of times a timed-out synchronous wait for state machine data is retried.
+       * -1: retry indefinitely
+       *  0: no retry
+       * >0: the number of retries
+       */
+      String SYNC_TIMEOUT_RETRY_KEY = PREFIX + ".sync.timeout.retry";
+      int SYNC_TIMEOUT_RETRY_DEFAULT = -1;
+      /**
+       * Reads the retry count from the given properties, falling back to
+       * {@link #SYNC_TIMEOUT_RETRY_DEFAULT} (retry indefinitely) when the key is not set.
+       * The value is checked with requireMin(-1) -- presumably values below -1 are rejected; confirm in ConfUtils.
+       */
+      static int syncTimeoutRetry(RaftProperties properties) {
+        return getInt(properties::getInt, SYNC_TIMEOUT_RETRY_KEY, 
SYNC_TIMEOUT_RETRY_DEFAULT, getDefaultLog(),
+            requireMin(-1));
+      }
+      /**
+       * Stores the retry count under {@link #SYNC_TIMEOUT_RETRY_KEY}, applying the same
+       * requireMin(-1) check as {@link #syncTimeoutRetry(RaftProperties)}.
+       */
+      static void setSyncTimeoutRetry(RaftProperties properties, int 
syncTimeoutRetry) {
+        setInt(properties::setInt, SYNC_TIMEOUT_RETRY_KEY, syncTimeoutRetry, 
requireMin(-1));
+      }
     }
 
     interface Appender {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d07b18e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 6c64057..82b0b49 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Timer;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -50,11 +51,39 @@ import java.util.function.Supplier;
 class RaftLogWorker implements Runnable {
   static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
 
+  /**
+   * Policy describing how the log worker waits for state machine data futures:
+   * whether the wait is synchronous, how long a single wait may take, and how many
+   * times a timed-out wait is retried. All values are read once from the given
+   * {@link RaftProperties} at construction time.
+   */
+  static class StateMachineDataPolicy {
+    /** Value of {@code syncTimeoutRetry} meaning "retry indefinitely". */
+    private static final int RETRY_INDEFINITELY = -1;
+
+    private final boolean sync;
+    private final TimeDuration syncTimeout;
+    /** -1: retry indefinitely; 0: no retry; &gt;0: the number of retries. */
+    private final int syncTimeoutRetry;
+
+    StateMachineDataPolicy(RaftProperties properties) {
+      this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
+      this.syncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
+      this.syncTimeoutRetry = RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties);
+    }
+
+    /** @return true if state machine data futures must be waited for synchronously. */
+    boolean isSync() {
+      return sync;
+    }
+
+    /**
+     * Waits for the given future, retrying the wait whenever it times out.
+     * Each attempt waits at most {@code syncTimeout}; the first attempt plus up to
+     * {@code syncTimeoutRetry} retries are made (unbounded when the retry count is -1).
+     * Must only be called when {@link #isSync()} is true.
+     *
+     * @param future the future to wait for
+     * @param getName supplies a name for the operation, passed on to {@code IOUtils.getFromFuture}
+     * @throws TimeoutIOException if the last allowed attempt also timed out
+     * @throws IOException any other failure from {@code IOUtils.getFromFuture}; these are not retried
+     */
+    void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) throws IOException {
+      Preconditions.assertTrue(isSync());
+      for(int retry = 0; ; retry++) {
+        try {
+          IOUtils.getFromFuture(future, getName, syncTimeout);
+          return;
+        } catch(TimeoutIOException e) {
+          LOG.warn("Timeout " + retry + (syncTimeoutRetry == -1? "/~": "/" + syncTimeoutRetry), e);
+          // Retries exhausted: propagate the timeout instead of returning normally.
+          // Otherwise the caller would continue as if the state machine data had been written.
+          if (syncTimeoutRetry != RETRY_INDEFINITELY && retry >= syncTimeoutRetry) {
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
   private final String name;
   /**
    * The task queue accessed by rpc handler threads and the io worker thread.
    */
-  private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096);
+  private final BlockingQueue<Task> queue;
   private volatile boolean running = true;
   private final Thread workerThread;
 
@@ -80,8 +109,7 @@ class RaftLogWorker implements Runnable {
   private final long preallocatedSize;
   private final int bufferSize;
 
-  private final boolean stateMachineDataSync;
-  private final TimeDuration stateMachineDataSyncTimeout;
+  private final StateMachineDataPolicy stateMachineDataPolicy;
 
   RaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable 
submitUpdateCommitEvent,
       RaftStorage storage, RaftProperties properties) {
@@ -92,13 +120,13 @@ class RaftLogWorker implements Runnable {
     this.stateMachine = stateMachine;
 
     this.storage = storage;
+    this.queue = new 
ArrayBlockingQueue<>(RaftServerConfigKeys.Log.queueSize(properties));
     this.segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.preallocatedSize = 
RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
     this.bufferSize = 
RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
 
-    this.stateMachineDataSync = 
RaftServerConfigKeys.Log.StateMachineData.sync(properties);
-    this.stateMachineDataSyncTimeout = 
RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
+    this.stateMachineDataPolicy = new StateMachineDataPolicy(properties);
 
     this.workerThread = new Thread(this, name);
 
@@ -229,12 +257,12 @@ class RaftLogWorker implements Runnable {
         final CompletableFuture<Void> f = stateMachine != null ?
             stateMachine.flushStateMachineData(lastWrittenIndex) :
             CompletableFuture.completedFuture(null);
-        if (stateMachineDataSync) {
-          IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData", 
stateMachineDataSyncTimeout);
+        if (stateMachineDataPolicy.isSync()) {
+          stateMachineDataPolicy.getFromFuture(f, () -> this + 
"-flushStateMachineData");
         }
         out.flush();
-        if (!stateMachineDataSync) {
-          IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData");
+        if (!stateMachineDataPolicy.isSync()) {
+          IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
         }
       } finally {
         timerContext.stop();
@@ -306,8 +334,8 @@ class RaftLogWorker implements Runnable {
 
     @Override
     public void execute() throws IOException {
-      if (stateMachineDataSync && stateMachineFuture != null) {
-        IOUtils.getFromFuture(stateMachineFuture, () -> this + 
"-writeStateMachineData", stateMachineDataSyncTimeout);
+      if (stateMachineDataPolicy.isSync() && stateMachineFuture != null) {
+        stateMachineDataPolicy.getFromFuture(stateMachineFuture, () -> this + 
"-writeStateMachineData");
       }
 
       Preconditions.assertTrue(out != null);

Reply via email to