Repository: incubator-ratis
Updated Branches:
  refs/heads/master b901b3a5a -> 4d723a2c7


RATIS-377. Tolerate partially written log header.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4d723a2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4d723a2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4d723a2c

Branch: refs/heads/master
Commit: 4d723a2c73b353f5228b067635383e03a697f563
Parents: b901b3a
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Mon Oct 29 13:46:31 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Mon Oct 29 13:46:31 2018 +0800

----------------------------------------------------------------------
 .../apache/ratis/io/CorruptedFileException.java | 27 +++++++
 .../org/apache/ratis/util/CheckedConsumer.java  | 37 +++++++++
 .../org/apache/ratis/util/OpenCloseState.java   | 38 +++++++--
 .../java/org/apache/ratis/util/StringUtils.java |  5 ++
 .../apache/ratis/server/impl/LogAppender.java   |  4 +
 .../ratis/server/impl/RaftServerConstants.java  |  1 -
 .../ratis/server/storage/LogInputStream.java    | 55 ++++++-------
 .../ratis/server/storage/LogOutputStream.java   |  6 +-
 .../apache/ratis/server/storage/LogReader.java  | 44 ++++++++---
 .../apache/ratis/server/storage/LogSegment.java | 29 ++++---
 .../ratis/server/storage/RaftLogCache.java      |  4 +-
 .../ratis/server/storage/SegmentedRaftLog.java  |  3 -
 .../server/storage/SegmentedRaftLogFormat.java  | 82 ++++++++++++++++++++
 .../apache/ratis/server/ServerRestartTests.java | 54 ++++++++++++-
 .../apache/ratis/server/TestRaftLogMetrics.java | 18 +++--
 .../server/storage/TestRaftLogReadWrite.java    |  4 +-
 .../server/storage/TestRaftLogSegment.java      | 10 +--
 .../server/storage/TestSegmentedRaftLog.java    | 13 ++++
 18 files changed, 354 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java b/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java
