This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c41936  RATIS-579. ExitUtils.terminate results in JVM terminated for 
peers not part of the group.  Contributed by Siddharth Wagle
3c41936 is described below

commit 3c41936730062e27cbe5097f90b38a7ed01efad3
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jun 14 15:00:32 2019 -0700

    RATIS-579. ExitUtils.terminate results in JVM terminated for peers not part 
of the group.  Contributed by Siddharth Wagle
---
 .../main/java/org/apache/ratis/util/ExitUtils.java |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  2 +-
 .../ratis/server/impl/StateMachineUpdater.java     |  6 ++--
 .../server/raftlog/segmented/SegmentedRaftLog.java |  3 +-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 21 ++++++++-----
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 35 +++++++++++++---------
 6 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
index 669c13c..2f66db6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
@@ -106,7 +106,7 @@ public interface ExitUtils {
   /** @throws AssertionError if {@link #isTerminated()} == true. */
   static void assertNotTerminated() {
     if (ExitUtils.isTerminated()) {
-      throw new AssertionError("Unexpected exited.", getFirstExitException());
+      throw new AssertionError("Unexpected exit.", getFirstExitException());
     }
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6bfd7a1..ecd790d 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -240,7 +240,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     return RaftGroup.valueOf(groupId, getRaftConf().getPeers());
   }
 
-  void shutdown(boolean deleteDirectory) {
+  public void shutdown(boolean deleteDirectory) {
     lifeCycle.checkStateAndClose(() -> {
       LOG.info("{}: shutdown {}", getId(), groupId);
       try {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index c380781..699d8e0 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -160,11 +160,13 @@ class StateMachineUpdater implements Runnable {
           LOG.info("{}: the StateMachineUpdater is interrupted and will 
exit.", this);
         } else {
           final String s = this + ": the StateMachineUpdater is wrongly 
interrupted";
-          ExitUtils.terminate(1, s, e, LOG);
+          LOG.error(s, e);
+          server.shutdown(false);
         }
       } catch (Throwable t) {
         final String s = this + ": the StateMachineUpdater hits Throwable";
-        ExitUtils.terminate(2, s, t, LOG);
+        LOG.error(s, t);
+        server.shutdown(false);
       }
     }
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 1586ecb..a602bdc 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -123,7 +123,8 @@ public class SegmentedRaftLog extends RaftLog {
     this.storage = storage;
     segmentMaxSize = 
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.cache = new SegmentedRaftLogCache(selfId, storage, properties);
-    this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine, 
submitUpdateCommitEvent, storage, properties);
+    this.fileLogWorker = new SegmentedRaftLogWorker(selfId, stateMachine,
+        submitUpdateCommitEvent, server, storage, properties);
     stateMachineCachingEnabled = 
RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 78e7191..a29a727 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -119,11 +120,12 @@ class SegmentedRaftLogWorker implements Runnable {
   private final long segmentMaxSize;
   private final long preallocatedSize;
   private final int bufferSize;
+  private final RaftServerImpl server;
 
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
   SegmentedRaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, 
Runnable submitUpdateCommitEvent,
-      RaftStorage storage, RaftProperties properties) {
+                         RaftServerImpl server, RaftStorage storage, 
RaftProperties properties) {
     this.name = selfId + "-" + getClass().getSimpleName() + ":" + 
storage.getStorageDir();
     LOG.info("new {} for {}", name, storage);
 
@@ -131,6 +133,7 @@ class SegmentedRaftLogWorker implements Runnable {
     this.stateMachine = stateMachine;
     this.metricRegistry = 
RatisMetrics.createMetricRegistryForLogWorker(selfId.toString());
     this.storage = storage;
+    this.server = server;
     final SizeInBytes queueByteLimit = 
RaftServerConfigKeys.Log.queueByteLimit(properties);
     final int queueElementLimit = 
RaftServerConfigKeys.Log.queueElementLimit(properties);
     this.queue =
@@ -214,7 +217,10 @@ class SegmentedRaftLogWorker implements Runnable {
         LOG.info("Got InterruptedException when adding task " + task
             + ". The SegmentedRaftLogWorker already stopped.");
       } else {
-        ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG);
+        LOG.error("Failed to add IO task {}", task, t);
+        if (server != null) {
+          server.shutdown(false);
+        }
       }
     }
     return task;
@@ -238,6 +244,7 @@ class SegmentedRaftLogWorker implements Runnable {
                   + " which is smaller than the lastWrittenIndex."
                   + " There should be a snapshot installed.", e);
             } else {
+              task.getFuture().completeExceptionally(e);
               throw e;
             }
           }
@@ -258,11 +265,11 @@ class SegmentedRaftLogWorker implements Runnable {
           LOG.info("{} got closed and hit exception",
               Thread.currentThread().getName(), t);
         } else {
-          // TODO avoid terminating the jvm, we should
-          // 1) support multiple log directories
-          // 2) only shutdown the raft server impl
-          ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.",
-              t, LOG);
+          LOG.error("{} hit exception", Thread.currentThread().getName(), t);
+          // Shutdown raft group instead of terminating jvm.
+          if (server != null) {
+            server.shutdown(false);
+          }
         }
       }
     }
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 07b4f2c..dcbe013 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -36,7 +36,6 @@ import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
@@ -54,14 +53,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestSegmentedRaftLog extends BaseTest {
@@ -509,14 +511,13 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
-  @Test
-  public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws 
Exception {
+  @Test(expected = TimeoutIOException.class)
+  public void testServerShutdownOnTimeoutIOException() throws Throwable {
     RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
     final TimeDuration syncTimeout = TimeDuration.valueOf(100, 
TimeUnit.MILLISECONDS);
     RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties, 
syncTimeout);
     final int numRetries = 2;
     RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, 
numRetries);
-    ExitUtils.disableSystemExit();
 
     final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
     final StateMachine sm = new BaseStateMachine() {
@@ -526,17 +527,23 @@ public class TestSegmentedRaftLog extends BaseTest {
       }
     };
 
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, 
null, storage, -1, properties)) {
+    RaftServerImpl server = mock(RaftServerImpl.class);
+    doNothing().when(server).shutdown(false);
+    Throwable ex = null; // TimeoutIOException
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, sm, 
null, storage, -1, properties)) {
       raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
-      raftLog.appendEntry(entry);  // SegmentedRaftLogWorker should catch 
TimeoutIOException
-
-      JavaUtils.attempt(() -> {
-        final ExitUtils.ExitException exitException = 
ExitUtils.getFirstExitException();
-        Objects.requireNonNull(exitException, "exitException == null");
-        Assert.assertEquals(TimeoutIOException.class, 
exitException.getCause().getClass());
-      }, 3*numRetries, syncTimeout, "SegmentedRaftLogWorker should catch 
TimeoutIOException and exit", LOG);
-      ExitUtils.clear();
+      // SegmentedRaftLogWorker should catch TimeoutIOException
+      CompletableFuture<Long> f = raftLog.appendEntry(entry);
+      // Wait for async writeStateMachineData to finish
+      try {
+        f.get();
+      } catch (ExecutionException e) {
+        ex = e.getCause();
+      }
     }
+    Assert.assertNotNull(ex);
+    verify(server, times(1)).shutdown(false);
+    throw ex;
   }
 
   static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {

Reply via email to