Repository: incubator-ratis
Updated Branches:
  refs/heads/master b7d089a1a -> 42273af45


RATIS-117. Add test for situation when old leader can/cannot commit log. 
Contributed by Yubo Xu.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/42273af4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/42273af4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/42273af4

Branch: refs/heads/master
Commit: 42273af454d368293e467518431c16f7c3c248aa
Parents: b7d089a
Author: Jing Zhao <[email protected]>
Authored: Thu Nov 9 13:15:26 2017 -0800
Committer: Jing Zhao <[email protected]>
Committed: Thu Nov 9 13:15:26 2017 -0800

----------------------------------------------------------------------
 .../ratis/server/storage/LogOutputStream.java   | 25 ++++--
 .../java/org/apache/ratis/RaftBasicTests.java   | 94 +++++++++++++++++++-
 .../java/org/apache/ratis/RaftTestUtil.java     | 39 +++++++-
 3 files changed, 147 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42273af4/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
index 80e344c..dedfe7e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogOutputStream.java
@@ -17,11 +17,10 @@
  */
 package org.apache.ratis.server.storage;
 
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.PureJavaCrc32C;
 import org.slf4j.Logger;
@@ -68,11 +67,27 @@ public class LogOutputStream implements Closeable {
     fc = rp.getChannel();
     fc.position(fc.size());
     preallocatedPos = fc.size();
-
     out = new BufferedWriteChannel(fc, bufferSize);
 
-    if (!append) {
-      create();
+    try {
+      fc = rp.getChannel();
+      fc.position(fc.size());
+      preallocatedPos = fc.size();
+
+      out = new BufferedWriteChannel(fc, bufferSize);
+      if (!append) {
+        create();
+      }
+    } catch (IOException ioe) {
+      LOG.warn("Hit IOException while creating log segment " + file
+          + ", delete the partial file.");
+      // hit IOException, clean up the in-progress log file
+      try {
+        FileUtils.deleteFully(file);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete the file " + file, e);
+      }
+      throw ioe;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42273af4/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 9875845..0cd9222 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -21,18 +21,27 @@ import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.ExitUtils;
+
+
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.ratis.server.storage.RaftLog;
+
+
+import static org.apache.ratis.RaftTestUtil.*;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
@@ -42,11 +51,10 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.ratis.RaftTestUtil.waitAndKillLeader;
-import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
 public abstract class RaftBasicTests extends BaseTest {
   {
@@ -129,6 +137,84 @@ public abstract class RaftBasicTests extends BaseTest {
   }
 
   @Test
+  public void testOldLeaderCommit() throws Exception {
+    LOG.info("Running testOldLeaderCommit");
+    final MiniRaftCluster cluster = getCluster();
+    final RaftServerImpl leader = waitForLeader(cluster);
+    final RaftPeerId leaderId = leader.getId();
+    final long term = leader.getState().getCurrentTerm();
+    // stop followers 1..NUM_SERVERS-2; followers.get(0) keeps running
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    final RaftServerImpl followerToSendLog = followers.get(0);
+    for (int i = 1; i < NUM_SERVERS - 1; i++) {
+      RaftServerImpl follower = followers.get(i);
+      cluster.killServer(follower.getId());
+    }
+    // the message is sent from a separate thread, so this call does not wait for a reply
+    SimpleMessage[] messages = SimpleMessage.create(1);
+    RaftTestUtil.sendMessageInNewThread(cluster, messages);
+    // wait, then require that the entry has reached the running follower's log
+    Thread.sleep(cluster.getMaxTimeout() + 100);
+    RaftLog followerLog = followerToSendLog.getState().getLog();
+    assertTrue(logEntriesContains(followerLog, messages));
+
+    LOG.info(String.format("killing old leader: %s", leaderId.toString()));
+    cluster.killServer(leaderId);
+    // NOTE(review): bound 3 is hard-coded while the kill loop uses NUM_SERVERS - 1 -- confirm intent
+    for (int i = 1; i < 3; i++) {
+      RaftServerImpl follower = followers.get(i);
+      LOG.info(String.format("restarting follower: %s", 
follower.getId().toString()));
+      cluster.restartServer(follower.getId(), false );
+    }
+
+    Thread.sleep(cluster.getMaxTimeout() * 5);
+    // confirm the server with log is elected as new leader.
+    final RaftPeerId newLeaderId = waitForLeader(cluster).getId();
+    Assert.assertEquals(followerToSendLog.getId(), newLeaderId);
+    // on every live server, getEntries(1, 2) must yield one entry: index 1, the old leader's term, the message
+    cluster.getServerAliveStream()
+            .map(s -> s.getState().getLog())
+            .forEach(log -> RaftTestUtil.assertLogEntries(log,
+                    log.getEntries(1, 2), 1, term, messages));
+    LOG.info("terminating testOldLeaderCommit test");
+  }
+
+  @Test
+  public void testOldLeaderNotCommit() throws Exception {
+    LOG.info("Running testOldLeaderNotCommit");
+    final MiniRaftCluster cluster = getCluster();
+    final RaftServerImpl leader = waitForLeader(cluster);
+    final long term = leader.getState().getCurrentTerm(); // term of the leader that will be killed
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    final RaftServerImpl followerToCommit = followers.get(0);
+    for (int i = 1; i < NUM_SERVERS - 1; i++) {
+      RaftServerImpl follower = followers.get(i);
+      cluster.killServer(follower.getId());
+    }
+    // followers 1..NUM_SERVERS-2 are stopped; followerToCommit keeps running
+    SimpleMessage[] messages = SimpleMessage.create(1);
+    sendMessageInNewThread(cluster, messages);
+
+    Thread.sleep(cluster.getMaxTimeout() + 100);
+    assertTrue(logEntriesContains(followerToCommit.getState().getLog(), messages));
+    // stop the old leader and the follower that received the entry, then restart the others
+    cluster.killServer(leader.getId());
+    cluster.killServer(followerToCommit.getId());
+
+    for (int i = 1; i < NUM_SERVERS - 1; i++) {
+      RaftServerImpl follower = followers.get(i);
+      cluster.restartServer(follower.getId(), false);
+    }
+    waitForLeader(cluster);
+    Thread.sleep(cluster.getMaxTimeout() + 100);
+    // any copy of the message found in a live server's log must not carry the old leader's term
+    final Predicate<LogEntryProto> predicate = l -> l.getTerm() != term;
+    cluster.getServerAliveStream()
+            .map(s -> s.getState().getLog())
+            .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
+  }
+
+  @Test
   public void testEnforceLeader() throws Exception {
     LOG.info("Running testEnforceLeader");
     final String leader = "s" + 
ThreadLocalRandom.current().nextInt(NUM_SERVERS);
@@ -161,7 +247,7 @@ public abstract class RaftBasicTests extends BaseTest {
       try(RaftClient client = getCluster().createClient()) {
         for (; step.get() < messages.length; ) {
           final RaftClientReply reply = 
client.send(messages[step.getAndIncrement()]);
-          Assert.assertTrue(reply.isSuccess());
+          assertTrue(reply.isSuccess());
         }
       } catch(Throwable t) {
         if (exceptionInClientThread.compareAndSet(null, t)) {
@@ -244,7 +330,7 @@ public abstract class RaftBasicTests extends BaseTest {
       }
 
       final int n = clients.stream().mapToInt(c -> c.step.get()).sum();
-      Assert.assertTrue(n >= lastStep.get());
+      assertTrue(n >= lastStep.get());
 
       if (n - lastStep.get() < 50 * numClients) { // Change leader at least 50 
steps.
         Thread.sleep(10);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/42273af4/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 927ad88..62545fe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis;
 
+import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -44,6 +45,8 @@ import java.util.Collection;
 import java.util.Objects;
 import java.util.function.BooleanSupplier;
 import java.util.function.IntSupplier;
+import java.util.function.Predicate;
+
 
 public interface RaftTestUtil {
 
@@ -131,6 +134,25 @@ public interface RaftTestUtil {
     return idxExpected == expectedMessages.length;
   }
 
+  /** Asserts {@code predicate} on every entry of {@code log} whose payload equals an expected message. */
+  static void checkLogEntries(RaftLog log, SimpleMessage[] expectedMessages,
+      Predicate<LogEntryProto> predicate) {
+    for (TermIndex ti : log.getEntries(0, Long.MAX_VALUE)) {
+      final LogEntryProto e;
+      try {
+        e = log.get(ti.getIndex()); // read each entry once, not once per expected message
+      } catch (IOException exception) { // fail the check, same handling as assertLogEntries
+        throw new RuntimeException(exception);
+      }
+      final byte[] data = e.getSmLogEntry().getData().toByteArray();
+      for (SimpleMessage expected : expectedMessages) {
+        if (Arrays.equals(expected.getContent().toByteArray(), data)) {
+          Assert.assertTrue(predicate.test(e));
+        }
+      }
+    }
+  }
+
   static void assertLogEntries(Collection<RaftServerProxy> servers,
       SimpleMessage... expectedMessages) {
     final int size = servers.size();
@@ -146,7 +168,7 @@ public interface RaftTestUtil {
   }
 
   static void assertLogEntries(RaftLog log, TermIndex[] entries,
-      long startIndex, long expertedTerm, SimpleMessage... expectedMessages) {
+      long startIndex, long expectedTerm, SimpleMessage... expectedMessages) {
     Assert.assertEquals(expectedMessages.length, entries.length);
     for(int i = 0; i < entries.length; i++) {
       final LogEntryProto e;
@@ -155,7 +177,7 @@ public interface RaftTestUtil {
       } catch (IOException exception) {
         throw new RuntimeException(exception);
       }
-      Assert.assertEquals(expertedTerm, e.getTerm());
+      Assert.assertEquals(expectedTerm, e.getTerm());
       Assert.assertEquals(startIndex + i, e.getIndex());
       Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
           e.getSmLogEntry().getData().toByteArray());
@@ -298,4 +320,17 @@ public interface RaftTestUtil {
 
     Thread.sleep(3 * maxTimeout);
   }
+
+  static void sendMessageInNewThread(MiniRaftCluster cluster, SimpleMessage... messages) {
+    final RaftPeerId targetId = cluster.getLeader().getId(); // looked up on the caller's thread
+    new Thread(() -> {
+      try (final RaftClient raftClient = cluster.createClient(targetId)) {
+        for (int i = 0; i < messages.length; i++) {
+          raftClient.send(messages[i]); // the reply is not inspected
+        }
+      } catch (Exception failure) {
+        failure.printStackTrace(); // only printed; nothing is reported back to the caller
+      }
+    }).start();
+  }
 }

Reply via email to