Repository: incubator-ratis Updated Branches: refs/heads/master 5016d5d7a -> cf1e6a3c8
RATIS-239. LogAppender should reconnect for some IOException. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/cf1e6a3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/cf1e6a3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/cf1e6a3c Branch: refs/heads/master Commit: cf1e6a3c8732cbaaf2d5b55a95c88011147bb127 Parents: 5016d5d Author: Mukul Kumar Singh <msi...@apache.org> Authored: Thu May 31 20:33:12 2018 +0530 Committer: Mukul Kumar Singh <msi...@apache.org> Committed: Thu May 31 20:33:12 2018 +0530 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClientRpc.java | 2 +- .../client/impl/RaftClientRpcWithProxy.java | 13 +---- .../java/org/apache/ratis/util/IOUtils.java | 8 +++ .../java/org/apache/ratis/util/LifeCycle.java | 1 - .../org/apache/ratis/util/PeerProxyMap.java | 6 ++ .../ratis/grpc/server/GRpcLogAppender.java | 10 +--- .../org/apache/ratis/server/RaftServerRpc.java | 4 ++ .../apache/ratis/server/impl/LeaderState.java | 12 ++-- .../apache/ratis/server/impl/LogAppender.java | 58 +++++++++++++++----- .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/impl/RaftServerProxy.java | 8 ++- .../server/impl/RaftServerRpcWithProxy.java | 5 ++ .../ratis/server/impl/StateMachineUpdater.java | 4 -- .../java/org/apache/ratis/MiniRaftCluster.java | 27 ++++++--- .../java/org/apache/ratis/RaftBasicTests.java | 3 +- .../java/org/apache/ratis/RetryCacheTests.java | 4 +- .../impl/RaftReconfigurationBaseTest.java | 17 +++--- .../server/simulation/SimulatedClientRpc.java | 2 +- .../server/simulation/SimulatedServerRpc.java | 5 ++ 19 files changed, 124 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java index c6c2d61..505ae7e 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java @@ -40,5 +40,5 @@ public interface RaftClientRpc extends Closeable { void addServers(Iterable<RaftPeer> servers); /** Handle the given exception. For example, try reconnecting. */ - void handleException(RaftPeerId serverId, Exception e, boolean shouldClose); + void handleException(RaftPeerId serverId, Exception e, boolean reconnect); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java index 6f4f37f..15fa061 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java @@ -21,13 +21,8 @@ import org.apache.ratis.client.RaftClientRpc; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.PeerProxyMap; -import org.apache.ratis.util.ReflectionUtils; import java.io.Closeable; -import java.io.EOFException; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.nio.channels.ClosedChannelException; /** An abstract {@link RaftClientRpc} implementation using {@link PeerProxyMap}. */ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable> @@ -48,12 +43,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable> } @Override - public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) { - if (shouldClose || ReflectionUtils.isInstance(e, - SocketException.class, SocketTimeoutException.class, - ClosedChannelException.class, EOFException.class)) { - proxies.resetProxy(serverId); - } + public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) { + getProxies().handleException(serverId, e, reconnect); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index 4976be8..915f4a2 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -25,7 +25,10 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.ExecutionException; @@ -53,6 +56,11 @@ public interface IOUtils { return cause != null? asIOException(cause): new IOException(e); } + static boolean shouldReconnect(Exception e) { + return ReflectionUtils.isInstance(e, + SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class); + } + static void readFully(InputStream in, int buffSize) throws IOException { final byte buf[] = new byte[buffSize]; for(int bytesRead = in.read(buf); bytesRead >= 0; ) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java index c6ace9e..93af1ef 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java @@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory; * | | | V V | * ------- -------> EXCEPTION ----- * </pre> - * Note that there is no transition from PAUSING to CLOSING. */ public class LifeCycle { public static final Logger LOG = LoggerFactory.getLogger(LifeCycle.class); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java index 8b52a7e..53a2936 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java @@ -121,6 +121,12 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable { } } + public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) { + if (reconnect || IOUtils.shouldReconnect(e)) { + resetProxy(serverId); + } + } + public PROXY createProxyImpl(RaftPeer peer) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 9a2163c..7d144e9 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -81,7 +81,7 @@ public class GRpcLogAppender extends LogAppender { } @Override - public void run() { + protected void runAppenderImpl() throws IOException { for(; isAppenderRunning(); mayWait()) { if (shouldSendRequest()) { SnapshotInfo snapshot = shouldInstallSnapshot(); @@ -89,13 +89,7 @@ public class GRpcLogAppender extends LogAppender { installSnapshot(snapshot); } else if (!shouldWait()) { // keep appending log entries or sending heartbeats - try { - appendLog(); - } catch (IOException e) { - LOG.error(this + " hit IOException while loading raft log", e); - stopSender(); - return; - } + appendLog(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java index 77958c0..33db36d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.protocol.RaftServerProtocol; @@ -57,4 +58,7 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl /** add information of the given peers */ void addPeers(Iterable<RaftPeer> peers); + + /** Handle the given exception. For example, try reconnecting. */ + void handleException(RaftPeerId serverId, Exception e, boolean reconnect); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 242cd7c..32b787f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -166,21 +166,17 @@ public class LeaderState { raftLog.append(placeHolder); processor.start(); - startSenders(); + senders.forEach(LogAppender::startAppender); } boolean isReady() { return server.getState().getLastAppliedIndex() >= placeHolderIndex; } - private void startSenders() { - senders.forEach(Thread::start); - } - void stop() { this.running = false; // do not interrupt event processor since it may be in the middle of logSync - senders.forEach(sender -> sender.stopSender().interrupt()); + senders.forEach(LogAppender::stopAppender); try { pendingRequests.sendNotLeaderResponses(); } catch (IOException e) { @@ -284,14 +280,14 @@ public class LeaderState { senders.addAll(newMembers.stream().map(peer -> { LogAppender sender = server.newLogAppender(this, peer, t, nextIndex, false); - sender.start(); + sender.startAppender(); return sender; }).collect(Collectors.toList())); } void stopAndRemoveSenders(Predicate<LogAppender> predicate) { final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList()); - toStop.forEach(s -> s.stopSender().interrupt()); + toStop.forEach(LogAppender::stopAppender); senders.removeAll(toStop); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 3e9acfa..1db4c69 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -41,11 +41,17 @@ import java.util.*; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX; +import static org.apache.ratis.util.LifeCycle.State.CLOSED; +import static org.apache.ratis.util.LifeCycle.State.CLOSING; +import static org.apache.ratis.util.LifeCycle.State.EXCEPTION; +import static org.apache.ratis.util.LifeCycle.State.NEW; +import static org.apache.ratis.util.LifeCycle.State.RUNNING; +import static org.apache.ratis.util.LifeCycle.State.STARTING; /** * A daemon thread appending log entries to a follower peer. */ -public class LogAppender extends Daemon { +public class LogAppender { public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class); protected final RaftServerImpl server; @@ -58,7 +64,8 @@ public class LogAppender extends Daemon { private final int snapshotChunkMaxSize; protected final long halfMinTimeoutMs; - private volatile boolean sending = true; + private final LifeCycle lifeCycle; + private final Daemon daemon = new Daemon(this::runAppender); public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) { this.follower = f; @@ -73,6 +80,7 @@ public class LogAppender extends Daemon { this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2; this.buffer = new LogEntryBuffer(); + this.lifeCycle = new LifeCycle(this); } @Override @@ -81,24 +89,37 @@ public class LogAppender extends Daemon { follower.getPeer().getId() + ")"; } - @Override - public void run() { + public void startAppender() { + lifeCycle.transition(STARTING); + daemon.start(); + } + + private void runAppender() { + lifeCycle.transition(RUNNING); try { - checkAndSendAppendEntries(); + runAppenderImpl(); } catch (InterruptedException | InterruptedIOException e) { LOG.info(this + " was interrupted: " + e); - } catch (RaftLogIOException e) { + } catch (IOException e) { LOG.error(this + " hit IOException while loading raft log", e); + lifeCycle.transition(EXCEPTION); + } catch (Throwable e) { + LOG.error(this + " unexpected exception", e); + lifeCycle.transition(EXCEPTION); + } finally { + if (!lifeCycle.compareAndTransition(CLOSING, CLOSED)) { + lifeCycle.transition(EXCEPTION); + } } } protected boolean isAppenderRunning() { - return sending; + return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED, EXCEPTION); } - public LogAppender stopSender() { - this.sending = false; - return this; + public void stopAppender() { + lifeCycle.transition(CLOSING); + daemon.interrupt(); } public FollowerInfo getFollower() { @@ -219,7 +240,10 @@ public class LogAppender extends Daemon { throw e; } catch (IOException ioe) { // TODO should have more detailed retry policy here. - LOG.trace(this + ": failed to send appendEntries; retry " + retry++, ioe); + if (retry % 10 == 1) { // to reduce the number of messages + LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry++, ioe); + } + handleException(ioe); } if (isAppenderRunning()) { leaderState.getSyncInterval().sleep(); @@ -358,8 +382,8 @@ public class LogAppender extends Daemon { } catch (InterruptedIOException iioe) { throw iioe; } catch (Exception ioe) { - LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(), - ioe); + LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, ioe); + handleException(ioe); return null; } @@ -388,8 +412,7 @@ public class LogAppender extends Daemon { } /** Check and send appendEntries RPC */ - private void checkAndSendAppendEntries() - throws InterruptedException, InterruptedIOException, RaftLogIOException { + protected void runAppenderImpl() throws InterruptedException, IOException { while (isAppenderRunning()) { if (shouldSendRequest()) { SnapshotInfo snapshot = shouldInstallSnapshot(); @@ -455,6 +478,11 @@ public class LogAppender extends Daemon { } } + private void handleException(Exception e) { + LOG.trace("TRACE", e); + server.getServerRpc().handleException(follower.getPeer().getId(), e, false); + } + protected void submitEventOnSuccessAppend() { LeaderState.StateUpdateEvent e = follower.isAttendingVote() ? LeaderState.UPDATE_COMMIT_EVENT : http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 6d1eb96..93666df 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -353,7 +353,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou // add the commit infos of other servers if (isLeader()) { - Optional.of(leaderState).ifPresent( + Optional.ofNullable(leaderState).ifPresent( leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos)); } else { getRaftConf().getPeers().stream() http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index cfb369b..5f1717a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -122,8 +122,12 @@ public class RaftServerProxy implements RaftServer { } public RaftServerImpl getImpl() throws IOException { + final CompletableFuture<RaftServerImpl> i = impl; + if (i == null) { + throw new ServerNotReadyException(getId() + " is not initialized."); + } try { - return impl.get(); + return i.get(); } catch (InterruptedException e) { throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e); } catch (ExecutionException e) { @@ -133,12 +137,14 @@ public class RaftServerProxy implements RaftServer { @Override public void start() { + LOG.info("{}: start", getId()); JavaUtils.getAndConsume(impl, RaftServerImpl::start); getServerRpc().start(); } @Override public void close() { + LOG.info("{}: close", getId()); JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown); try { getServerRpc().close(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java index fe41859..aa8e2f7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java @@ -59,6 +59,11 @@ public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES ex } @Override + public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) { + getProxies().handleException(serverId, e, reconnect); + } + + @Override public final void start() { getLifeCycle().startAndTransition(() -> startImpl()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index 5fffb1f..31ea6d3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -87,10 +87,6 @@ class StateMachineUpdater implements Runnable { state = State.STOP; updater.interrupt(); try { - updater.join(); - } catch (InterruptedException ignored) { - } - try { stateMachine.close(); } catch (IOException ignored) { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 2643073..2634717 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -39,7 +39,10 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -527,11 +530,11 @@ public abstract class MiniRaftCluster { } public void setConfiguration(RaftPeer... peers) throws IOException { - final RaftServerImpl leader = getLeader(); - final SetConfigurationRequest r = newSetConfigurationRequest( - ClientId.randomId(), leader.getId(), peers); - LOG.info("Start changing the configuration: {}", r); - leader.setConfiguration(r); + try(RaftClient client = createClient()) { + LOG.info("Start changing the configuration: {}", Arrays.asList(peers)); + final RaftClientReply reply = client.setConfiguration(peers); + Preconditions.assertTrue(reply.isSuccess()); + } } public void shutdown() { @@ -540,10 +543,20 @@ public abstract class MiniRaftCluster { LOG.info("*** Stopping " + getClass().getSimpleName()); LOG.info("*** "); LOG.info("************************************************************** "); + LOG.info(printServers()); - timer.cancel(); - getServerAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close); + final ExecutorService executor = Executors.newFixedThreadPool(servers.size()); + try { + getServers().forEach(proxy -> executor.submit(proxy::close)); + // just wait for a few seconds + executor.awaitTermination(5, TimeUnit.SECONDS); + } catch(InterruptedException e) { + LOG.warn("shutdown interrupted", e); + } finally { + executor.shutdownNow(); + } + timer.cancel(); ExitUtils.assertNotTerminated(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index f35878c..207a458 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -176,7 +176,8 @@ public abstract class RaftBasicTests extends BaseTest { for(RaftServerProxy server : cluster.getServers()) { final RaftServerImpl impl = server.getImpl(); if (impl.isAlive() || replication == ReplicationLevel.ALL) { - RaftTestUtil.assertLogEntries(impl, term, messages); + JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages), + 5, 1000, impl.getId() + " assertLogEntries", LOG); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java index 474482b..2d352b4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java @@ -34,6 +34,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.Arrays; import static java.util.Arrays.asList; @@ -142,6 +143,7 @@ public abstract class RetryCacheTests extends BaseTest { // same clientId and callId in the request r = cluster.newRaftClientRequest(client.getId(), newLeaderId, callId, seqNum, new SimpleMessage("message")); + rpc.addServers(Arrays.asList(change.newPeers)); for (int i = 0; i < 10; i++) { try { reply = rpc.sendRequest(r); @@ -150,7 +152,7 @@ public abstract class RetryCacheTests extends BaseTest { Assert.assertEquals(callId, reply.getCallId()); Assert.assertTrue(reply.isSuccess()); } catch (Exception e) { - LOG.info("hit exception while retrying the same request: " + e); + LOG.info("hit exception while retrying the same request: " + r, e); } Thread.sleep(100); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 1b0ef0c..8a340d7 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -340,19 +340,20 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { // originally 3 peers final MiniRaftCluster cluster = getCluster(3); cluster.start(); + final AtomicBoolean clientRunning = new AtomicBoolean(true); + Thread clientThread = null; try { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); final RaftClient client = cluster.createClient(leaderId); - PeerChanges c1 = cluster.addNewPeers(2, false); - PeerChanges c2 = cluster.removePeers(2, false, asList(c1.newPeers)); + PeerChanges c1 = cluster.addNewPeers(1, false); + PeerChanges c2 = cluster.removePeers(1, false, asList(c1.newPeers)); LOG.info("Start changing the configuration: {}", asList(c2.allPeersInNewConf)); final AtomicReference<Boolean> success = new AtomicReference<>(); - final AtomicBoolean clientRunning = new AtomicBoolean(true); - Thread clientThread = new Thread(() -> { + clientThread = new Thread(() -> { try { boolean r = false; while (clientRunning.get() && !r) { @@ -378,7 +379,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { LOG.info("kill the current leader"); final String oldLeaderId = RaftTestUtil.waitAndKillLeader(cluster, true); - LOG.info("start the two new peers: {}", Arrays.asList(c1.newPeers)); + LOG.info("start new peers: {}", Arrays.asList(c1.newPeers)); for (RaftPeer np : c1.newPeers) { cluster.restartServer(np.getId(), false); } @@ -388,9 +389,11 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { // will retry the same setConfiguration request waitAndCheckNewConf(cluster, c2.allPeersInNewConf, 2, Collections.singletonList(oldLeaderId)); - clientRunning.set(false); - //Assert.assertTrue(success.get()); } finally { + if (clientThread != null) { + clientRunning.set(false); + clientThread.interrupt(); + } cluster.shutdown(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java index a141789..c9c9f9b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java @@ -36,7 +36,7 @@ class SimulatedClientRpc } @Override - public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) { + public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) { // do nothing } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cf1e6a3c/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index fc80f15..1be6c3a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -116,6 +116,11 @@ class SimulatedServerRpc implements RaftServerRpc { // do nothing } + @Override + public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) { + // do nothing + } + final RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply> serverHandlerImpl = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() { @Override