new file mode 100644
index 0000000..3cf525b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/CorruptedFileException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.io;
+
+import java.io.File;
+import java.io.IOException;
+
+/** Thrown when the content of a file is found to be corrupted. */
+public class CorruptedFileException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param file the corrupted file; whether it exists and its length are included in the message.
+   * @param message a description of the corruption.
+   */
+  public CorruptedFileException(File file, String message) {
+    super("File " + file + " (exist? " + file.exists() + ", length=" + file.length()
+        + ") is corrupted: " + message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java
new file mode 100644
index 0000000..0532ea1
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedConsumer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/** Consumer with a throws-clause. */
+@FunctionalInterface
+public interface CheckedConsumer<INPUT, THROWABLE extends Throwable> {
+  /**
+   * The same as {@link java.util.function.Consumer#accept(Object)}
+   * except that this method is declared with a throws-clause.
+   */
+  void accept(INPUT input) throws THROWABLE;
+
+  /**
+   * Adapt the given consumer to a function which invokes the consumer and then returns null.
+   *
+   * @param consumer the consumer to be adapted.
+   * @return a {@link CheckedFunction} with {@link Void} return type.
+   */
+  static <INPUT, THROWABLE extends Throwable> CheckedFunction<INPUT, Void, THROWABLE> asCheckedFunction(
+      CheckedConsumer<INPUT, THROWABLE> consumer) {
+    return input -> {
+      consumer.accept(input);
+      return null;
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
index c6ccbd3..7847c21 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java
@@ -64,6 +64,22 @@ public class OpenCloseState {
     }
   }
 
+  /** @return true if the current state is {@code initTrace} (presumably: neither opened nor closed yet). */
+  public boolean isUnopened() {
+    return state.get() == initTrace;
+  }
+
+  /** @return true if the current state is an {@link OpenTrace}. */
+  public boolean isOpened() {
+    return state.get() instanceof OpenTrace;
+  }
+
+  /** @return true if the current state is a {@link CloseTrace}. */
+  public boolean isClosed() {
+    return state.get() instanceof CloseTrace;
+  }
+
+  /** @return the current state, which is recorded as a {@link Throwable} trace. */
+  public Throwable getThrowable() {
+    return state.get();
+  }
+
   /**
    * Transit to open state.
    * The method is NOT idempotent.
@@ -76,17 +92,27 @@ public class OpenCloseState {
     }
   }
 
+  /** @return true if the given state is allowed to transit to close, i.e. it is unopened or opened. */
+  private boolean readyToClose(Throwable t) {
+    return t == initTrace || t instanceof OpenTrace;
+  }
+
   /**
    * Transit to close state.
    * The method is idempotent.
+   * An unopened or an opened state transits to close; a closed state stays closed.
+   *
+   * @return true if the state is transited to close as a result of the invocation of this method.
+   *         Otherwise, return false since it is already closed.
+   *
+   * @throws IOException if the current state is not allowed to transit to close.
    */
-  public void close() throws IOException {
-    final Throwable t = state.updateAndGet(
-        previous -> previous instanceof OpenTrace? new CloseTrace("Close "+ name): previous);
-    // If t is CloseTrace, t is either the previous or the newly created object.
-    if (!(t instanceof CloseTrace)) {
-      throw new IOException("Failed to close " + name + " since it is " + toString(t));
+  public boolean close() throws IOException {
+    // previous is the state before the update; only a ready-to-close state is replaced by a new CloseTrace.
+    final Throwable previous = state.getAndUpdate(prev -> readyToClose(prev)? new CloseTrace("Close "+ name): prev);
+    if (readyToClose(previous)) {
+      return true;
+    } else if (previous instanceof CloseTrace) {
+      return false; // already closed
     }
+    throw new IOException("Failed to close " + name + " since it is " + toString(previous));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 6a9442e..be797a0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -88,6 +88,11 @@ public class StringUtils {
     return bytes2HexString(ByteBuffer.wrap(bytes));
   }
 
+  /**
+   * The same as {@link #bytes2HexString(ByteBuffer)} applied to {@code ByteBuffer.wrap(bytes, offset, length)},
+   * i.e. only the given range of the array is converted.
+   *
+   * @throws IndexOutOfBoundsException if the range is invalid, as thrown by {@link ByteBuffer#wrap(byte[], int, int)}.
+   */
+  public static String bytes2HexString(byte[] bytes, int offset, int length) {
+    Objects.requireNonNull(bytes, "bytes == null");
+    return bytes2HexString(ByteBuffer.wrap(bytes, offset, length));
+  }
+
   public static String bytes2HexString(ByteBuffer bytes) {
     Objects.requireNonNull(bytes, "bytes == null");
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 0542f8b..02e0f56 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -44,6 +44,7 @@ import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX
 import static org.apache.ratis.util.LifeCycle.State.CLOSED;
 import static org.apache.ratis.util.LifeCycle.State.CLOSING;
 import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
+import static org.apache.ratis.util.LifeCycle.State.NEW;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
 import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
@@ -117,6 +118,9 @@ public class LogAppender {
   }
 
   public void stopAppender() {
+    // If the life cycle is still NEW (presumably never started), transit directly to CLOSED
+    // without going through CLOSING or interrupting the daemon.
+    if (lifeCycle.compareAndTransition(NEW, CLOSED)) {
+      return;
+    }
     lifeCycle.transition(CLOSING);
     daemon.interrupt();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index 35e3df8..a2b7057 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.impl;
 
 public interface RaftServerConstants {
   long INVALID_LOG_INDEX = -1;
-  byte LOG_TERMINATE_BYTE = 0;
   long DEFAULT_CALLID = 0;
   long DEFAULT_SEQNUM = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
index 22d7949..6eb1e38 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
@@ -23,9 +23,11 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.util.Optional;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,17 +59,11 @@ public class LogInputStream implements Closeable {
     }
   }
 
-  private enum State {
-    UNINIT,
-    OPEN,
-    CLOSED
-  }
-
   private final File logFile;
   private final long startIndex;
   private final long endIndex;
   private final boolean isOpen;
-  private State state = State.UNINIT;
+  private final OpenCloseState state;
   private LogReader reader;
 
   public LogInputStream(File log, long startIndex, long endIndex,
@@ -82,20 +78,19 @@ public class LogInputStream implements Closeable {
     this.startIndex = startIndex;
     this.endIndex = endIndex;
     this.isOpen = isOpen;
+    this.state = new OpenCloseState(getName());
   }
 
   private void init() throws IOException {
-    Preconditions.assertTrue(state == State.UNINIT);
+    state.open();
     try {
-      reader = new LogReader(logFile);
-      // read the log header
-      String header = reader.readLogHeader();
-      Preconditions.assertTrue(SegmentedRaftLog.HEADER_STR.equals(header),
-          "Corrupted log header: %s", header);
-      state = State.OPEN;
+      final LogReader r = new LogReader(logFile);
+      if (r.verifyHeader()) {
+        reader = r;
+      }
     } finally {
       if (reader == null) {
-        state = State.CLOSED;
+        state.close();
       }
     }
   }
@@ -113,19 +108,18 @@ public class LogInputStream implements Closeable {
   }
 
   public LogEntryProto nextEntry() throws IOException {
-    LogEntryProto entry = null;
-    switch (state) {
-      case UNINIT:
+    if (state.isUnopened()) {
         try {
           init();
         } catch (Throwable e) {
           LOG.error("caught exception initializing " + this, e);
           throw IOUtils.asIOException(e);
         }
-        Preconditions.assertTrue(state != State.UNINIT);
-        return nextEntry();
-      case OPEN:
-        entry = reader.readEntry();
+    }
+
+    Preconditions.assertTrue(!state.isUnopened());
+    if (state.isOpened()) {
+        final LogEntryProto entry = reader.readEntry();
         if (entry != null) {
           long index = entry.getIndex();
           if (!isOpen() && index >= endIndex) {
@@ -142,20 +136,20 @@ public class LogInputStream implements Closeable {
             }
           }
         }
-        break;
-      case CLOSED:
-        break; // return null
+        return entry;
+    } else if (state.isClosed()) {
+      return null;
     }
-    return entry;
+    throw new IOException("Failed to get next entry from " + this, 
state.getThrowable());
   }
 
   long scanNextEntry() throws IOException {
-    Preconditions.assertTrue(state == State.OPEN);
+    // reader is only set by init() after the header is verified, so require the open state.
+    state.assertOpen();
     return reader.scanEntry();
   }
 
   long getPosition() {
-    if (state == State.OPEN) {
+    if (state.isOpened()) {
       return reader.getPos();
     } else {
       return 0;
@@ -164,10 +158,9 @@ public class LogInputStream implements Closeable {
 
   @Override
   public void close() throws IOException {
-    if (state == State.OPEN) {
-      reader.close();
+    // Only the call which actually transits the state to close releases the reader;
+    // reader is null if init() has not run.
+    if (state.close()) {
+      Optional.ofNullable(reader).ifPresent(LogReader::close);
     }
-    state = State.CLOSED;
   }
 
   boolean isOpen() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
index 118a3e8..46da9da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ratis.server.storage;
 
-import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.CheckedConsumer;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.PureJavaCrc32C;
@@ -43,7 +43,7 @@ public class LogOutputStream implements Closeable {
     fill = ByteBuffer.allocateDirect(BUFFER_SIZE);
     fill.position(0);
     for (int i = 0; i < fill.capacity(); i++) {
-      fill.put(RaftServerConstants.LOG_TERMINATE_BYTE);
+      fill.put(SegmentedRaftLogFormat.getTerminator());
     }
   }
 
@@ -136,7 +136,7 @@ public class LogOutputStream implements Closeable {
     preallocatedPos = 0;
     preallocate(); // preallocate file
 
-    out.write(SegmentedRaftLog.HEADER_BYTES);
+    
SegmentedRaftLogFormat.applyHeaderTo(CheckedConsumer.asCheckedFunction(out::write));
     flush();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
index 735e8a3..b15b72f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.storage;
 
+import org.apache.ratis.io.CorruptedFileException;
 import org.apache.ratis.protocol.ChecksumException;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedInputStream;
@@ -25,11 +26,11 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.PureJavaCrc32C;
+import org.apache.ratis.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-import java.nio.charset.StandardCharsets;
 import java.util.zip.Checksum;
 
 public class LogReader implements Closeable {
@@ -138,13 +139,37 @@ public class LogReader implements Closeable {
     checksum = new PureJavaCrc32C();
   }
 
-  String readLogHeader() throws IOException {
-    byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length];
-    int num = in.read(header);
-    if (num < header.length) {
-      throw new EOFException("EOF before reading a complete log header");
+  /**
+   * Read header from the log file:
+   *
+   * (1) The header in file is verified successfully.
+   *     Then, return true.
+   *
+   * (2) The header in file is partially written (including an empty file).
+   *     Then, return false.
+   *
+   * (3) The header in file is corrupted or there is some other {@link IOException}.
+   *     Then, throw an exception.
+   */
+  boolean verifyHeader() throws IOException {
+    final int headerLength = SegmentedRaftLogFormat.getHeaderLength();
+    // A single read(..) may return fewer bytes than requested before EOF,
+    // so keep reading until the header is complete or EOF is reached.
+    int readLength = 0;
+    while (readLength < headerLength) {
+      final int n = in.read(temp, readLength, headerLength - readLength);
+      if (n < 0) {
+        break; // EOF: the file is shorter than the header
+      }
+      readLength += n;
+    }
+    Preconditions.assertTrue(readLength <= headerLength);
+    final int matchLength = SegmentedRaftLogFormat.matchHeader(temp, 0, readLength);
+    Preconditions.assertTrue(matchLength <= readLength);
+
+    if (readLength == headerLength && matchLength == readLength) {
+      // The header is matched successfully
+      return true;
+    } else if (SegmentedRaftLogFormat.isTerminator(temp, matchLength, readLength - matchLength)) {
+      // The header is partially written: a matching prefix followed only by terminator bytes.
+      return false;
     }
-    return new String(header, StandardCharsets.UTF_8);
+    // The header is corrupted
+    throw new CorruptedFileException(file, "Log header mismatched: expected header length="
+        + SegmentedRaftLogFormat.getHeaderLength() + ", read length=" + readLength + ", match length=" + matchLength
+        + ", header in file=" + StringUtils.bytes2HexString(temp, 0, readLength)
+        + ", expected header=" + SegmentedRaftLogFormat.applyHeaderTo(StringUtils::bytes2HexString));
   }
 
   /**
@@ -196,13 +221,12 @@ public class LogReader implements Closeable {
     int numRead = -1, idx = 0;
     while (true) {
       try {
-        numRead = -1;
         numRead = in.read(temp);
         if (numRead == -1) {
           return;
         }
         for (idx = 0; idx < numRead; idx++) {
-          if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) {
+          if (!SegmentedRaftLogFormat.isTerminator(temp[idx])) {
             throw new IOException("Read extra bytes after the terminator!");
           }
         }
@@ -243,7 +267,7 @@ public class LogReader implements Closeable {
     // Each log entry starts with a var-int. Thus a valid entry's first byte
     // should not be 0. So if the terminate byte is 0, we should hit the end
     // of the segment.
-    if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) {
+    if (SegmentedRaftLogFormat.isTerminator(nextByte)) {
       verifyTerminator();
       return null;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
index 5f61864..a74d6a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -25,6 +25,8 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -45,6 +47,8 @@ import java.util.function.Consumer;
  * This class will be protected by the RaftServer's lock.
  */
 class LogSegment implements Comparable<Long> {
+  static final Logger LOG = LoggerFactory.getLogger(LogSegment.class);
+
   static long getEntrySize(LogEntryProto entry) {
     final int serialized = 
ServerProtoUtils.removeStateMachineData(entry).getSerializedSize();
     return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 
4;
@@ -103,12 +107,11 @@ class LogSegment implements Comparable<Long> {
     return new LogSegment(storage, false, start, end);
   }
 
-  private static void readSegmentFile(File file, long start, long end,
+  private static int readSegmentFile(File file, long start, long end,
       boolean isOpen, Consumer<LogEntryProto> entryConsumer) throws 
IOException {
+    int count = 0;
     try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
-      LogEntryProto next;
-      LogEntryProto prev = null;
-      while ((next = in.nextEntry()) != null) {
+      for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; 
prev = next) {
         if (prev != null) {
           Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
               "gap between entry %s and entry %s", prev, next);
@@ -117,9 +120,10 @@ class LogSegment implements Comparable<Long> {
         if (entryConsumer != null) {
           entryConsumer.accept(next);
         }
-        prev = next;
+        count++;
       }
     }
+    return count;
   }
 
   static LogSegment loadSegment(RaftStorage storage, File file,
@@ -130,15 +134,20 @@ class LogSegment implements Comparable<Long> {
         LogSegment.newOpenSegment(storage, start) :
         LogSegment.newCloseSegment(storage, start, end);
 
-    readSegmentFile(file, start, end, isOpen, entry -> {
-      segment.append(keepEntryInCache | isOpen, entry);
+    final int entryCount = readSegmentFile(file, start, end, isOpen, entry -> {
+      segment.append(keepEntryInCache || isOpen, entry);
       if (logConsumer != null) {
         logConsumer.accept(entry);
       }
     });
+    LOG.info("Successfully read {} entries from segment file {}", entryCount, 
file);
 
-    // truncate padding if necessary
-    if (file.length() > segment.getTotalSize()) {
+    if (entryCount == 0) {
+      // The segment does not have any entries, delete the file.
+      FileUtils.deleteFile(file);
+      return null;
+    } else if (file.length() > segment.getTotalSize()) {
+      // The segment has extra padding, truncate it.
       FileUtils.truncateFile(file, segment.getTotalSize());
     }
 
@@ -217,7 +226,7 @@ class LogSegment implements Comparable<Long> {
     this.isOpen = isOpen;
     this.startIndex = start;
     this.endIndex = end;
-    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+    totalSize = SegmentedRaftLogFormat.getHeaderLength();
     hasEntryCache = isOpen;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 25291c8..451e713 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -109,7 +109,9 @@ class RaftLogCache {
       Consumer<LogEntryProto> logConsumer) throws IOException {
     LogSegment logSegment = LogSegment.loadSegment(storage, 
pi.getPath().toFile(),
         pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, 
logConsumer);
-    addSegment(logSegment);
+    if (logSegment != null) {
+      addSegment(logSegment);
+    }
   }
 
   long getCachedSegmentNum() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 0dc7cea..f2b5c1a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -68,9 +68,6 @@ import java.util.function.Consumer;
  * we may have hole when append further log.
  */
 public class SegmentedRaftLog extends RaftLog {
-  static final String HEADER_STR = "RAFTLOG1";
-  static final byte[] HEADER_BYTES = 
HEADER_STR.getBytes(StandardCharsets.UTF_8);
-
   /**
    * I/O task definitions.
    */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java
new file mode 100644
index 0000000..1a00a84
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLogFormat.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.util.CheckedFunction;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * The format of segmented raft log files: a fixed header at the beginning of each file,
+ * and a terminator byte which marks the end of the entries and fills the preallocated space.
+ */
+public interface SegmentedRaftLogFormat {
+  /** Holder of the format constants; not intended to be used outside this interface. */
+  class Internal {
+    // NOTE(review): this differs from the previous header "RAFTLOG1";
+    // segments written with the previous header fail verification as corrupted.
+    private static final byte[] HEADER_BYTES = "RaftLog1".getBytes(StandardCharsets.UTF_8);
+    /** A copy of {@link #HEADER_BYTES} for detecting any modification of the shared array. */
+    private static final byte[] HEADER_BYTES_CLONE = HEADER_BYTES.clone();
+    private static final byte TERMINATOR_BYTE = 0;
+
+    private Internal() {}
+
+    private static void assertHeader() {
+      Preconditions.assertTrue(Arrays.equals(HEADER_BYTES, HEADER_BYTES_CLONE));
+    }
+  }
+
+  /** @return the length of the log header in bytes. */
+  static int getHeaderLength() {
+    return Internal.HEADER_BYTES.length;
+  }
+
+  /**
+   * Match the given bytes with the log header.
+   *
+   * @return the length of the longest prefix of {@code bytes[offset, offset + length)} matching the header.
+   */
+  static int matchHeader(byte[] bytes, int offset, int length) {
+    Preconditions.assertTrue(length <= getHeaderLength());
+    for(int i = 0; i < length; i++) {
+      if (bytes[offset + i] != Internal.HEADER_BYTES[i]) {
+        return i;
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Apply the given function to the header bytes.
+   * The function must not modify the given array; this is asserted after the function returns.
+   */
+  static <T> T applyHeaderTo(CheckedFunction<byte[], T, IOException> function) throws IOException {
+    final T t = function.apply(Internal.HEADER_BYTES);
+    Internal.assertHeader(); // assert that the header is unmodified by the function.
+    return t;
+  }
+
+  /** @return the terminator byte. */
+  static byte getTerminator() {
+    return Internal.TERMINATOR_BYTE;
+  }
+
+  /** @return true if the given byte is the terminator byte. */
+  static boolean isTerminator(byte b) {
+    return b == Internal.TERMINATOR_BYTE;
+  }
+
+  /** @return true if all the bytes in {@code bytes[offset, offset + length)} are terminator bytes. */
+  static boolean isTerminator(byte[] bytes, int offset, int length) {
+    return indexOfNonTerminator(bytes, offset, length) == -1;
+  }
+
+  /**
+   * @return The index, relative to {@code offset}, of the first non-terminator if it exists.
+   *         Otherwise, return -1, i.e. all bytes are terminator.
+   */
+  static int indexOfNonTerminator(byte[] bytes, int offset, int length) {
+    for(int i = 0; i < length; i++) {
+      if (!isTerminator(bytes[offset + i])) {
+        return i;
+      }
+    }
+    return -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 95ac858..531d2e2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -33,17 +33,21 @@ import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.ServerState;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
+import org.apache.ratis.server.storage.SegmentedRaftLogFormat;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 
 import java.io.File;
+import java.io.RandomAccessFile;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +61,6 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
   {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
   }
 
@@ -154,12 +157,57 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
     server.getProxy().close();
   }
 
-  static File getOpenLogFile(RaftServerImpl server) throws Exception {
-    final List<Path> openLogs = server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream()
+  static List<Path> getOpenLogFiles(RaftServerImpl server) throws Exception { // open segment files; may be empty
+    return server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream()
         .filter(LogPathAndIndex::isOpen)
         .map(LogPathAndIndex::getPath)
         .collect(Collectors.toList());
+  }
+
+  static File getOpenLogFile(RaftServerImpl server) throws Exception { // asserts exactly one open segment
+    final List<Path> openLogs = getOpenLogFiles(server);
     Assert.assertEquals(1, openLogs.size());
     return openLogs.get(0).toFile();
   }
+
+  @Test
+  public void testRestartWithCorruptedLogHeader() throws Exception {
+    try (MiniRaftCluster c = newCluster(NUM_SERVERS)) { // cluster is closed even if the test fails
+      runTestRestartWithCorruptedLogHeader(c, LOG);
+    }
+  }
+
+  static void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster, Logger LOG) throws Exception {
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+
+    // shutdown all servers before corrupting their open log segments below
+    cluster.getServers().forEach(RaftServerProxy::close);
+
+    for(RaftServerImpl impl : cluster.iterateServerImpls()) {
+      final File openLogFile = JavaUtils.attempt(() -> getOpenLogFile(impl),
+          10, 100, impl.getId() + "-getOpenLogFile", LOG);
+      for(int i = 0; i < SegmentedRaftLogFormat.getHeaderLength(); i++) { // each proper prefix length, including 0
+        assertCorruptedLogHeader(impl.getId(), openLogFile, i, cluster, LOG);
+        Assert.assertTrue(getOpenLogFiles(impl).isEmpty()); // no open segment file is expected after the restart
+      }
+    }
+  }
+
+  static void assertCorruptedLogHeader(RaftPeerId id, File openLogFile, int partialLength,
+      MiniRaftCluster cluster, Logger LOG) throws Exception {
+    Preconditions.assertTrue(partialLength < SegmentedRaftLogFormat.getHeaderLength());
+    try(final RandomAccessFile raf = new RandomAccessFile(openLogFile, "rw")) {
+      SegmentedRaftLogFormat.applyHeaderTo(header -> {
+        LOG.info("header    = {}", StringUtils.bytes2HexString(header));
+        final byte[] corrupted = new byte[header.length]; // bytes at and after partialLength stay 0
+        System.arraycopy(header, 0, corrupted, 0, partialLength);
+        LOG.info("corrupted = {}", StringUtils.bytes2HexString(corrupted));
+        raf.write(corrupted); // freshly opened file pointer is 0: overwrite the header in place
+        return null;
+      });
+    }
+    final RaftServerImpl server = cluster.restartServer(id, false);
+    server.getProxy().close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
index 9cc60a6..58e319d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
@@ -26,11 +26,11 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
 import org.apache.ratis.server.storage.RaftStorageTestUtils;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -90,12 +90,21 @@ public class TestRaftLogMetrics extends BaseTest
       }
     }
 
-    for (RaftServerProxy rsp: cluster.getServers()) {
-      final String flushTimeMetric = 
RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId());
+    // For leader, flush must happen before client can get replies.
+    assertFlushCount(cluster.getLeader());
+
+    // For followers, flush can be lagged behind.  Attempt multiple times.
+    for(RaftServerImpl f : cluster.getFollowers()) {
+      JavaUtils.attempt(() -> assertFlushCount(f), 10, 100, f.getId() + 
"-assertFlushCount", null);
+    }
+  }
+
+  static void assertFlushCount(RaftServerImpl server) throws Exception {
+      final String flushTimeMetric = 
RaftStorageTestUtils.getLogFlushTimeMetric(server.getId());
       Timer tm = 
RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
       Assert.assertNotNull(tm);
 
-      final MetricsStateMachine stateMachine = 
MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId()));
+      final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
       final int expectedFlush = stateMachine.getFlushCount();
 
       Assert.assertEquals(expectedFlush, tm.getCount());
@@ -106,6 +115,5 @@ public class TestRaftLogMetrics extends BaseTest
       Assert.assertEquals(expectedFlush,
           ((Long) 
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
               .intValue());
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
index f3b314f..7d9fdf5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
@@ -104,7 +104,7 @@ public class TestRaftLogReadWrite extends BaseTest {
   public void testReadWriteLog() throws IOException {
     final RaftStorage storage = new RaftStorage(storageDir, 
StartupOption.REGULAR);
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
-    long size = SegmentedRaftLog.HEADER_BYTES.length;
+    long size = SegmentedRaftLogFormat.getHeaderLength();
 
     final LogEntryProto[] entries = new LogEntryProto[100];
     try (LogOutputStream out =
@@ -162,7 +162,7 @@ public class TestRaftLogReadWrite extends BaseTest {
   public void testReadWithPadding() throws IOException {
     final RaftStorage storage = new RaftStorage(storageDir, 
StartupOption.REGULAR);
     File openSegment = storage.getStorageDir().getOpenLogFile(0);
-    long size = SegmentedRaftLog.HEADER_BYTES.length;
+    long size = SegmentedRaftLogFormat.getHeaderLength();
 
     LogEntryProto[] entries = new LogEntryProto[100];
     LogOutputStream out = new LogOutputStream(openSegment, false,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index bc2339d..270e279 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -125,7 +125,7 @@ public class TestRaftLogSegment extends BaseTest {
     Assert.assertEquals(isOpen, segment.isOpen());
     Assert.assertEquals(totalSize, segment.getTotalSize());
 
-    long offset = SegmentedRaftLog.HEADER_BYTES.length;
+    long offset = SegmentedRaftLogFormat.getHeaderLength();
     for (long i = start; i <= end; i++) {
       LogSegment.LogRecord record = segment.getLogRecord(i);
       LogRecordWithEntry lre = segment.getEntryWithoutLoading(i);
@@ -184,7 +184,7 @@ public class TestRaftLogSegment extends BaseTest {
   public void testAppendEntries() throws Exception {
     final long start = 1000;
     LogSegment segment = LogSegment.newOpenSegment(null, start);
-    long size = SegmentedRaftLog.HEADER_BYTES.length;
+    long size = SegmentedRaftLogFormat.getHeaderLength();
     final long max = 8 * 1024 * 1024;
     checkLogSegment(segment, start, start - 1, true, size, 0);
 
@@ -267,7 +267,7 @@ public class TestRaftLogSegment extends BaseTest {
     segment.truncate(start);
     Assert.assertEquals(0, segment.numOfEntries());
     checkLogSegment(segment, start, start - 1, false,
-        SegmentedRaftLog.HEADER_BYTES.length, term);
+        SegmentedRaftLogFormat.getHeaderLength(), term);
   }
 
   @Test
@@ -307,7 +307,7 @@ public class TestRaftLogSegment extends BaseTest {
       out.write(entry);
     }
     Assert.assertEquals(file.length(),
-        size + SegmentedRaftLog.HEADER_BYTES.length);
+        size + SegmentedRaftLogFormat.getHeaderLength());
     try (LogInputStream in = new LogInputStream(file, 0,
         INVALID_LOG_INDEX, true)) {
       LogEntryProto entry = in.nextEntry();
@@ -332,7 +332,7 @@ public class TestRaftLogSegment extends BaseTest {
     LogEntryProto entry = 
ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
     final long entrySize = LogSegment.getEntrySize(entry);
 
-    long totalSize = SegmentedRaftLog.HEADER_BYTES.length;
+    long totalSize = SegmentedRaftLogFormat.getHeaderLength();
     long preallocated = 16 * 1024;
     try (LogOutputStream out = new LogOutputStream(file, false,
         max.getSize(), 16 * 1024, 10 * 1024)) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4d723a2c/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 9971d30..5cb498a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -40,6 +40,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -398,4 +399,16 @@ public class TestSegmentedRaftLog extends BaseTest {
       Assert.assertEquals(5, cache.getNumOfSegments());
     }
   }
+
+  /** Modifying the array handed to applyHeaderTo is expected to fail with IllegalStateException. */
+  @Test
+  public void testSegmentedRaftLogFormatInternalHeader() throws Exception {
+    testFailureCase("testSegmentedRaftLogFormatInternalHeader",
+        () -> SegmentedRaftLogFormat.applyHeaderTo(bytes -> {
+          LOG.info("header  = " + new String(bytes, StandardCharsets.UTF_8));
+          bytes[0]++; // attempt to modify the internal header through the callback
+          LOG.info("header' = " + new String(bytes, StandardCharsets.UTF_8));
+          return null;
+        }), IllegalStateException.class);
+  }
 }


Reply via email to