Repository: incubator-ratis
Updated Branches:
  refs/heads/master 291f51b42 -> 06002e67a


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index e9c6d65..9086289 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.util.CheckedRunnable;
+import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,14 +53,31 @@ public class RaftTestUtil {
   public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new 
LogEntryProto[0];
   static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
 
+
+  public static RaftServerImpl getImplAsUnchecked(RaftServerProxy proxy) {
+    return JavaUtils.callAsUnchecked(proxy::getImpl);
+  }
+
   public static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
       throws InterruptedException {
+    return waitForLeader(cluster, false);
+  }
+
+  public static RaftServerImpl waitForLeader(
+      MiniRaftCluster cluster, boolean tolerateMultipleLeaders)
+      throws InterruptedException {
     final long sleepTime = (cluster.getMaxTimeout() * 3) >> 1;
     LOG.info(cluster.printServers());
     RaftServerImpl leader = null;
     for(int i = 0; leader == null && i < 10; i++) {
       Thread.sleep(sleepTime);
-      leader = cluster.getLeader();
+      try {
+        leader = cluster.getLeader();
+      } catch(IllegalStateException e) {
+        if (!tolerateMultipleLeaders) {
+          throw e;
+        }
+      }
     }
     LOG.info(cluster.printServers());
     return leader;
@@ -116,8 +134,7 @@ public class RaftTestUtil {
   public static void assertLogEntries(Collection<RaftServerProxy> servers,
       SimpleMessage... expectedMessages) {
     final int size = servers.size();
-    final long count = servers.stream()
-        .map(RaftServerProxy::getImpl)
+    final long count = MiniRaftCluster.getServerStream(servers)
         .filter(RaftServerImpl::isAlive)
         .map(s -> s.getState().getLog())
         .filter(log -> logEntriesContains(log, expectedMessages))

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 8dd5ae8..20e66e6 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -222,7 +222,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // check configuration manager's internal state
       // each reconf will generate two configurations: (old, new) and (new)
-      cluster.getServersAliveStream()
+      cluster.getServerAliveStream()
           .forEach(server -> {
         ConfigurationManager confManager =
             (ConfigurationManager) Whitebox.getInternalState(server.getState(),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
new file mode 100644
index 0000000..fcb39dd
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.LogUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public abstract class ReinitializationBaseTest {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.TRACE);
+  }
+  static final Logger LOG = 
LoggerFactory.getLogger(ReinitializationBaseTest.class);
+
+  static final RaftProperties prop = new RaftProperties();
+
+  public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> 
getClusterFactory();
+
+  public MiniRaftCluster getCluster(int peerNum) throws IOException {
+    return getClusterFactory().newCluster(peerNum, prop);
+  }
+
+  @Test
+  public void testReinitialize() throws Exception {
+    final MiniRaftCluster cluster = getCluster(0);
+    LOG.info("Start testReinitialize" + cluster.printServers());
+
+    // Start server with an empty conf
+    final RaftConfiguration emptyConf = 
MiniRaftCluster.initConfiguration(Collections.emptyList());
+
+    final List<RaftPeerId> ids = Arrays.asList(MiniRaftCluster.generateIds(3, 
0))
+        .stream().map(RaftPeerId::valueOf).collect(Collectors.toList());
+    ids.stream().forEach(id -> cluster.putNewServer(id, emptyConf, true));
+    LOG.info("putNewServer: " + cluster.printServers());
+
+    cluster.start();
+    LOG.info("start: " + cluster.printServers());
+
+    // Make sure that there are no leaders.
+    TimeUnit.SECONDS.sleep(1);
+    Assert.assertNull(cluster.getLeader());
+
+    // Reinitialize servers
+    final RaftPeer[] peers = cluster.getPeers().toArray(RaftPeer.EMPTY_PEERS);
+    for(RaftPeer p : peers) {
+      final RaftClient client = cluster.createClient(p.getId(), new 
ArrayList<>(Arrays.asList(p)));
+      client.reinitialize(peers, p.getId());
+    }
+    Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster, true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index 66004c6..f8a32d9 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -22,7 +22,6 @@ import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
 
 import java.util.Objects;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 9551da8..44313dd 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -17,16 +17,15 @@
  */
 package org.apache.ratis.server.simulation;
 
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +39,7 @@ import java.util.concurrent.TimeUnit;
 class SimulatedServerRpc implements RaftServerRpc {
   static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class);
 
-  private final RaftServerImpl server;
+  private final RaftServerProxy server;
   private final RequestHandler<RaftServerRequest, RaftServerReply> 
serverHandler;
   private final RequestHandler<RaftClientRequest, RaftClientReply> 
clientHandler;
   private final ExecutorService executor = Executors.newFixedThreadPool(3, 
Daemon::new);
@@ -48,8 +47,7 @@ class SimulatedServerRpc implements RaftServerRpc {
   SimulatedServerRpc(RaftServer server,
       SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply,
       SimulatedRequestReply<RaftClientRequest, RaftClientReply> 
clientRequestReply) {
-    this.server = server instanceof RaftServerProxy?
-        ((RaftServerProxy)server).getImpl(): (RaftServerImpl)server;
+    this.server = (RaftServerProxy)server;
     this.serverHandler = new RequestHandler<>(server.getId().toString(),
         "serverHandler", serverRequestReply, serverHandlerImpl, 3);
     this.clientHandler = new RequestHandler<>(server.getId().toString(),
@@ -122,7 +120,7 @@ class SimulatedServerRpc implements RaftServerRpc {
       = new RequestHandler.HandlerInterface<RaftServerRequest, 
RaftServerReply>() {
     @Override
     public boolean isAlive() {
-      return server.isAlive();
+      return RaftTestUtil.getImplAsUnchecked(server).isAlive();
     }
 
     @Override
@@ -144,14 +142,17 @@ class SimulatedServerRpc implements RaftServerRpc {
       = new RequestHandler.HandlerInterface<RaftClientRequest, 
RaftClientReply>() {
     @Override
     public boolean isAlive() {
-      return server.isAlive();
+      return RaftTestUtil.getImplAsUnchecked(server).isAlive();
     }
 
     @Override
     public RaftClientReply handleRequest(RaftClientRequest request)
         throws IOException {
       final CompletableFuture<RaftClientReply> future;
-      if (request instanceof SetConfigurationRequest) {
+      if (request instanceof ReinitializeRequest) {
+        future = CompletableFuture.completedFuture(
+            server.reinitialize((ReinitializeRequest) request));
+      } else if (request instanceof SetConfigurationRequest) {
         future = server.setConfigurationAsync((SetConfigurationRequest) 
request);
       } else {
         future = server.submitClientRequestAsync(request);
@@ -159,11 +160,7 @@ class SimulatedServerRpc implements RaftServerRpc {
 
       future.whenCompleteAsync((reply, exception) -> {
         try {
-          IOException e = null;
-          if (exception != null) {
-            e = exception instanceof IOException ?
-                (IOException) exception : new IOException(exception);
-          }
+          final IOException e = IOUtils.asIOException(exception);
           clientHandler.getRpc().sendReply(request, reply, e);
         } catch (IOException e) {
           LOG.warn("Failed to send reply {} for request {} due to exception 
{}",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
new file mode 100644
index 0000000..7fc0c6c
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestReinitializationWithSimulatedRpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithSimulatedRpc extends 
ReinitializationBaseTest {
+  @Override
+  public MiniRaftCluster.Factory<? extends MiniRaftCluster> 
getClusterFactory() {
+    return MiniRaftClusterWithSimulatedRpc.FACTORY;
+  }
+}

Reply via email to