This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3c41936 RATIS-579. ExitUtils.terminate results in JVM terminated for
peers not part of the group. Contributed by Siddharth Wagle
3c41936 is described below
commit 3c41936730062e27cbe5097f90b38a7ed01efad3
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jun 14 15:00:32 2019 -0700
RATIS-579. ExitUtils.terminate results in JVM terminated for peers not part
of the group. Contributed by Siddharth Wagle
---
.../main/java/org/apache/ratis/util/ExitUtils.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../ratis/server/impl/StateMachineUpdater.java | 6 ++--
.../server/raftlog/segmented/SegmentedRaftLog.java | 3 +-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 21 ++++++++-----
.../raftlog/segmented/TestSegmentedRaftLog.java | 35 +++++++++++++---------
6 files changed, 43 insertions(+), 26 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
index 669c13c..2f66db6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
@@ -106,7 +106,7 @@ public interface ExitUtils {
/** @throws AssertionError if {@link #isTerminated()} == true. */
static void assertNotTerminated() {
if (ExitUtils.isTerminated()) {
- throw new AssertionError("Unexpected exited.", getFirstExitException());
+ throw new AssertionError("Unexpected exit.", getFirstExitException());
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6bfd7a1..ecd790d 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -240,7 +240,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
return RaftGroup.valueOf(groupId, getRaftConf().getPeers());
}
- void shutdown(boolean deleteDirectory) {
+ public void shutdown(boolean deleteDirectory) {
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: shutdown {}", getId(), groupId);
try {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index c380781..699d8e0 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -160,11 +160,13 @@ class StateMachineUpdater implements Runnable {
LOG.info("{}: the StateMachineUpdater is interrupted and will
exit.", this);
} else {
final String s = this + ": the StateMachineUpdater is wrongly
interrupted";
- ExitUtils.terminate(1, s, e, LOG);
+ LOG.error(s, e);
+ server.shutdown(false);
}
} catch (Throwable t) {
final String s = this + ": the StateMachineUpdater hits Throwable";
- ExitUtils.terminate(2, s, t, LOG);
+ LOG.error(s, t);
+ server.shutdown(false);
}
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1586ecb..a602bdc 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -123,7 +123,8 @@ public class SegmentedRaftLog extends RaftLog {
this.storage = storage;
segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.cache = new SegmentedRaftLogCache(selfId, storage, properties);
- this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine,
submitUpdateCommitEvent, storage, properties);
+ this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine,
+ submitUpdateCommitEvent, server, storage, properties);
stateMachineCachingEnabled =
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 78e7191..a29a727 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.storage.RaftStorage;
@@ -119,11 +120,12 @@ class SegmentedRaftLogWorker implements Runnable {
private final long segmentMaxSize;
private final long preallocatedSize;
private final int bufferSize;
+ private final RaftServerImpl server;
private final StateMachineDataPolicy stateMachineDataPolicy;
SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine,
Runnable submitUpdateCommitEvent,
- RaftStorage storage, RaftProperties properties) {
+ RaftServerImpl server, RaftStorage storage,
RaftProperties properties) {
this.name = selfId + "-" + getClass().getSimpleName() + ":" +
storage.getStorageDir();
LOG.info("new {} for {}", name, storage);
@@ -131,6 +133,7 @@ class SegmentedRaftLogWorker implements Runnable {
this.stateMachine = stateMachine;
this.metricRegistry =
RatisMetrics.createMetricRegistryForLogWorker(selfId.toString());
this.storage = storage;
+ this.server = server;
final SizeInBytes queueByteLimit =
RaftServerConfigKeys.Log.queueByteLimit(properties);
final int queueElementLimit =
RaftServerConfigKeys.Log.queueElementLimit(properties);
this.queue =
@@ -214,7 +217,10 @@ class SegmentedRaftLogWorker implements Runnable {
LOG.info("Got InterruptedException when adding task " + task
+ ". The SegmentedRaftLogWorker already stopped.");
} else {
- ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG);
+ LOG.error("Failed to add IO task {}", task, t);
+ if (server != null) {
+ server.shutdown(false);
+ }
}
}
return task;
@@ -238,6 +244,7 @@ class SegmentedRaftLogWorker implements Runnable {
+ " which is smaller than the lastWrittenIndex."
+ " There should be a snapshot installed.", e);
} else {
+ task.getFuture().completeExceptionally(e);
throw e;
}
}
@@ -258,11 +265,11 @@ class SegmentedRaftLogWorker implements Runnable {
LOG.info("{} got closed and hit exception",
Thread.currentThread().getName(), t);
} else {
- // TODO avoid terminating the jvm, we should
- // 1) support multiple log directories
- // 2) only shutdown the raft server impl
- ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.",
- t, LOG);
+ LOG.error("{} hit exception", Thread.currentThread().getName(), t);
+ // Shutdown raft group instead of terminating jvm.
+ if (server != null) {
+ server.shutdown(false);
+ }
}
}
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 07b4f2c..dcbe013 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -36,7 +36,6 @@ import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
@@ -54,14 +53,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestSegmentedRaftLog extends BaseTest {
@@ -509,14 +511,13 @@ public class TestSegmentedRaftLog extends BaseTest {
}
}
- @Test
- public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws
Exception {
+ @Test(expected = TimeoutIOException.class)
+ public void testServerShutdownOnTimeoutIOException() throws Throwable {
RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
final TimeDuration syncTimeout = TimeDuration.valueOf(100,
TimeUnit.MILLISECONDS);
RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties,
syncTimeout);
final int numRetries = 2;
RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties,
numRetries);
- ExitUtils.disableSystemExit();
final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
final StateMachine sm = new BaseStateMachine() {
@@ -526,17 +527,23 @@ public class TestSegmentedRaftLog extends BaseTest {
}
};
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm,
null, storage, -1, properties)) {
+ RaftServerImpl server = mock(RaftServerImpl.class);
+ doNothing().when(server).shutdown(false);
+ Throwable ex = null; // TimeoutIOException
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, sm,
null, storage, -1, properties)) {
raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
- raftLog.appendEntry(entry); // SegmentedRaftLogWorker should catch
TimeoutIOException
-
- JavaUtils.attempt(() -> {
- final ExitUtils.ExitException exitException =
ExitUtils.getFirstExitException();
- Objects.requireNonNull(exitException, "exitException == null");
- Assert.assertEquals(TimeoutIOException.class,
exitException.getCause().getClass());
- }, 3*numRetries, syncTimeout, "SegmentedRaftLogWorker should catch
TimeoutIOException and exit", LOG);
- ExitUtils.clear();
+ // SegmentedRaftLogWorker should catch TimeoutIOException
+ CompletableFuture<Long> f = raftLog.appendEntry(entry);
+ // Wait for async writeStateMachineData to finish
+ try {
+ f.get();
+ } catch (ExecutionException e) {
+ ex = e.getCause();
+ }
}
+ Assert.assertNotNull(ex);
+ verify(server, times(1)).shutdown(false);
+ throw ex;
}
static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {