Repository: incubator-ratis
Updated Branches:
  refs/heads/master 5016d5d7a -> cf1e6a3c8


RATIS-239. LogAppender should reconnect for some IOException. Contributed by 
Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/cf1e6a3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/cf1e6a3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/cf1e6a3c

Branch: refs/heads/master
Commit: cf1e6a3c8732cbaaf2d5b55a95c88011147bb127
Parents: 5016d5d
Author: Mukul Kumar Singh <msi...@apache.org>
Authored: Thu May 31 20:33:12 2018 +0530
Committer: Mukul Kumar Singh <msi...@apache.org>
Committed: Thu May 31 20:33:12 2018 +0530

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClientRpc.java  |  2 +-
 .../client/impl/RaftClientRpcWithProxy.java     | 13 +----
 .../java/org/apache/ratis/util/IOUtils.java     |  8 +++
 .../java/org/apache/ratis/util/LifeCycle.java   |  1 -
 .../org/apache/ratis/util/PeerProxyMap.java     |  6 ++
 .../ratis/grpc/server/GRpcLogAppender.java      | 10 +---
 .../org/apache/ratis/server/RaftServerRpc.java  |  4 ++
 .../apache/ratis/server/impl/LeaderState.java   | 12 ++--
 .../apache/ratis/server/impl/LogAppender.java   | 58 +++++++++++++++-----
 .../ratis/server/impl/RaftServerImpl.java       |  2 +-
 .../ratis/server/impl/RaftServerProxy.java      |  8 ++-
 .../server/impl/RaftServerRpcWithProxy.java     |  5 ++
 .../ratis/server/impl/StateMachineUpdater.java  |  4 --
 .../java/org/apache/ratis/MiniRaftCluster.java  | 27 ++++++---
 .../java/org/apache/ratis/RaftBasicTests.java   |  3 +-
 .../java/org/apache/ratis/RetryCacheTests.java  |  4 +-
 .../impl/RaftReconfigurationBaseTest.java       | 17 +++---
 .../server/simulation/SimulatedClientRpc.java   |  2 +-
 .../server/simulation/SimulatedServerRpc.java   |  5 ++
 19 files changed, 124 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index c6c2d61..505ae7e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -40,5 +40,5 @@ public interface RaftClientRpc extends Closeable {
   void addServers(Iterable<RaftPeer> servers);
 
   /** Handle the given exception.  For example, try reconnecting. */
-  void handleException(RaftPeerId serverId, Exception e, boolean shouldClose);
+  void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index 6f4f37f..15fa061 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -21,13 +21,8 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.PeerProxyMap;
-import org.apache.ratis.util.ReflectionUtils;
 
 import java.io.Closeable;
-import java.io.EOFException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
 
 /** An abstract {@link RaftClientRpc} implementation using {@link 
PeerProxyMap}. */
 public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
@@ -48,12 +43,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends 
Closeable>
   }
 
   @Override
