Repository: incubator-ratis
Updated Branches:
  refs/heads/master 06002e67a -> ddb82cd19
RATIS-90. After hitting IOException, RaftClient should randomly pick a known server as leader. Contributed by Jing Zhao. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ddb82cd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ddb82cd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ddb82cd1 Branch: refs/heads/master Commit: ddb82cd190f91997d37b6937db1630a96737112c Parents: 06002e6 Author: Jing Zhao <[email protected]> Authored: Mon May 29 10:18:57 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Mon May 29 10:18:57 2017 -0700 ---------------------------------------------------------------------- .../ratis/client/impl/RaftClientImpl.java | 5 ++-- .../org/apache/ratis/util/CollectionUtils.java | 24 ++++++++++++++++++++ .../ratis/grpc/client/AppendStreamer.java | 3 ++- .../ratis/RaftNotLeaderExceptionBaseTest.java | 20 +++++++++++++++- 4 files changed, 47 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 40e670d..75ef2a4 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -26,12 +26,10 @@ import org.apache.ratis.protocol.*; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; -import 
java.util.stream.Collector; import java.util.stream.Collectors; /** A client who sends requests to a raft service. */ @@ -177,7 +175,8 @@ final class RaftClientImpl implements RaftClient { final RaftPeerId oldLeader = request.getServerId(); if (newLeader == null && oldLeader.equals(leaderId)) { - newLeader = CollectionUtils.next(oldLeader, CollectionUtils.as(peers, RaftPeer::getId)); + newLeader = CollectionUtils.random(oldLeader, + peers.stream().map(RaftPeer::getId).collect(Collectors.toList())); } if (newLeader != null && oldLeader.equals(leaderId)) { LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java index 05fa2fb..5f68d47 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java @@ -20,11 +20,16 @@ package org.apache.ratis.util; +import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.function.Function; public interface CollectionUtils { + Random random = new Random(); + /** * @return the next element in the iteration right after the given element; * if the given element is not in the iteration, return the first one @@ -45,6 +50,25 @@ public interface CollectionUtils { return first; } + /** + * @return a randomly picked element which is not the given element. 
+ */ + static <T> T random(final T given, List<T> list) { + Objects.requireNonNull(given, "given == null"); + Preconditions.assertTrue(list != null && !list.isEmpty(), "c is null or empty"); + + if (list.size() == 1) { + return list.get(0); + } + + T selected; + do { + selected = list.get(random.nextInt(list.size())); + } while (selected == given); + + return selected; + } + static <INPUT, OUTPUT> Iterable<OUTPUT> as( Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) { return () -> new Iterator<OUTPUT>() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index c92820c..f9ff00a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -115,7 +115,8 @@ public class AppendStreamer implements Closeable { if (oldLeader == null) { leaderId = peers.keySet().iterator().next(); } else { - leaderId = CollectionUtils.next(oldLeader, peers.keySet()); + leaderId = CollectionUtils.random(oldLeader, + new ArrayList<>(peers.keySet())); } } LOG.debug("{} switches leader from {} to {}. 
suggested leader: {}", this, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ddb82cd1/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java index fe38778..83c88f5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java @@ -46,7 +46,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { public static final Logger LOG = LoggerFactory.getLogger(RaftNotLeaderExceptionBaseTest.class); - public static final int NUM_PEERS = 3; + public static final int NUM_PEERS = 5; @Rule public Timeout globalTimeout = new Timeout(60 * 1000); @@ -70,6 +70,19 @@ public abstract class RaftNotLeaderExceptionBaseTest { @Test public void testHandleNotLeaderException() throws Exception { + testHandleNotLeaderException(false); + } + + /** + * Test handle both IOException and NotLeaderException + */ + @Test + public void testHandleNotLeaderAndIOException() throws Exception { + testHandleNotLeaderException(true); + } + + private void testHandleNotLeaderException(boolean killNewLeader) + throws Exception { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); final RaftClient client = cluster.createClient(leaderId); @@ -81,6 +94,11 @@ public abstract class RaftNotLeaderExceptionBaseTest { RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, leaderId); Assert.assertNotEquals(leaderId, newLeader); + if (killNewLeader) { + // kill the new leader + cluster.killServer(newLeader); + } + RaftClientRpc rpc = client.getClientRpc(); reply= null; for (int i = 0; reply == null && i < 10; i++) {
