Repository: incubator-ratis
Updated Branches:
  refs/heads/master 341456e04 -> e6419972c


RATIS-85. TestNotLeaderExceptionWithHadoopRpc and 
TestRaftReconfigurationWithHadoopRpc fail intermittently.  Contributed by Jing 
Zhao


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e6419972
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e6419972
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e6419972

Branch: refs/heads/master
Commit: e6419972c8c2119f7ce48ad80cd09918d51e5725
Parents: 341456e
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Fri May 12 16:41:06 2017 -0700
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Fri May 12 16:41:06 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/FileUtils.java   |  1 +
 .../java/org/apache/ratis/util/ProtoUtils.java  |  4 +--
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |  2 ++
 .../ratis/server/impl/RaftServerImpl.java       |  6 ++--
 .../ratis/server/impl/ServerProtoUtils.java     |  2 +-
 .../apache/ratis/server/impl/ServerState.java   |  2 ++
 .../apache/ratis/server/storage/RaftLog.java    |  3 ++
 .../ratis/server/storage/RaftLogCache.java      |  2 +-
 .../ratis/server/storage/RaftLogWorker.java     | 35 ++++++++++++++++----
 .../ratis/server/storage/SegmentedRaftLog.java  |  6 ++--
 .../java/org/apache/ratis/RaftTestUtil.java     |  7 ++--
 11 files changed, 50 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 732e4c8..ea6e41d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -74,6 +74,7 @@ public class FileUtils {
     }
     final boolean wasDeleted = f.delete();
     if (wasDeleted) {
+      LOG.debug("Deleted file or dir {}", f.getAbsolutePath());
       return true;
     }
     final boolean ex = f.exists();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index ea74d09..d694d7e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -131,12 +131,12 @@ public class ProtoUtils {
   }
 
   public static String toString(RaftRpcRequestProto proto) {
-    return proto.getRequestorId() + "->" + proto.getReplyId()
+    return proto.getRequestorId().toStringUtf8() + "->" + 
proto.getReplyId().toStringUtf8()
         + "#" + proto.getCallId();
   }
 
   public static String toString(RaftRpcReplyProto proto) {
-    return proto.getRequestorId() + "<-" + proto.getReplyId()
+    return proto.getRequestorId().toStringUtf8() + "<-" + 
proto.getReplyId().toStringUtf8()
         + "#" + proto.getCallId() + ":"
         + (proto.getSuccess()? "OK": "FAIL");
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d7b72c2..76a64b3 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
 import org.apache.ratis.RaftBasicTests;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,6 +31,7 @@ import java.io.IOException;
 public class TestRaftWithGrpc extends RaftBasicTests {
   static {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(FileUtils.LOG, Level.DEBUG);
   }
 
   private final MiniRaftClusterWithGRpc cluster;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4ba7a10..ccc8c72 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -735,8 +735,10 @@ public class RaftServerImpl implements RaftServerProtocol,
   }
 
   private boolean containPrevious(TermIndex previous) {
-    LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}",
-        getId(), previous, state.getLatestSnapshot(), 
state.getLatestInstalledSnapshot());
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{}: prev:{}, latestSnapshot:{}, latestInstalledSnapshot:{}",
+          getId(), previous, state.getLatestSnapshot(), 
state.getLatestInstalledSnapshot());
+    }
     return state.getLog().contains(previous)
         ||  (state.getLatestSnapshot() != null
              && state.getLatestSnapshot().getTermIndex().equals(previous))

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 6705e91..ffd4378 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -76,7 +76,7 @@ public class ServerProtoUtils {
 
   private static String toString(RaftRpcReplyProto reply) {
     return reply.getRequestorId().toStringUtf8() + "->"
-        + reply.getReplyId().toString() + "," + reply.getSuccess();
+        + reply.getReplyId().toStringUtf8() + "," + reply.getSuccess();
   }
 
   public static RaftConfigurationProto toRaftConfigurationProto(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index da1aa3c..d595691 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -311,6 +311,8 @@ public class ServerState implements Closeable {
     stateMachineUpdater.stop();
     RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
         getSelfId(), getLastAppliedIndex());
+
+    log.close();
     storage.close();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 4d84a57..8b60f2d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -262,6 +262,9 @@ public abstract class RaftLog implements Closeable {
 
   @Override
   public String toString() {
+    if (!isOpen) {
+      return "Closed log";
+    }
     TermIndex last = getLastEntryTermIndex();
     return last == null ? "null" : Collections.singletonList(last).toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 5863f8d..2ae0660 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -77,7 +77,7 @@ class RaftLogCache {
     }
   }
 
-  private LogSegment openSegment;
+  private volatile LogSegment openSegment;
   private final List<LogSegment> closedSegments;
   private final RaftStorage storage;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index ee21d8b..1dc8ae1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
  * raft peer.
  */
 class RaftLogWorker implements Runnable {
-  static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
+  static final Logger LOG = RaftServerImpl.LOG;
   /**
    * The task queue accessed by rpc handler threads and the io worker thread.
    */
@@ -53,7 +53,7 @@ class RaftLogWorker implements Runnable {
   private final Thread workerThread;
 
   private final RaftStorage storage;
-  private LogOutputStream out;
+  private volatile LogOutputStream out;
   private final RaftServerImpl raftServer;
 
   /**
@@ -94,9 +94,11 @@ class RaftLogWorker implements Runnable {
     this.running = false;
     workerThread.interrupt();
     try {
-      workerThread.join();
+      workerThread.join(3000);
     } catch (InterruptedException ignored) {
     }
+    IOUtils.cleanup(LOG, out);
+    LOG.info("{} closes.", this.toString());
   }
 
   /**
@@ -120,7 +122,8 @@ class RaftLogWorker implements Runnable {
    * This is protected by the RaftServer and RaftLog's lock.
    */
   private Task addIOTask(Task task) {
-    LOG.debug("add task {}", task);
+    LOG.debug("{} adds IO task {}",
+        raftServer != null ? raftServer.getId() : "", task);
     try {
       if (!queue.offer(task, 1, TimeUnit.SECONDS)) {
         Preconditions.assertTrue(isAlive(),
@@ -162,12 +165,26 @@ class RaftLogWorker implements Runnable {
           task.done();
         }
       } catch (InterruptedException e) {
+        if (running) {
+          LOG.warn("{} got interrupted while still running",
+              Thread.currentThread().getName());
+        }
         LOG.info(Thread.currentThread().getName()
             + " was interrupted, exiting. There are " + queue.size()
             + " tasks remaining in the queue.");
+        Thread.currentThread().interrupt();
+        return;
       } catch (Throwable t) {
-        // TODO avoid terminating the jvm by supporting multiple log 
directories
-        ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", 
t, LOG);
+        if (!running) {
+          LOG.info("{} got closed and hit exception",
+              Thread.currentThread().getName(), t);
+        } else {
+          // TODO avoid terminating the jvm, we should
+          // 1) support multiple log directories
+          // 2) only shutdown the raft server impl
+          ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.",
+              t, LOG);
+        }
       }
     }
   }
@@ -252,12 +269,14 @@ class RaftLogWorker implements Runnable {
 
     @Override
     public void execute() throws IOException {
-      IOUtils.cleanup(null, out);
+      IOUtils.cleanup(LOG, out);
       out = null;
       Preconditions.assertTrue(segmentToClose != null);
 
       File openFile = storage.getStorageDir()
           .getOpenLogFile(segmentToClose.getStartIndex());
+      LOG.info("{} finalizing log segment {}", RaftLogWorker.this.toString(),
+          openFile.getAbsolutePath());
       Preconditions.assertTrue(openFile.exists(),
           "File %s does not exist.", openFile);
       if (segmentToClose.numOfEntries() > 0) {
@@ -289,6 +308,8 @@ class RaftLogWorker implements Runnable {
     @Override
     void execute() throws IOException {
       File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
+      LOG.info("{} creating new log segment {}", RaftLogWorker.this.toString(),
+          openFile.getAbsolutePath());
       Preconditions.assertTrue(!openFile.exists(), "open file %s exists for 
%s",
           openFile.getAbsolutePath(), RaftLogWorker.this.toString());
       Preconditions.assertTrue(out == null && pendingFlushNum == 0);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 0c54af7..d82550b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -26,7 +26,6 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
-import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.CodeInjectionForTesting;
@@ -366,7 +365,10 @@ public class SegmentedRaftLog extends RaftLog {
 
   @Override
   public void close() throws IOException {
-    super.close();
+    try(AutoCloseableLock writeLock = writeLock()) {
+      super.close();
+      cache.clear();
+    }
     fileLogWorker.close();
     storage.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e6419972/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index b83e1f2..e9c6d65 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -70,11 +70,8 @@ public class RaftTestUtil {
     LOG.info(cluster.printServers());
     for(int i = 0; !cluster.tryEnforceLeader(leaderId) && i < 10; i++) {
       RaftServerImpl currLeader = cluster.getLeader();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("try enforcing leader to " + leaderId + " but "
-            + (currLeader == null? "no leader for this round"
-                : "new leader is " + currLeader.getId()));
-      }
+      LOG.info("try enforcing leader to " + leaderId + " but " +
+          (currLeader == null ? "no leader for this round" : "new leader is " 
+ currLeader.getId()));
     }
     LOG.info(cluster.printServers());
 

Reply via email to