Repository: incubator-ratis
Updated Branches:
  refs/heads/master 341456e04 -> e6419972c

RATIS-85. TestNotLeaderExceptionWithHadoopRpc and TestRaftReconfigurationWithHadoopRpc fail intermittently. Contributed by Jing Zhao

Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e6419972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e6419972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e6419972

Branch: refs/heads/master
Commit: e6419972c8c2119f7ce48ad80cd09918d51e5725
Parents: 341456e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Fri May 12 16:41:06 2017 -0700
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Fri May 12 16:41:06 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/FileUtils.java | 1 +
 .../java/org/apache/ratis/util/ProtoUtils.java | 4 +--
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 2 ++
 .../ratis/server/impl/RaftServerImpl.java | 6 ++--
 .../ratis/server/impl/ServerProtoUtils.java | 2 +-
 .../apache/ratis/server/impl/ServerState.java | 2 ++
 .../apache/ratis/server/storage/RaftLog.java | 3 ++
 .../ratis/server/storage/RaftLogCache.java | 2 +-
 .../ratis/server/storage/RaftLogWorker.java | 35 ++++++++++++++++----
 .../ratis/server/storage/SegmentedRaftLog.java | 6 ++--
 .../java/org/apache/ratis/RaftTestUtil.java | 7 ++--
 11 files changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 732e4c8..ea6e41d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java @@ -74,6 +74,7 @@ public class FileUtils { } final boolean wasDeleted = f.delete(); if (wasDeleted) { + LOG.debug("Deleted file or dir {}", f.getAbsolutePath()); return true; } final boolean ex = f.exists(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index ea74d09..d694d7e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -131,12 +131,12 @@ public class ProtoUtils { } public static String toString(RaftRpcRequestProto proto) { - return proto.getRequestorId() + "->" + proto.getReplyId() + return proto.getRequestorId().toStringUtf8() + "->" + proto.getReplyId().toStringUtf8() + "#" + proto.getCallId(); } public static String toString(RaftRpcReplyProto proto) { - return proto.getRequestorId() + "<-" + proto.getReplyId() + return proto.getRequestorId().toStringUtf8() + "<-" + proto.getReplyId().toStringUtf8() + "#" + proto.getCallId() + ":" + (proto.getSuccess()? 
"OK": "FAIL"); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index d7b72c2..76a64b3 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -21,6 +21,7 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftBasicTests; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.LogUtils; import org.junit.Assert; import org.junit.Test; @@ -30,6 +31,7 @@ import java.io.IOException; public class TestRaftWithGrpc extends RaftBasicTests { static { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(FileUtils.LOG, Level.DEBUG); } private final MiniRaftClusterWithGRpc cluster; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 4ba7a10..ccc8c72 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -735,8 +735,10 @@ public class RaftServerImpl implements RaftServerProtocol, } private boolean containPrevious(TermIndex previous) { - LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}", - getId(), previous, state.getLatestSnapshot(), 
state.getLatestInstalledSnapshot()); + if (LOG.isTraceEnabled()) { + LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}", + getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot()); + } return state.getLog().contains(previous) || (state.getLatestSnapshot() != null && state.getLatestSnapshot().getTermIndex().equals(previous)) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 6705e91..ffd4378 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -76,7 +76,7 @@ public class ServerProtoUtils { private static String toString(RaftRpcReplyProto reply) { return reply.getRequestorId().toStringUtf8() + "->" - + reply.getReplyId().toString() + "," + reply.getSuccess(); + + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess(); } public static RaftConfigurationProto toRaftConfigurationProto( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index da1aa3c..d595691 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -311,6 +311,8 @@ public class ServerState implements Closeable { stateMachineUpdater.stop(); RaftServerImpl.LOG.info("{} 
closes. The last applied log index is {}", getSelfId(), getLastAppliedIndex()); + + log.close(); storage.close(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 4d84a57..8b60f2d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -262,6 +262,9 @@ public abstract class RaftLog implements Closeable { @Override public String toString() { + if (!isOpen) { + return "Closed log"; + } TermIndex last = getLastEntryTermIndex(); return last == null ? "null" : Collections.singletonList(last).toString(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java index 5863f8d..2ae0660 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java @@ -77,7 +77,7 @@ class RaftLogCache { } } - private LogSegment openSegment; + private volatile LogSegment openSegment; private final List<LogSegment> closedSegments; private final RaftStorage storage; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index ee21d8b..1dc8ae1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; * raft peer. */ class RaftLogWorker implements Runnable { - static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class); + static final Logger LOG = RaftServerImpl.LOG; /** * The task queue accessed by rpc handler threads and the io worker thread. */ @@ -53,7 +53,7 @@ class RaftLogWorker implements Runnable { private final Thread workerThread; private final RaftStorage storage; - private LogOutputStream out; + private volatile LogOutputStream out; private final RaftServerImpl raftServer; /** @@ -94,9 +94,11 @@ class RaftLogWorker implements Runnable { this.running = false; workerThread.interrupt(); try { - workerThread.join(); + workerThread.join(3000); } catch (InterruptedException ignored) { } + IOUtils.cleanup(LOG, out); + LOG.info("{} closes.", this.toString()); } /** @@ -120,7 +122,8 @@ class RaftLogWorker implements Runnable { * This is protected by the RaftServer and RaftLog's lock. */ private Task addIOTask(Task task) { - LOG.debug("add task {}", task); + LOG.debug("{} adds IO task {}", + raftServer != null ? raftServer.getId() : "", task); try { if (!queue.offer(task, 1, TimeUnit.SECONDS)) { Preconditions.assertTrue(isAlive(), @@ -162,12 +165,26 @@ class RaftLogWorker implements Runnable { task.done(); } } catch (InterruptedException e) { + if (running) { + LOG.warn("{} got interrupted while still running", + Thread.currentThread().getName()); + } LOG.info(Thread.currentThread().getName() + " was interrupted, exiting. 
There are " + queue.size() + " tasks remaining in the queue."); + Thread.currentThread().interrupt(); + return; } catch (Throwable t) { - // TODO avoid terminating the jvm by supporting multiple log directories - ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", t, LOG); + if (!running) { + LOG.info("{} got closed and hit exception", + Thread.currentThread().getName(), t); + } else { + // TODO avoid terminating the jvm, we should + // 1) support multiple log directories + // 2) only shutdown the raft server impl + ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", + t, LOG); + } } } } @@ -252,12 +269,14 @@ class RaftLogWorker implements Runnable { @Override public void execute() throws IOException { - IOUtils.cleanup(null, out); + IOUtils.cleanup(LOG, out); out = null; Preconditions.assertTrue(segmentToClose != null); File openFile = storage.getStorageDir() .getOpenLogFile(segmentToClose.getStartIndex()); + LOG.info("{} finalizing log segment {}", RaftLogWorker.this.toString(), + openFile.getAbsolutePath()); Preconditions.assertTrue(openFile.exists(), "File %s does not exist.", openFile); if (segmentToClose.numOfEntries() > 0) { @@ -289,6 +308,8 @@ class RaftLogWorker implements Runnable { @Override void execute() throws IOException { File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex); + LOG.info("{} creating new log segment {}", RaftLogWorker.this.toString(), + openFile.getAbsolutePath()); Preconditions.assertTrue(!openFile.exists(), "open file %s exists for %s", openFile.getAbsolutePath(), RaftLogWorker.this.toString()); Preconditions.assertTrue(out == null && pendingFlushNum == 0); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 0c54af7..d82550b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -26,7 +26,6 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.LogSegment.LogRecord; import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; -import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.CodeInjectionForTesting; @@ -366,7 +365,10 @@ public class SegmentedRaftLog extends RaftLog { @Override public void close() throws IOException { - super.close(); + try(AutoCloseableLock writeLock = writeLock()) { + super.close(); + cache.clear(); + } fileLogWorker.close(); storage.close(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index b83e1f2..e9c6d65 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -70,11 +70,8 @@ public class RaftTestUtil { LOG.info(cluster.printServers()); for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) { RaftServerImpl currLeader = cluster.getLeader(); - if (LOG.isDebugEnabled()) { - LOG.debug("try enforcing leader to " + leaderId + " but " - + (currLeader == null? 
"no leader for this round" - : "new leader is " + currLeader.getId())); - } + LOG.info("try enforcing leader to " + leaderId + " but " + + (currLeader == null ? "no leader for this round" : "new leader is " + currLeader.getId())); } LOG.info(cluster.printServers());
