Repository: incubator-ratis
Updated Branches:
  refs/heads/master 85d8e025f -> c59425465


RATIS-373. Tolerate the last partially written log entry.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c5942546
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c5942546
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c5942546

Branch: refs/heads/master
Commit: c59425465d531b57f17fdba35b1567076e4be306
Parents: 85d8e02
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Fri Oct 26 17:42:11 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Fri Oct 26 17:42:11 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/FileUtils.java   |   3 +-
 .../java/org/apache/ratis/util/IOUtils.java     |   3 +-
 .../org/apache/ratis/util/OpenCloseState.java   | 103 +++++++++++++++++++
 .../ratis/server/storage/LogOutputStream.java   |  11 +-
 .../apache/ratis/server/storage/LogReader.java  |  19 +++-
 .../apache/ratis/server/storage/RaftLog.java    |  23 +++--
 .../ratis/server/storage/RaftLogCache.java      |   6 +-
 .../server/storage/RaftStorageDirectory.java    |  11 +-
 .../ratis/server/storage/SegmentedRaftLog.java  |   4 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |  16 +--
 .../ratis/TestRaftServerSlownessDetection.java  |   3 +
 .../apache/ratis/server/ServerRestartTests.java |  95 +++++++++++++----
 .../ratis/server/simulation/RequestHandler.java |   5 +-
 .../server/storage/TestRaftLogSegment.java      |  61 ++++++++---
 .../statemachine/RaftSnapshotBaseTest.java      |   2 +-
 .../SimpleStateMachine4Testing.java             |   4 +-
 16 files changed, 302 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index 3171d4e..d7f722b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -31,12 +31,13 @@ public interface FileUtils {
   Logger LOG = LoggerFactory.getLogger(FileUtils.class);
 
   static void truncateFile(File f, long target) throws IOException {
+    final long original = f.length();
     LogUtils.runAndLog(LOG,
         () -> {
           try (FileOutputStream out = new FileOutputStream(f, true)) {
             out.getChannel().truncate(target);
           }},
-        () -> "FileOutputStream.getChannel().truncate " + f + " to target 
length " + target);
+        () -> "FileOutputStream.getChannel().truncate " + f + " length: " + 
original + " -> " + target);
   }
 
   static OutputStream createNewFile(Path p) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index c560bc6..8559239 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -97,7 +97,8 @@ public interface IOUtils {
     for(int toRead = len; toRead > 0; ) {
       final int ret = in.read(buf, off, toRead);
       if (ret < 0) {
-        throw new IOException( "Premature EOF from inputStream");
+        final int read = len - toRead;
+        throw new EOFException("Premature EOF: read length is " + len + " but 
encountered EOF at " + read);
       }
       toRead -= ret;
       off += ret;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
----------------------------------------------------------------------
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java 
b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
new file mode 100644
index 0000000..c6ccbd3
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The state of objects that can be unopened, opened, closed or exception.
+ * State transitions are atomic; see {@link AtomicReference#updateAndGet}.
+ */
+public class OpenCloseState {
+  private static class OpenTrace extends Throwable {
+    OpenTrace(String message) {
+      super(message);
+    }
+  }
+
+  private static class CloseTrace extends Throwable {
+    CloseTrace(String message) {
+      super(message);
+    }
+  }
+
+  private final String name;
+  private final Throwable initTrace;
+  /**
+   * The referenced {@link Throwable} indicates the state:
+   *   {@link #initTrace}: unopened; {@link OpenTrace}: opened; {@link CloseTrace}: closed;
+   *   any other {@link Throwable}: exception (not currently set by this class).
+   */
+  private final AtomicReference<Throwable> state;
+
+  public OpenCloseState(String name) {
+    this.name = name;
+    this.initTrace = new Throwable("Initialize " + name);
+    this.state = new AtomicReference<>(initTrace);
+  }
+
+  /**
+   * Assert this is open.
+   * @throws IllegalStateException if it is not open; the cause is the trace of the current state.
+   */
+  public void assertOpen() {
+    final Throwable t = state.get();
+    if (!(t instanceof OpenTrace)) {
+      final String s = name + " is expected to be opened but it is " + toString(t);
+      throw new IllegalStateException(s, t);
+    }
+  }
+
+  /**
+   * Transit from the unopened state to the open state. The method is NOT idempotent.
+   * @throws IOException if this is not in the unopened state.
+   */
+  public void open() throws IOException {
+    final OpenTrace openTrace = new OpenTrace("Open " + name);
+    final Throwable t = state.updateAndGet(previous -> previous == initTrace? openTrace: previous);
+    if (t != openTrace) {
+      throw new IOException("Failed to open " + name +" since it is " + toString(t));
+    }
+  }
+
+  /**
+   * Transit from the open state to the closed state. The method is idempotent.
+   * @throws IOException if this is neither opened nor closed (e.g. it was never opened).
+   */
+  public void close() throws IOException {
+    final Throwable t = state.updateAndGet(
+        previous -> previous instanceof OpenTrace? new CloseTrace("Close "+ name): previous);
+    // If t is CloseTrace, t is either the previous or the newly created object.
+    if (!(t instanceof CloseTrace)) {
+      throw new IOException("Failed to close " + name + " since it is " + toString(t));
+    }
+  }
+
+  @Override
+  public String toString() {
+    return toString(state.get());
+  }
+
+  private String toString(Throwable t) {
+    return t == initTrace? "UNOPENED"
+        : t instanceof OpenTrace? "OPENED"
+        : t instanceof CloseTrace? "CLOSED"
+        : t.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
index a09b90d..118a3e8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
@@ -92,9 +92,16 @@ public class LogOutputStream implements Closeable {
   }
 
   /**
+   * Write the given entry to this output stream.
+   *
    * Format:
-   * LogEntryProto's protobuf
-   * 4-byte checksum of the above protobuf
+   *   (1) The serialized size of the entry.
+   *   (2) The entry.
+   *   (3) 4-byte checksum of the entry.
+   *
+   * Size in bytes to be written:
+   *   (size to encode n) + n + (checksum size),
+   *   where n is the entry serialized size and the checksum size is 4.
    */
   public void write(LogEntryProto entry) throws IOException {
     final int serialized = entry.getSerializedSize();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
index ce149dd..735e8a3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
@@ -25,12 +25,15 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.PureJavaCrc32C;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.zip.Checksum;
 
 public class LogReader implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(LogReader.class);
   /**
    * InputStream wrapper that keeps track of the current stream position.
    *
@@ -121,12 +124,14 @@ public class LogReader implements Closeable {
 
   private static final int maxOpSize = 32 * 1024 * 1024;
 
+  private final File file;
   private final LimitedInputStream limiter;
   private final DataInputStream in;
   private byte[] temp = new byte[4096];
   private final Checksum checksum;
 
   LogReader(File file) throws FileNotFoundException {
+    this.file = file;
     this.limiter = new LimitedInputStream(
         new BufferedInputStream(new FileInputStream(file)));
     in = new DataInputStream(limiter);
@@ -153,6 +158,16 @@ public class LogReader implements Closeable {
   LogEntryProto readEntry() throws IOException {
     try {
       return decodeEntry();
+    } catch (EOFException eof) {
+      in.reset();
+      // The last entry is partially written; it is okay to ignore it since it is never committed in this server.
+      // Check TRACE before WARN: with standard level hierarchies WARN is enabled whenever TRACE is, so the reverse order never logs the stack trace.
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Ignoring the last partial written log entry in " + file, eof);
+      } else if (LOG.isWarnEnabled()) {
+        LOG.warn("Ignoring the last partial written log entry in " + file + ": " + eof);
+      }
+      return null;
     } catch (IOException e) {
       in.reset();
 
@@ -286,7 +301,7 @@ public class LogReader implements Closeable {
   }
 
   @Override
-  public void close() throws IOException {
-    IOUtils.cleanup(null, in);
+  public void close() {
+    IOUtils.cleanup(LOG, in);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index effc027..8b88b31 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -29,13 +29,13 @@ import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -53,6 +53,7 @@ public abstract class RaftLog implements Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
   public static final String LOG_SYNC = RaftLog.class.getSimpleName() + 
".logSync";
 
+
   /**
    * The largest committed index. Note the last committed log may be included
    * in the latest snapshot file.
@@ -63,11 +64,12 @@ public abstract class RaftLog implements Closeable {
   private final int maxBufferSize;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-  private volatile boolean isOpen = false;
+  private final OpenCloseState state;
 
   public RaftLog(RaftPeerId selfId, int maxBufferSize) {
     this.selfId = selfId;
     this.maxBufferSize = maxBufferSize;
+    this.state = new OpenCloseState(getName());
   }
 
   public long getLastCommittedIndex() {
@@ -75,8 +77,7 @@ public abstract class RaftLog implements Closeable {
   }
 
   public void checkLogState() {
-    Preconditions.assertTrue(isOpen,
-        () -> getSelfId() + ": The RaftLog has not been opened or has been 
closed");
+    state.assertOpen();
   }
 
   /**
@@ -182,7 +183,7 @@ public abstract class RaftLog implements Closeable {
 
   public void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer)
       throws IOException {
-    isOpen = true;
+    state.open();
   }
 
   public abstract long getStartIndex();
@@ -293,11 +294,7 @@ public abstract class RaftLog implements Closeable {
 
   @Override
   public String toString() {
-    if (!isOpen) {
-      return "Closed log";
-    }
-    TermIndex last = getLastEntryTermIndex();
-    return last == null ? "null" : Collections.singletonList(last).toString();
+    return getName() + ":" + state;
   }
 
   public static class Metadata {
@@ -336,13 +333,17 @@ public abstract class RaftLog implements Closeable {
 
   @Override
   public void close() throws IOException {
-    isOpen = false;
+    state.close();
   }
 
   public RaftPeerId getSelfId() {
     return selfId;
   }
 
+  public String getName() {
+    return selfId + "-" + getClass().getSimpleName();
+  }
+
   /**
    * Holds proto entry along with future which contains read state machine data
    */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 0cb1047..25291c8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -105,10 +105,10 @@ class RaftLogCache {
     return maxCachedSegments;
   }
 
-  void loadSegment(LogPathAndIndex pi, boolean isOpen, boolean 
keepEntryInCache,
+  void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache,
       Consumer<LogEntryProto> logConsumer) throws IOException {
-    LogSegment logSegment = LogSegment.loadSegment(storage, pi.path.toFile(),
-        pi.startIndex, pi.endIndex, isOpen, keepEntryInCache, logConsumer);
+    LogSegment logSegment = LogSegment.loadSegment(storage, 
pi.getPath().toFile(),
+        pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, 
logConsumer);
     addSegment(logSegment);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
index c2feabb..2242934 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.storage;
 
+import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.util.AtomicFileOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
@@ -61,7 +62,7 @@ public class RaftStorageDirectory {
   }
 
   public static class LogPathAndIndex {
-    public final Path path;
+    private final Path path;
     public final long startIndex;
     public final long endIndex;
 
@@ -71,6 +72,14 @@ public class RaftStorageDirectory {
       this.endIndex = endIndex;
     }
 
+    public Path getPath() {
+      return path;
+    }
+
+    public boolean isOpen() {
+      return endIndex == RaftServerConstants.INVALID_LOG_INDEX;
+    }
+
     @Override
     public String toString() {
       return path + "-" + startIndex + "-" + endIndex;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index c16c478..0dc7cea 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -20,7 +20,6 @@ package org.apache.ratis.server.storage;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -141,7 +140,6 @@ public class SegmentedRaftLog extends RaftLog {
       List<LogPathAndIndex> paths = 
storage.getStorageDir().getLogSegmentFiles();
       int i = 0;
       for (LogPathAndIndex pi : paths) {
-        boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
         // During the initial loading, we can only confirm the committed
         // index based on the snapshot. This means if a log segment is not kept
         // in cache after the initial loading, later we have to load its 
content
@@ -150,7 +148,7 @@ public class SegmentedRaftLog extends RaftLog {
         // so that during the initial loading we can apply part of the log
         // entries to the state machine
         boolean keepEntryInCache = (paths.size() - i++) <= 
cache.getMaxCachedSegments();
-        cache.loadSegment(pi, isOpen, keepEntryInCache, logConsumer);
+        cache.loadSegment(pi, keepEntryInCache, logConsumer);
       }
 
       // if the largest index is smaller than the last index in snapshot, we do

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 2d28896..3dd0612 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -204,15 +204,17 @@ public abstract class MiniRaftCluster implements 
Closeable {
   /**
    * start a stopped server again.
    */
-  public void restartServer(RaftPeerId newId, boolean format) throws 
IOException {
-    restartServer(newId, group, format);
+  public RaftServerImpl restartServer(RaftPeerId newId, boolean format) throws 
IOException {
+    return restartServer(newId, group, format);
   }
 
-  public void restartServer(RaftPeerId newId, RaftGroup group, boolean format) 
throws IOException {
+  public RaftServerImpl restartServer(RaftPeerId newId, RaftGroup group, 
boolean format) throws IOException {
     killServer(newId);
     servers.remove(newId);
 
-    putNewServer(newId, group, format).start();
+    final RaftServerProxy proxy = putNewServer(newId, group, format);
+    proxy.start();
+    return group == null? null: proxy.getImpl(group.getGroupId());
   }
 
   public void restart(boolean format) throws IOException {
@@ -579,18 +581,18 @@ public abstract class MiniRaftCluster implements 
Closeable {
     LOG.info(printServers());
 
     final ExecutorService executor = 
Executors.newFixedThreadPool(servers.size(), Daemon::new);
+    getServers().forEach(proxy -> executor.submit(proxy::close));
     try {
-      getServers().forEach(proxy -> executor.submit(proxy::close));
+      executor.shutdown();
       // just wait for a few seconds
       executor.awaitTermination(5, TimeUnit.SECONDS);
     } catch(InterruptedException e) {
       LOG.warn("shutdown interrupted", e);
-    } finally {
-      executor.shutdownNow();
     }
 
     timer.cancel();
     ExitUtils.assertNotTerminated();
+    LOG.info(getClass().getSimpleName() + " shutdown completed");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index e109598..96a164e 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -31,6 +31,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 
@@ -41,6 +42,8 @@ import java.util.concurrent.TimeUnit;
 /**
  * Test Raft Server Slowness detection and notification to Leader's 
statemachine.
  */
+//TODO: fix StateMachine.notifySlowness(..); see RATIS-370
+@Ignore
 public class TestRaftServerSlownessDetection extends BaseTest {
   static {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 5353caa..95ac858 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -25,23 +25,30 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerState;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.io.File;
+import java.nio.file.Path;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * Test restarting raft peers.
@@ -49,15 +56,14 @@ import java.util.List;
 public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
-  static {
+  {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
   }
 
   static final int NUM_SERVERS = 3;
 
-  @Before
-  public void setup() {
+  {
     final RaftProperties prop = getProperties();
     prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
@@ -75,15 +81,11 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     cluster.start();
     RaftTestUtil.waitForLeader(cluster);
     final RaftPeerId leaderId = cluster.getLeader().getId();
-    final RaftClient client = cluster.createClient(leaderId);
 
     // write some messages
-    final byte[] content = new byte[1024];
-    Arrays.fill(content, (byte)1);
-    final SimpleMessage message = new SimpleMessage(new String(content));
-    for(int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
+    final AtomicInteger messageCount = new AtomicInteger();
+    final Supplier<Message> newMessage = () -> new SimpleMessage("m" + 
messageCount.getAndIncrement());
+    writeSomething(newMessage, cluster);
 
     // restart a follower
     RaftPeerId followerId = cluster.getFollowers().get(0).getId();
@@ -91,10 +93,8 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
     cluster.restartServer(followerId, false);
 
     // write some more messages
-    for(int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-    client.close();
+    writeSomething(newMessage, cluster);
+    final int truncatedMessageIndex = messageCount.get() - 1;
 
     final long leaderLastIndex = 
cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
     // make sure the restarted follower can catchup
@@ -103,8 +103,63 @@ public abstract class ServerRestartTests<CLUSTER extends 
MiniRaftCluster>
         10, 500, "follower catchup", LOG);
 
     // make sure the restarted peer's log segments is correct
-    cluster.restartServer(followerId, false);
-    Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
-        .getLastEntryTermIndex().getIndex() >= leaderLastIndex);
+    final RaftServerImpl follower = cluster.restartServer(followerId, false);
+    final RaftLog followerLog = follower.getState().getLog();
+    final long followerLastIndex = 
followerLog.getLastEntryTermIndex().getIndex();
+    Assert.assertTrue(followerLastIndex >= leaderLastIndex);
+
+    final File followerOpenLogFile = getOpenLogFile(follower);
+    final File leaderOpenLogFile = 
getOpenLogFile(cluster.getRaftServerImpl(leaderId));
+
+    // shutdown all servers
+    cluster.getServers().forEach(RaftServerProxy::close);
+
+    // truncate log and
+    assertTruncatedLog(followerId, followerOpenLogFile, followerLastIndex, 
cluster);
+    assertTruncatedLog(leaderId, leaderOpenLogFile, leaderLastIndex, cluster);
+
+    // restart and write something.
+    cluster.restart(false);
+    writeSomething(newMessage, cluster);
+
+    // restart again and check messages.
+    cluster.restart(false);
+    try(final RaftClient client = cluster.createClient()) {
+      for(int i = 0; i < messageCount.get(); i++) {
+        if (i != truncatedMessageIndex) {
+          final Message m = new SimpleMessage("m" + i);
+          final RaftClientReply reply = client.sendReadOnly(m);
+          Assert.assertTrue(reply.isSuccess());
+          LOG.info("query {}: {} {}", m, reply, 
LogEntryProto.parseFrom(reply.getMessage().getContent()));
+        }
+      }
+    }
+  }
+
+  static void writeSomething(Supplier<Message> newMessage, MiniRaftCluster 
cluster) throws Exception {
+    try(final RaftClient client = cluster.createClient()) {
+      // write some messages
+      for(int i = 0; i < 10; i++) {
+        Assert.assertTrue(client.send(newMessage.get()).isSuccess());
+      }
+    }
+  }
+
+  static void assertTruncatedLog(RaftPeerId id, File openLogFile, long 
lastIndex, MiniRaftCluster cluster) throws Exception {
+    // truncate log
+    FileUtils.truncateFile(openLogFile, openLogFile.length() - 1);
+    final RaftServerImpl server = cluster.restartServer(id, false);
+    // the last index should be one less than before
+    Assert.assertEquals(lastIndex - 1, 
server.getState().getLog().getLastEntryTermIndex().getIndex());
+    server.getProxy().close();
+  }
+
+  static File getOpenLogFile(RaftServerImpl server) throws Exception {
+    final List<Path> openLogs = 
server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream()
+        .filter(LogPathAndIndex::isOpen)
+        .map(LogPathAndIndex::getPath)
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, openLogs.size());
+    return openLogs.get(0).toFile();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
index 5c12ef4..802797f 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java
@@ -77,7 +77,7 @@ public class RequestHandler<REQUEST extends RaftRpcMessage,
   void interruptAndJoinDaemon() throws InterruptedException {
     daemons.forEach(Thread::interrupt);
     for (Daemon d : daemons) {
-      d.join();
+      d.join(1000);
     }
   }
 
@@ -118,6 +118,9 @@ public class RequestHandler<REQUEST extends RaftRpcMessage,
     public void run() {
       while (handlerImpl.isAlive()) {
         try {
+          if (Thread.interrupted()) {
+            throw new InterruptedException(this + " was interrupted 
previously.");
+          }
           handleRequest(rpc.takeRequest(getServerId()));
         } catch (InterruptedIOException e) {
           LOG.info(this + " is interrupted by " + e);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index a6150aa..bc2339d 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -26,7 +26,9 @@ import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TraditionalBinaryPrefix;
 import org.junit.After;
@@ -42,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
 import static org.apache.ratis.server.storage.LogSegment.getEntrySize;
@@ -75,26 +78,47 @@ public class TestRaftLogSegment extends BaseTest {
     }
   }
 
-  private File prepareLog(boolean isOpen, long start, int size, long term)
+  File prepareLog(boolean isOpen, long startIndex, int numEntries, long term, boolean isLastEntryPartiallyWritten)
       throws IOException {
+    if (!isOpen) {
+      Preconditions.assertTrue(!isLastEntryPartiallyWritten, "For closed log, the last entry cannot be partially written.");
+    }
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
-    File file = isOpen ? storage.getStorageDir().getOpenLogFile(start) :
-        storage.getStorageDir().getClosedLogFile(start, start + size - 1);
+    final File file = isOpen ?
+        storage.getStorageDir().getOpenLogFile(startIndex) :
+        storage.getStorageDir().getClosedLogFile(startIndex, startIndex + numEntries - 1);
 
-    LogEntryProto[] entries = new LogEntryProto[size];
+    final LogEntryProto[] entries = new LogEntryProto[numEntries];
     try (LogOutputStream out = new LogOutputStream(file, false,
         segmentMaxSize, preallocatedSize, bufferSize)) {
-      for (int i = 0; i < size; i++) {
+      for (int i = 0; i < entries.length; i++) {
         SimpleOperation op = new SimpleOperation("m" + i);
-        entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + start);
+        entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i + startIndex);
         out.write(entries[i]);
       }
     }
+
+    if (isLastEntryPartiallyWritten) {
+      final int entrySize = size(entries[entries.length - 1]);
+      final int truncatedEntrySize = ThreadLocalRandom.current().nextInt(entrySize - 1) + 1;
+      // 0 < truncatedEntrySize < entrySize
+      final long fileLength = file.length();
+      final long truncatedFileLength = fileLength - (entrySize - truncatedEntrySize);
+      LOG.info("truncate last entry: entry(size={}, truncated={}), file(length={}, truncated={})",
+          entrySize, truncatedEntrySize, fileLength, truncatedFileLength);
+      FileUtils.truncateFile(file, truncatedFileLength);
+    }
+
     storage.close();
     return file;
   }
 
-  private void checkLogSegment(LogSegment segment, long start, long end,
+  static int size(LogEntryProto entry) {
+    final int n = entry.getSerializedSize();
+    return CodedOutputStream.computeUInt32SizeNoTag(n) + n + 4;
+  }
+
+  static void checkLogSegment(LogSegment segment, long start, long end,
       boolean isOpen, long totalSize, long term) throws Exception {
     Assert.assertEquals(start, segment.getStartIndex());
     Assert.assertEquals(end, segment.getEndIndex());
@@ -117,27 +141,38 @@ public class TestRaftLogSegment extends BaseTest {
 
   @Test
   public void testLoadLogSegment() throws Exception {
-    testLoadSegment(true);
+    testLoadSegment(true, false);
+  }
+
+  @Test
+  public void testLoadLogSegmentLastEntryPartiallyWritten() throws Exception {
+    testLoadSegment(true, true);
   }
 
   @Test
   public void testLoadCache() throws Exception {
-    testLoadSegment(false);
+    testLoadSegment(false, false);
+  }
+
+  @Test
+  public void testLoadCacheLastEntryPartiallyWritten() throws Exception {
+    testLoadSegment(false, true);
   }
 
-  private void testLoadSegment(boolean loadInitial) throws Exception {
+  private void testLoadSegment(boolean loadInitial, boolean isLastEntryPartiallyWritten) throws Exception {
     // load an open segment
-    File openSegmentFile = prepareLog(true, 0, 100, 0);
+    final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
     LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
         INVALID_LOG_INDEX, true, loadInitial, null);
-    checkLogSegment(openSegment, 0, 99, true, openSegmentFile.length(), 0);
+    final int delta = isLastEntryPartiallyWritten? 1: 0;
+    checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
     storage.close();
     // for open segment we currently always keep log entries in the memory
     Assert.assertEquals(0, openSegment.getLoadingTimes());
 
     // load a closed segment (1000-1099)
-    File closedSegmentFile = prepareLog(false, 1000, 100, 1);
+    final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
     LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
         1000, 1099, false, loadInitial, null);
     checkLogSegment(closedSegment, 1000, 1099, false,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 0a5e38d..cbb1ee2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -189,7 +189,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
 
     // delete the log segments from the leader
     for (LogPathAndIndex path : logs) {
-      FileUtils.deleteFile(path.path.toFile());
+      FileUtils.delete(path.getPath());
     }
 
     // restart the peer

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c5942546/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 313e713..2c6883e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -151,7 +151,9 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   private void put(LogEntryProto entry) {
     final LogEntryProto previous = indexMap.put(entry.getIndex(), entry);
     Preconditions.assertNull(previous, "previous");
-    dataMap.put(entry.getStateMachineLogEntry().getLogData().toStringUtf8(), entry);
+    final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
+    dataMap.put(s, entry);
+    LOG.info("put {}, {} -> {}", entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
   }
 
   @Override


Reply via email to