Repository: incubator-ratis Updated Branches: refs/heads/master da8a0b494 -> 1d07b18ed
RATIS-396. Support retry if writeStateMachineData throws TimeoutIOException. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1d07b18e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1d07b18e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1d07b18e Branch: refs/heads/master Commit: 1d07b18eda61318c14661691051079f943a8ee29 Parents: da8a0b4 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Wed Nov 7 11:34:28 2018 -0800 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Wed Nov 7 11:34:28 2018 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/IOUtils.java | 2 +- .../ratis/server/RaftServerConfigKeys.java | 24 ++++++++++ .../ratis/server/storage/RaftLogWorker.java | 50 +++++++++++++++----- 3 files changed, 64 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d07b18e/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index dbb8d20..1f81170 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -84,7 +84,7 @@ public interface IOUtils { } catch (CompletionException e) { throw asIOException(JavaUtils.unwrapCompletionException(e)); } catch(TimeoutException e) { - throw new TimeoutIOException("Timeout: " + name.get(), e); + throw new TimeoutIOException("Timeout " + timeout + ": " + name.get(), e); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d07b18e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 22799f2..5e0017c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -91,6 +91,15 @@ public interface RaftServerConfigKeys { setBoolean(properties::setBoolean, USE_MEMORY_KEY, useMemory); } + String QUEUE_SIZE_KEY = PREFIX + ".queue.size"; + int QUEUE_SIZE_DEFAULT = 4096; + static int queueSize(RaftProperties properties) { + return getInt(properties::getInt, QUEUE_SIZE_KEY, QUEUE_SIZE_DEFAULT, getDefaultLog(), requireMin(1)); + } + static void setQueueSize(RaftProperties properties, int queueSize) { + setInt(properties::setInt, QUEUE_SIZE_KEY, queueSize, requireMin(1)); + } + String SEGMENT_SIZE_MAX_KEY = PREFIX + ".segment.size.max"; SizeInBytes SEGMENT_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("8MB"); static SizeInBytes segmentSizeMax(RaftProperties properties) { @@ -166,6 +175,21 @@ public interface RaftServerConfigKeys { static void setSyncTimeout(RaftProperties properties, TimeDuration syncTimeout) { setTimeDuration(properties::setTimeDuration, SYNC_TIMEOUT_KEY, syncTimeout); } + + /** + * -1: retry indefinitely + * 0: no retry + * >0: the number of retries + */ + String SYNC_TIMEOUT_RETRY_KEY = PREFIX + ".sync.timeout.retry"; + int SYNC_TIMEOUT_RETRY_DEFAULT = -1; + static int syncTimeoutRetry(RaftProperties properties) { + return getInt(properties::getInt, SYNC_TIMEOUT_RETRY_KEY, SYNC_TIMEOUT_RETRY_DEFAULT, getDefaultLog(), + requireMin(-1)); + } + static void setSyncTimeoutRetry(RaftProperties properties, int syncTimeoutRetry) { + setInt(properties::setInt, SYNC_TIMEOUT_RETRY_KEY, syncTimeoutRetry, requireMin(-1)); + } } interface Appender { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d07b18e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 6c64057..82b0b49 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -22,6 +22,7 @@ import com.codahale.metrics.Timer; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.RatisMetricsRegistry; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.TimeoutIOException; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -50,11 +51,39 @@ import java.util.function.Supplier; class RaftLogWorker implements Runnable { static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class); + static class StateMachineDataPolicy { + private final boolean sync; + private final TimeDuration syncTimeout; + private final int syncTimeoutRetry; + + StateMachineDataPolicy(RaftProperties properties) { + this.sync = RaftServerConfigKeys.Log.StateMachineData.sync(properties); + this.syncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties); + this.syncTimeoutRetry = RaftServerConfigKeys.Log.StateMachineData.syncTimeoutRetry(properties); + } + + boolean isSync() { + return sync; + } + + void getFromFuture(CompletableFuture<?> future, Supplier<Object> getName) throws IOException { + Preconditions.assertTrue(isSync()); + for(int retry = 0; syncTimeoutRetry == -1 || retry <= syncTimeoutRetry; retry++) { + try { + IOUtils.getFromFuture(future, getName, syncTimeout); + return; + } catch(TimeoutIOException e) { + LOG.warn("Timeout " + retry + (syncTimeoutRetry == -1? "/~": "/" + syncTimeoutRetry), e); + } + } + } + } + private final String name; /** * The task queue accessed by rpc handler threads and the io worker thread. */ - private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096); + private final BlockingQueue<Task> queue; private volatile boolean running = true; private final Thread workerThread; @@ -80,8 +109,7 @@ class RaftLogWorker implements Runnable { private final long preallocatedSize; private final int bufferSize; - private final boolean stateMachineDataSync; - private final TimeDuration stateMachineDataSyncTimeout; + private final StateMachineDataPolicy stateMachineDataPolicy; RaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable submitUpdateCommitEvent, RaftStorage storage, RaftProperties properties) { @@ -92,13 +120,13 @@ class RaftLogWorker implements Runnable { this.stateMachine = stateMachine; this.storage = storage; + this.queue = new ArrayBlockingQueue<>(RaftServerConfigKeys.Log.queueSize(properties)); this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize(); this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt(); this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties); - this.stateMachineDataSync = RaftServerConfigKeys.Log.StateMachineData.sync(properties); - this.stateMachineDataSyncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties); + this.stateMachineDataPolicy = new StateMachineDataPolicy(properties); this.workerThread = new Thread(this, name); @@ -229,12 +257,12 @@ class RaftLogWorker implements Runnable { final CompletableFuture<Void> f = stateMachine != null ? stateMachine.flushStateMachineData(lastWrittenIndex) : CompletableFuture.completedFuture(null); - if (stateMachineDataSync) { - IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData", stateMachineDataSyncTimeout); + if (stateMachineDataPolicy.isSync()) { + stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData"); } out.flush(); - if (!stateMachineDataSync) { - IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData"); + if (!stateMachineDataPolicy.isSync()) { + IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData"); } } finally { timerContext.stop(); @@ -306,8 +334,8 @@ class RaftLogWorker implements Runnable { @Override public void execute() throws IOException { - if (stateMachineDataSync && stateMachineFuture != null) { - IOUtils.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData", stateMachineDataSyncTimeout); + if (stateMachineDataPolicy.isSync() && stateMachineFuture != null) { + stateMachineDataPolicy.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData"); } Preconditions.assertTrue(out != null);