-  public void handleException(RaftPeerId serverId, Exception e, boolean 
shouldClose) {
-    if (shouldClose || ReflectionUtils.isInstance(e,
-        SocketException.class, SocketTimeoutException.class,
-        ClosedChannelException.class, EOFException.class)) {
-      proxies.resetProxy(serverId);
-    }
+  public void handleException(RaftPeerId serverId, Exception e, boolean 
reconnect) {
+    getProxies().handleException(serverId, e, reconnect);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index 4976be8..915f4a2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -25,7 +25,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.ExecutionException;
 
@@ -53,6 +56,11 @@ public interface IOUtils {
     return cause != null? asIOException(cause): new IOException(e);
   }
 
+  static boolean shouldReconnect(Exception e) {
+    return ReflectionUtils.isInstance(e,
+        SocketException.class, SocketTimeoutException.class, 
ClosedChannelException.class, EOFException.class);
+  }
+
   static void readFully(InputStream in, int buffSize) throws IOException {
     final byte buf[] = new byte[buffSize];
     for(int bytesRead = in.read(buf); bytesRead >= 0; ) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java 
b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index c6ace9e..93af1ef 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
  *  |       |    |          V     V       |
  *   -------      -------> EXCEPTION -----
  * </pre>
- * Note that there is no transition from PAUSING to CLOSING.
  */
 public class LifeCycle {
   public static final Logger LOG = LoggerFactory.getLogger(LifeCycle.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java 
b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 8b52a7e..53a2936 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -121,6 +121,12 @@ public class PeerProxyMap<PROXY extends Closeable> 
implements Closeable {
     }
   }
 
+  public void handleException(RaftPeerId serverId, Exception e, boolean 
reconnect) {
+    if (reconnect || IOUtils.shouldReconnect(e)) {
+      resetProxy(serverId);
+    }
+  }
+
   public PROXY createProxyImpl(RaftPeer peer) throws IOException {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 9a2163c..7d144e9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -81,7 +81,7 @@ public class GRpcLogAppender extends LogAppender {
   }
 
   @Override
-  public void run() {
+  protected void runAppenderImpl() throws IOException {
     for(; isAppenderRunning(); mayWait()) {
       if (shouldSendRequest()) {
         SnapshotInfo snapshot = shouldInstallSnapshot();
@@ -89,13 +89,7 @@ public class GRpcLogAppender extends LogAppender {
           installSnapshot(snapshot);
         } else if (!shouldWait()) {
           // keep appending log entries or sending heartbeats
-          try {
-            appendLog();
-          } catch (IOException e) {
-            LOG.error(this + " hit IOException while loading raft log", e);
-            stopSender();
-            return;
-          }
+          appendLog();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 77958c0..33db36d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server;
 
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
@@ -57,4 +58,7 @@ public interface RaftServerRpc extends RaftServerProtocol, 
RpcType.Get, Closeabl
 
   /** add information of the given peers */
   void addPeers(Iterable<RaftPeer> peers);
+
+  /** Handle the given exception.  For example, try reconnecting. */
+  void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 242cd7c..32b787f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -166,21 +166,17 @@ public class LeaderState {
     raftLog.append(placeHolder);
 
     processor.start();
-    startSenders();
+    senders.forEach(LogAppender::startAppender);
   }
 
   boolean isReady() {
     return server.getState().getLastAppliedIndex() >= placeHolderIndex;
   }
 
-  private void startSenders() {
-    senders.forEach(Thread::start);
-  }
-
   void stop() {
     this.running = false;
     // do not interrupt event processor since it may be in the middle of 
logSync
-    senders.forEach(sender -> sender.stopSender().interrupt());
+    senders.forEach(LogAppender::stopAppender);
     try {
       pendingRequests.sendNotLeaderResponses();
     } catch (IOException e) {
@@ -284,14 +280,14 @@ public class LeaderState {
 
     senders.addAll(newMembers.stream().map(peer -> {
       LogAppender sender = server.newLogAppender(this, peer, t, nextIndex, 
false);
-      sender.start();
+      sender.startAppender();
       return sender;
     }).collect(Collectors.toList()));
   }
 
   void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
     final List<LogAppender> toStop = 
senders.stream().filter(predicate).collect(Collectors.toList());
-    toStop.forEach(s -> s.stopSender().interrupt());
+    toStop.forEach(LogAppender::stopAppender);
     senders.removeAll(toStop);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 3e9acfa..1db4c69 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -41,11 +41,17 @@ import java.util.*;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+import static org.apache.ratis.util.LifeCycle.State.CLOSED;
+import static org.apache.ratis.util.LifeCycle.State.CLOSING;
+import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
+import static org.apache.ratis.util.LifeCycle.State.NEW;
+import static org.apache.ratis.util.LifeCycle.State.RUNNING;
+import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
 /**
  * A daemon thread appending log entries to a follower peer.
  */
-public class LogAppender extends Daemon {
+public class LogAppender {
   public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
 
   protected final RaftServerImpl server;
@@ -58,7 +64,8 @@ public class LogAppender extends Daemon {
   private final int snapshotChunkMaxSize;
   protected final long halfMinTimeoutMs;
 
-  private volatile boolean sending = true;
+  private final LifeCycle lifeCycle;
+  private final Daemon daemon = new Daemon(this::runAppender);
 
   public LogAppender(RaftServerImpl server, LeaderState leaderState, 
FollowerInfo f) {
     this.follower = f;
@@ -73,6 +80,7 @@ public class LogAppender extends Daemon {
     this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
 
     this.buffer = new LogEntryBuffer();
+    this.lifeCycle = new LifeCycle(this);
   }
 
   @Override
@@ -81,24 +89,37 @@ public class LogAppender extends Daemon {
         follower.getPeer().getId() + ")";
   }
 
-  @Override
-  public void run() {
+  public void startAppender() {
+    lifeCycle.transition(STARTING);
+    daemon.start();
+  }
+
+  private void runAppender() {
+    lifeCycle.compareAndTransition(STARTING, RUNNING); // stays CLOSING if stopAppender() won the race
     try {
-      checkAndSendAppendEntries();
+      runAppenderImpl();
     } catch (InterruptedException | InterruptedIOException e) {
       LOG.info(this + " was interrupted: " + e);
-    } catch (RaftLogIOException e) {
+    } catch (IOException e) {
       LOG.error(this + " hit IOException while loading raft log", e);
+      lifeCycle.transition(EXCEPTION);
+    } catch (Throwable e) {
+      LOG.error(this + " unexpected exception", e);
+      lifeCycle.transition(EXCEPTION);
+    } finally {
+      if (!lifeCycle.compareAndTransition(CLOSING, CLOSED) && lifeCycle.getCurrentState() != EXCEPTION) {
+        lifeCycle.transition(EXCEPTION); // skipped when a catch block above already moved to EXCEPTION
+      }
     }
   }
 
   protected boolean isAppenderRunning() {
-    return sending;
+    return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED, EXCEPTION);
   }
 
-  public LogAppender stopSender() {
-    this.sending = false;
-    return this;
+  public void stopAppender() {
+    lifeCycle.transition(CLOSING);
+    daemon.interrupt();
   }
 
   public FollowerInfo getFollower() {
@@ -219,7 +240,10 @@ public class LogAppender extends Daemon {
         throw e;
       } catch (IOException ioe) {
         // TODO should have more detailed retry policy here.
-        LOG.trace(this + ": failed to send appendEntries; retry " + retry++, 
ioe);
+        if (retry++ % 10 == 0) { // count every failure, but log only the 1st and every 10th after it
+          LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry, ioe);
+        }
+        handleException(ioe);
       }
       if (isAppenderRunning()) {
         leaderState.getSyncInterval().sleep();
@@ -358,8 +382,8 @@ public class LogAppender extends Daemon {
     } catch (InterruptedIOException iioe) {
       throw iioe;
     } catch (Exception ioe) {
-      LOG.warn(this + ": failed to install SnapshotInfo " + 
snapshot.getFiles(),
-          ioe);
+      LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, ioe);
+      handleException(ioe);
       return null;
     }
 
@@ -388,8 +412,7 @@ public class LogAppender extends Daemon {
   }
 
   /** Check and send appendEntries RPC */
-  private void checkAndSendAppendEntries()
-      throws InterruptedException, InterruptedIOException, RaftLogIOException {
+  protected void runAppenderImpl() throws InterruptedException, IOException {
     while (isAppenderRunning()) {
       if (shouldSendRequest()) {
         SnapshotInfo snapshot = shouldInstallSnapshot();
@@ -455,6 +478,11 @@ public class LogAppender extends Daemon {
     }
   }
 
+  private void handleException(Exception e) { // let the server RPC decide whether to reconnect to the follower
+    LOG.trace("{}: handleException", this, e);
+    server.getServerRpc().handleException(follower.getPeer().getId(), e, false);
+  }
+
   protected void submitEventOnSuccessAppend() {
     LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
         LeaderState.UPDATE_COMMIT_EVENT :

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6d1eb96..93666df 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -353,7 +353,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     // add the commit infos of other servers
     if (isLeader()) {
-      Optional.of(leaderState).ifPresent(
+      Optional.ofNullable(leaderState).ifPresent(
           leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
     } else {
       getRaftConf().getPeers().stream()

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index cfb369b..5f1717a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -122,8 +122,12 @@ public class RaftServerProxy implements RaftServer {
   }
 
   public RaftServerImpl getImpl() throws IOException {
+    final CompletableFuture<RaftServerImpl> i = impl;
+    if (i == null) {
+      throw new ServerNotReadyException(getId() + " is not initialized.");
+    }
     try {
-      return impl.get();
+      return i.get();
     } catch (InterruptedException e) {
       throw IOUtils.toInterruptedIOException(getId() + ": getImpl 
interrupted.", e);
     } catch (ExecutionException e) {
@@ -133,12 +137,14 @@ public class RaftServerProxy implements RaftServer {
 
   @Override
   public void start() {
+    LOG.info("{}: start", getId());
     JavaUtils.getAndConsume(impl, RaftServerImpl::start);
     getServerRpc().start();
   }
 
   @Override
   public void close() {
+    LOG.info("{}: close", getId());
     JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown);
     try {
       getServerRpc().close();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
index fe41859..aa8e2f7 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
@@ -59,6 +59,11 @@ public abstract class RaftServerRpcWithProxy<PROXY extends 
Closeable, PROXIES ex
   }
 
   @Override
+  public void handleException(RaftPeerId serverId, Exception e, boolean 
reconnect) {
+    getProxies().handleException(serverId, e, reconnect);
+  }
+
+  @Override
   public final void start() {
     getLifeCycle().startAndTransition(() -> startImpl());
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 5fffb1f..31ea6d3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -87,10 +87,6 @@ class StateMachineUpdater implements Runnable {
     state = State.STOP;
     updater.interrupt();
     try {
-      updater.join();
-    } catch (InterruptedException ignored) {
-    }
-    try {
       stateMachine.close();
     } catch (IOException ignored) {
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 2643073..2634717 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -39,7 +39,10 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -527,11 +530,11 @@ public abstract class MiniRaftCluster {
   }
 
   public void setConfiguration(RaftPeer... peers) throws IOException {
-    final RaftServerImpl leader = getLeader();
-    final SetConfigurationRequest r = newSetConfigurationRequest(
-        ClientId.randomId(), leader.getId(), peers);
-    LOG.info("Start changing the configuration: {}", r);
-    leader.setConfiguration(r);
+    try(RaftClient client = createClient()) {
+      LOG.info("Start changing the configuration: {}", Arrays.asList(peers));
+      final RaftClientReply reply = client.setConfiguration(peers);
+      Preconditions.assertTrue(reply.isSuccess());
+    }
   }
 
   public void shutdown() {
@@ -540,10 +543,23 @@ public abstract class MiniRaftCluster {
     LOG.info("***     Stopping " + getClass().getSimpleName());
     LOG.info("*** ");
     LOG.info("************************************************************** 
");
+    LOG.info(printServers());
 
-    timer.cancel();
-    
getServerAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close);
+    // close the servers in parallel; newFixedThreadPool rejects a pool size of 0
+    final ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, servers.size()));
+    try {
+      getServers().forEach(proxy -> executor.submit(proxy::close));
+      executor.shutdown(); // no more tasks, so awaitTermination returns as soon as all closes finish
+      // just wait for a few seconds
+      executor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch(InterruptedException e) {
+      LOG.warn("shutdown interrupted", e);
+      Thread.currentThread().interrupt();
+    } finally {
+      executor.shutdownNow();
+    }
 
+    timer.cancel();
     ExitUtils.assertNotTerminated();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index f35878c..207a458 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -176,7 +176,8 @@ public abstract class RaftBasicTests extends BaseTest {
     for(RaftServerProxy server : cluster.getServers()) {
       final RaftServerImpl impl = server.getImpl();
       if (impl.isAlive() || replication == ReplicationLevel.ALL) {
-        RaftTestUtil.assertLogEntries(impl, term, messages);
+        JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, 
messages),
+            5, 1000, impl.getId() + " assertLogEntries", LOG);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 474482b..2d352b4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -34,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import static java.util.Arrays.asList;
 
@@ -142,6 +143,7 @@ public abstract class RetryCacheTests extends BaseTest {
     // same clientId and callId in the request
     r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
         callId, seqNum, new SimpleMessage("message"));
+    rpc.addServers(Arrays.asList(change.newPeers));
     for (int i = 0; i < 10; i++) {
       try {
         reply = rpc.sendRequest(r);
@@ -150,7 +152,7 @@ public abstract class RetryCacheTests extends BaseTest {
         Assert.assertEquals(callId, reply.getCallId());
         Assert.assertTrue(reply.isSuccess());
       } catch (Exception e) {
-        LOG.info("hit exception while retrying the same request: " + e);
+        LOG.info("hit exception while retrying the same request: " + r, e);
       }
       Thread.sleep(100);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 1b0ef0c..8a340d7 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -340,19 +340,20 @@ public abstract class RaftReconfigurationBaseTest extends 
BaseTest {
     // originally 3 peers
     final MiniRaftCluster cluster = getCluster(3);
     cluster.start();
+    final AtomicBoolean clientRunning = new AtomicBoolean(true);
+    Thread clientThread = null;
     try {
       RaftTestUtil.waitForLeader(cluster);
       final RaftPeerId leaderId = cluster.getLeader().getId();
       final RaftClient client = cluster.createClient(leaderId);
 
-      PeerChanges c1 = cluster.addNewPeers(2, false);
-      PeerChanges c2 = cluster.removePeers(2, false, asList(c1.newPeers));
+      PeerChanges c1 = cluster.addNewPeers(1, false);
+      PeerChanges c2 = cluster.removePeers(1, false, asList(c1.newPeers));
 
       LOG.info("Start changing the configuration: {}",
           asList(c2.allPeersInNewConf));
       final AtomicReference<Boolean> success = new AtomicReference<>();
-      final AtomicBoolean clientRunning = new AtomicBoolean(true);
-      Thread clientThread = new Thread(() -> {
+      clientThread = new Thread(() -> {
         try {
           boolean r = false;
           while (clientRunning.get() && !r) {
@@ -378,7 +379,7 @@ public abstract class RaftReconfigurationBaseTest extends 
BaseTest {
 
       LOG.info("kill the current leader");
       final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true);
-      LOG.info("start the two new peers: {}", Arrays.asList(c1.newPeers));
+      LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
       for (RaftPeer np : c1.newPeers) {
         cluster.restartServer(np.getId(), false);
       }
@@ -388,9 +389,11 @@ public abstract class RaftReconfigurationBaseTest extends 
BaseTest {
       // will retry the same setConfiguration request
       waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2,
           Collections.singletonList(oldLeaderId));
-      clientRunning.set(false);
-      //Assert.assertTrue(success.get());
     } finally {
+      if (clientThread != null) {
+        clientRunning.set(false);
+        clientThread.interrupt();
+      }
       cluster.shutdown();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index a141789..c9c9f9b 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -36,7 +36,7 @@ class SimulatedClientRpc
   }
 
   @Override
-  public void handleException(RaftPeerId serverId, Exception e, boolean 
shouldClose) {
+  public void handleException(RaftPeerId serverId, Exception e, boolean 
reconnect) {
     // do nothing
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index fc80f15..1be6c3a 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -116,6 +116,11 @@ class SimulatedServerRpc implements RaftServerRpc {
     // do nothing
   }
 
+  @Override
+  public void handleException(RaftPeerId serverId, Exception e, boolean 
reconnect) {
+    // do nothing
+  }
+
   final RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply> 
serverHandlerImpl
       = new RequestHandler.HandlerInterface<RaftServerRequest, 
RaftServerReply>() {
     @Override

Reply via email to