Add a Builder for RaftClient.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/336874fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/336874fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/336874fa Branch: refs/heads/master Commit: 336874fa36cd040df2d698bfc1030983ab3d0331 Parents: 7e71a2e Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Fri Jan 13 09:48:27 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Fri Jan 13 09:48:27 2017 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 76 +++++++++++++++++++- .../ratis/client/impl/ClientImplUtils.java | 33 +++++++++ .../ratis/client/impl/RaftClientImpl.java | 21 ++---- .../java/org/apache/ratis/MiniRaftCluster.java | 15 ++-- .../ratis/RaftNotLeaderExceptionBaseTest.java | 9 ++- .../impl/RaftReconfigurationBaseTest.java | 41 +++++------ 6 files changed, 145 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/336874fa/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index e4e0b84..72dc1ca 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -17,6 +17,9 @@ */ package org.apache.ratis.client; +import com.google.common.base.Preconditions; +import org.apache.ratis.client.impl.ClientImplUtils; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; @@ -25,6 +28,8 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; /** A client who sends requests to a raft service. */ public interface RaftClient extends Closeable { @@ -34,6 +39,9 @@ public interface RaftClient extends Closeable { /** @return the id of this client. */ String getId(); + /** @return the request sender of this client. */ + RaftClientRequestSender getRequestSender(); + /** * Send the given message to the raft service. * The message may change the state of the service. @@ -45,5 +53,71 @@ public interface RaftClient extends Closeable { RaftClientReply sendReadOnly(Message message) throws IOException; /** Send set configuration request to the raft service. */ - RaftClientReply setConfiguration(RaftPeer[] peersInNewConf) throws IOException; + RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException; + + /** @return a {@link Builder}. */ + static Builder newBuilder() { + return new Builder(); + } + + /** To build {@link RaftClient} objects. */ + class Builder { + private static final AtomicInteger COUNT = new AtomicInteger(); + + private String clientId = RaftClient.class.getSimpleName() + COUNT.incrementAndGet(); + private RaftClientRequestSender requestSender; + private Collection<RaftPeer> servers; + private String leaderId; + private RaftProperties properties; + private int retryInterval = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT; + + private Builder() {} + + /** @return a {@link RaftClient} object. */ + public RaftClient build() { + Preconditions.checkNotNull(requestSender); + Preconditions.checkNotNull(servers); + + if (leaderId == null) { + leaderId = servers.iterator().next().getId(); //use the first peer + } + if (properties != null) { + retryInterval = properties.getInt( + RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_KEY, + RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT); + } + return ClientImplUtils.newRaftClient(clientId, servers, leaderId, + requestSender, retryInterval); + } + + /** Set {@link RaftClient} ID. */ + public Builder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + /** Set servers. */ + public Builder setServers(Collection<RaftPeer> servers) { + this.servers = servers; + return this; + } + + /** Set leader ID. */ + public Builder setLeaderId(String leaderId) { + this.leaderId = leaderId; + return this; + } + + /** Set {@link RaftClientRequestSender}. */ + public Builder setRequestSender(RaftClientRequestSender requestSender) { + this.requestSender = requestSender; + return this; + } + + /** Set {@link RaftProperties}. */ + public Builder setProperties(RaftProperties properties) { + this.properties = properties; + return this; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/336874fa/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java new file mode 100644 index 0000000..472c8b4 --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client.impl; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.protocol.RaftPeer; + +import java.util.Collection; + +/** Utilities for the client implementation. */ +public class ClientImplUtils { + public static RaftClient newRaftClient( + String clientId, Collection< RaftPeer > peers, String leaderId, + RaftClientRequestSender requestSender, int retryInterval) { + return new RaftClientImpl(clientId, peers, leaderId, requestSender, retryInterval); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/336874fa/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index e101554..2f5d450 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -17,12 +17,8 @@ */ package org.apache.ratis.client.impl; -import com.google.common.annotations.VisibleForTesting; - import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.*; import org.apache.ratis.util.RaftUtils; import org.apache.ratis.util.StringUtils; @@ -37,9 +33,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; /** A client who sends requests to a raft service. */ -public final class RaftClientImpl implements RaftClient { - public static final long DEFAULT_SEQNUM = 0; - +final class RaftClientImpl implements RaftClient { private final String clientId; private final RaftClientRequestSender requestSender; private final Map<String, RaftPeer> peers; @@ -47,18 +41,15 @@ public final class RaftClientImpl implements RaftClient { private volatile String leaderId; - public RaftClientImpl( - String clientId, Collection<RaftPeer> peers, - RaftClientRequestSender requestSender, String leaderId, - RaftProperties properties) { + RaftClientImpl( + String clientId, Collection<RaftPeer> peers, String leaderId, + RaftClientRequestSender requestSender, int retryInterval) { this.clientId = clientId; this.requestSender = requestSender; this.peers = peers.stream().collect( Collectors.toMap(RaftPeer::getId, Function.identity())); this.leaderId = leaderId != null? leaderId : peers.iterator().next().getId(); - this.retryInterval = properties.getInt( - RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_KEY, - RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT); + this.retryInterval = retryInterval; } @Override @@ -160,7 +151,7 @@ public final class RaftClientImpl implements RaftClient { } } - @VisibleForTesting + @Override public RaftClientRequestSender getRequestSender() { return requestSender; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/336874fa/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index 60cbb9c..7eb6ddb 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -18,10 +18,8 @@ package org.apache.ratis; import com.google.common.base.Preconditions; - import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.client.impl.RaftClientImpl; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -40,13 +38,13 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; - import java.io.File; import java.io.IOException; import java.util.*; import java.util.stream.Collectors; +import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT; + public abstract class MiniRaftCluster { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); public static final DelayLocalExecutionInjection logSyncDelay = @@ -393,8 +391,13 @@ public abstract class MiniRaftCluster { } public RaftClient createClient(String clientId, String leaderId) { - return new RaftClientImpl(clientId, conf.getPeers(), - getRaftClientRequestSender(), leaderId, properties); + return RaftClient.newBuilder() + .setClientId(clientId) + .setServers(conf.getPeers()) + .setLeaderId(leaderId) + .setRequestSender(getRaftClientRequestSender()) + .setProperties(properties) + .build(); } public void shutdown() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/336874fa/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java index 6d25835..3ac3bf4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java @@ -21,7 +21,6 @@ import org.apache.log4j.Level; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.client.impl.RaftClientImpl; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; @@ -34,12 +33,12 @@ import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; - import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; + public abstract class RaftNotLeaderExceptionBaseTest { static { RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); @@ -85,7 +84,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { String newLeader = RaftTestUtil.changeLeader(cluster, leaderId); Assert.assertNotEquals(leaderId, newLeader); - RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender(); + RaftClientRequestSender rpc = client.getRequestSender(); reply= null; for (int i = 0; reply == null && i < 10; i++) { try { @@ -131,7 +130,7 @@ public abstract class RaftNotLeaderExceptionBaseTest { } LOG.info(cluster.printServers()); - RaftClientRequestSender rpc = ((RaftClientImpl)client).getRequestSender(); + RaftClientRequestSender rpc = client.getRequestSender(); RaftClientReply reply = null; // it is possible that the remote peer's rpc server is not ready. need retry for (int i = 0; reply == null && i < 10; i++) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/336874fa/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 30b334f..847678a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -17,20 +17,6 @@ */ package org.apache.ratis.server.impl; -import static java.util.Arrays.asList; -import static org.apache.ratis.MiniRaftCluster.logSyncDelay; -import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; -import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; -import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.log4j.Level; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.MiniRaftCluster.PeerChanges; @@ -38,13 +24,8 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.RaftTestUtil.SimpleMessage; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.client.impl.RaftClientImpl; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.ReconfigurationInProgressException; -import org.apache.ratis.protocol.ReconfigurationTimeoutException; -import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.simulation.RequestHandler; import org.apache.ratis.server.storage.RaftLog; @@ -56,6 +37,20 @@ import org.mockito.internal.util.reflection.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Arrays.asList; +import static org.apache.ratis.MiniRaftCluster.logSyncDelay; +import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM; +import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; +import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; + public abstract class RaftReconfigurationBaseTest { static { RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); @@ -255,7 +250,7 @@ public abstract class RaftReconfigurationBaseTest { asList(c1.allPeersInNewConf)); Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional()); - final RaftClientRequestSender sender = ((RaftClientImpl)client).getRequestSender(); + final RaftClientRequestSender sender = client.getRequestSender(); final SetConfigurationRequest request = new SetConfigurationRequest( "client", leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf); try { @@ -472,7 +467,7 @@ public abstract class RaftReconfigurationBaseTest { try(final RaftClient client2 = cluster.createClient("client2", leaderId)) { latch.await(); LOG.info("client2 starts to change conf"); - final RaftClientRequestSender sender2 = ((RaftClientImpl)client2).getRequestSender(); + final RaftClientRequestSender sender2 = client2.getRequestSender(); sender2.sendRequest(new SetConfigurationRequest( "client2", leaderId, DEFAULT_SEQNUM, peersInRequest2)); } catch (ReconfigurationInProgressException e) { @@ -537,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest { new Thread(() -> { try(final RaftClient client = cluster.createClient("client1", leaderId)) { LOG.info("client starts to change conf"); - final RaftClientRequestSender sender = ((RaftClientImpl)client).getRequestSender(); + final RaftClientRequestSender sender = client.getRequestSender(); RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest( "client", leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf)); if (reply.isNotLeader()) {
