This is an automated email from the ASF dual-hosted git repository. ascherbakov pushed a commit to branch ignite-13885 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push: new 60e7551 IGNITE-13885 Raft client wip 4. 60e7551 is described below commit 60e7551dde051fb929137a1c6305347a55066eb0 Author: Alexey Scherbakov <alexey.scherbak...@gmail.com> AuthorDate: Wed Feb 10 19:53:41 2021 +0300 IGNITE-13885 Raft client wip 4. --- .../raft/rpc/ConnectionOpenedEventListener.java} | 18 ++-------- .../ignite/raft/service/AbstractClientService.java | 14 +++----- ...entService.java => RaftGroupClientService.java} | 18 ++++++++-- ...ceImpl.java => RaftGroupClientServiceImpl.java} | 6 +++- .../org/apache/ignite/raft/service/RouteTable.java | 16 ++++----- ...ntTest.java => RaftGroupClientServiceTest.java} | 41 ++++++++++++++++------ .../org/apache/ignite/raft/rpc/CustomRequest.java | 4 +++ .../org/apache/ignite/raft/rpc/CustomResponse.java | 4 +++ ...cessor.java => TestCustomRequestProcessor.java} | 17 ++++----- .../raft/rpc/TestGetLeaderRequestProcessor.java | 3 +- 10 files changed, 83 insertions(+), 58 deletions(-) diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java similarity index 57% copy from modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java copy to modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java index c9e76c2..3011f99 100644 --- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java +++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/rpc/ConnectionOpenedEventListener.java @@ -16,21 +16,9 @@ */ package org.apache.ignite.raft.rpc; - -import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest; -import org.apache.ignite.raft.rpc.RpcRequests.PingRequest; - /** - * Ping request processor. + * */ -public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> { - @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) { - CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse(); - resp.setLeaderId("127.0.0.1:8081"); - rpcCtx.sendResponse(resp.build()); - } - - @Override public String interest() { - return GetLeaderRequest.class.getName(); - } +public interface ConnectionOpenedEventListener { + void onOpened(final String remoteAddress, final Connection conn); } diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java index da09246..562ccf0 100644 --- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java +++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/AbstractClientService.java @@ -44,10 +44,6 @@ public abstract class AbstractClientService implements ClientService { protected volatile RpcClient rpcClient; protected RpcOptions rpcOptions; - public RpcClient getRpcClient() { - return this.rpcClient; - } - @Override public boolean isConnected(final Endpoint endpoint) { final RpcClient rc = this.rpcClient; @@ -160,12 +156,12 @@ public abstract class AbstractClientService implements ClientService { if (err == null) { Status status = Status.OK(); - Message msg; + T msg; if (result instanceof ErrorResponse) { status = handleErrorResponse((ErrorResponse) result); - msg = (Message) result; + msg = (T) result; } else { - msg = (Message) result; + msg = (T) result; } if (done != null) { try { @@ -203,12 +199,12 @@ public abstract class AbstractClientService implements ClientService { } catch (final InterruptedException e) { Thread.currentThread().interrupt(); future.completeExceptionally(e); - // should be in another thread to avoid dead locking. + // Should be in another thread to avoid deadlocking. RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTR, "Sending rpc was interrupted")); } catch (final RemotingException e) { future.completeExceptionally(e); - // should be in another thread to avoid dead locking. + // Should be in another thread to avoid deadlocking. RpcUtils.runClosureInExecutor(currExecutor, done, new Status(RaftError.EINTERNAL, "Fail to send a RPC request:" + e.getMessage())); diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java similarity index 90% rename from modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java rename to modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java index cd0d6ca..1ca44ab 100644 --- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientService.java +++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientService.java @@ -22,11 +22,12 @@ import org.apache.ignite.raft.closure.RpcResponseClosure; import org.apache.ignite.raft.rpc.CliRequests; import org.apache.ignite.raft.rpc.Message; import org.apache.ignite.raft.rpc.RpcRequests; +import org.apache.ignite.raft.rpc.RpcRequests.PingRequest; /** - * Cli RPC client service. + * Raft group RPC client service. */ -public interface CliClientService extends ClientService { +public interface RaftGroupClientService extends ClientService { /** * Ping a node. * @@ -35,7 +36,7 @@ public interface CliClientService extends ClientService { * @param done callback * @return a future with result */ - Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request, + Future<Message> ping(Endpoint endpoint, PingRequest request, RpcResponseClosure<RpcRequests.ErrorResponse> done); /** @@ -161,4 +162,15 @@ public interface CliClientService extends ClientService { */ Future<Message> getPeers(Endpoint endpoint, CliRequests.GetPeersRequest request, RpcResponseClosure<CliRequests.GetPeersResponse> done); + + /** + * Send custom request. + * + * @param endpoint server address + * @param request request data + * @param done callback + * @return a future with result + */ + Future<Message> sendCustom(Endpoint endpoint, Message request, + RpcResponseClosure<Message> done); } diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java similarity index 94% rename from modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java rename to modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java index f6211ff..cb59429 100644 --- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/CliClientServiceImpl.java +++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RaftGroupClientServiceImpl.java @@ -43,7 +43,7 @@ import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse; /** * */ -public class CliClientServiceImpl extends AbstractClientService implements CliClientService { +public class RaftGroupClientServiceImpl extends AbstractClientService implements RaftGroupClientService { @Override public Future<Message> ping(Endpoint endpoint, RpcRequests.PingRequest request, final RpcResponseClosure<ErrorResponse> done) { return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout()); @@ -114,4 +114,8 @@ public class CliClientServiceImpl extends AbstractClientService implements CliCl final RpcResponseClosure<GetPeersResponse> done) { return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout()); } + + @Override public Future<Message> sendCustom(Endpoint endpoint, Message request, RpcResponseClosure<Message> done) { + return invokeWithDone(endpoint, request, done, this.rpcOptions.getRpcDefaultTimeout()); + } } diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java index 916a768..65e6f2b 100644 --- a/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java +++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/service/RouteTable.java @@ -82,7 +82,7 @@ public class RouteTable { /** * Get the cached leader of the group, return it when found, null otherwise. - * Make sure calls {@link #refreshLeader(CliClientService, String, int)} already + * Make sure calls {@link #refreshLeader(RaftGroupClientService, String, int)} already * before invoke this method. * * @param groupId raft group id @@ -174,7 +174,7 @@ public class RouteTable { * @param timeoutMs timeout millis * @return operation status */ - public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs) + public Status refreshLeader(final RaftGroupClientService raftGroupClientService, final String groupId, final int timeoutMs) throws InterruptedException, TimeoutException { final Configuration conf = getConfiguration(groupId); @@ -188,7 +188,7 @@ public class RouteTable { final CliRequests.GetLeaderRequest request = rb.build(); TimeoutException timeoutException = null; for (final PeerId peer : conf) { - if (!cliClientService.connect(peer.getEndpoint())) { + if (!raftGroupClientService.connect(peer.getEndpoint())) { if (st.isOk()) { st.setError(-1, "Fail to init channel to %s", peer); } else { @@ -197,7 +197,7 @@ public class RouteTable { } continue; } - final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null); + final Future<Message> result = raftGroupClientService.getLeader(peer.getEndpoint(), request, null); try { final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS); if (msg instanceof RpcRequests.ErrorResponse) { @@ -230,7 +230,7 @@ public class RouteTable { return st; } - public Status refreshConfiguration(final CliClientService cliClientService, final String groupId, + public Status refreshConfiguration(final RaftGroupClientService raftGroupClientService, final String groupId, final int timeoutMs) throws InterruptedException, TimeoutException { final Configuration conf = getConfiguration(groupId); if (conf == null) { @@ -240,14 +240,14 @@ public class RouteTable { final Status st = Status.OK(); PeerId leaderId = selectLeader(groupId); if (leaderId == null) { - refreshLeader(cliClientService, groupId, timeoutMs); + refreshLeader(raftGroupClientService, groupId, timeoutMs); leaderId = selectLeader(groupId); } if (leaderId == null) { st.setError(-1, "Fail to get leader of group %s", groupId); return st; } - if (!cliClientService.connect(leaderId.getEndpoint())) { + if (!raftGroupClientService.connect(leaderId.getEndpoint())) { st.setError(-1, "Fail to init channel to %s", leaderId); return st; } @@ -255,7 +255,7 @@ public class RouteTable { rb.setGroupId(groupId); rb.setLeaderId(leaderId.toString()); try { - final Message result = cliClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs, + final Message result = raftGroupClientService.getPeers(leaderId.getEndpoint(), rb.build(), null).get(timeoutMs, TimeUnit.MILLISECONDS); if (result instanceof CliRequests.GetPeersResponse) { final CliRequests.GetPeersResponse resp = (CliRequests.GetPeersResponse) result; diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java similarity index 68% rename from modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java rename to modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java index 54404a8..f3257fb 100644 --- a/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftClientTest.java +++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/RaftGroupClientServiceTest.java @@ -7,23 +7,30 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.raft.closure.RpcResponseClosureAdapter; +import org.apache.ignite.raft.rpc.CustomRequest; +import org.apache.ignite.raft.rpc.CustomResponse; import org.apache.ignite.raft.rpc.Message; import org.apache.ignite.raft.rpc.MessageBuilderFactory; import org.apache.ignite.raft.rpc.RpcOptions; import org.apache.ignite.raft.rpc.RpcRequests; import org.apache.ignite.raft.rpc.RpcRequests.ErrorResponse; +import org.apache.ignite.raft.rpc.TestCustomRequestProcessor; import org.apache.ignite.raft.rpc.TestGetLeaderRequestProcessor; import org.apache.ignite.raft.rpc.TestPingRequestProcessor; import org.apache.ignite.raft.rpc.impl.LocalRpcServer; -import org.apache.ignite.raft.service.CliClientServiceImpl; +import org.apache.ignite.raft.service.RaftGroupClientServiceImpl; import org.apache.ignite.raft.service.RouteTable; import org.junit.After; import org.junit.Before; import org.junit.Test; -public class RaftClientTest { - private static final System.Logger LOG = System.getLogger(RaftClientTest.class.getName()); +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RaftGroupClientServiceTest { + private static final System.Logger LOG = System.getLogger(RaftGroupClientServiceTest.class.getName()); private static final Configuration initConf = new Configuration(Arrays.asList( new PeerId("127.0.0.1", 8080), @@ -39,6 +46,7 @@ public class RaftClientTest { LocalRpcServer srv = new LocalRpcServer(peer.getEndpoint()); srv.registerProcessor(new TestPingRequestProcessor()); srv.registerProcessor(new TestGetLeaderRequestProcessor()); + srv.registerProcessor(new TestCustomRequestProcessor()); srv.init(new RpcOptions()); srvs.add(srv); } @@ -54,29 +62,42 @@ public class RaftClientTest { } @Test + public void testCustom() throws TimeoutException, InterruptedException, ExecutionException { + final RaftGroupClientServiceImpl cliClientService = new RaftGroupClientServiceImpl(); + cliClientService.init(new RpcOptions()); + + Future<Message> resp = cliClientService.sendCustom(initConf.getPeers().get(0).getEndpoint(), new CustomRequest(), null); + + assertTrue(resp.get() instanceof CustomResponse); + } + + @Test public void testPing() throws TimeoutException, InterruptedException, ExecutionException { String groupId = "unittest"; RouteTable.getInstance().updateConfiguration(groupId, initConf); - final CliClientServiceImpl cliClientService = new CliClientServiceImpl(); + final RaftGroupClientServiceImpl cliClientService = new RaftGroupClientServiceImpl(); cliClientService.init(new RpcOptions()); RpcRequests.PingRequest.Builder builder = MessageBuilderFactory.DEFAULT.createPingRequest(); builder.setSendTimestamp(System.currentTimeMillis()); RpcRequests.PingRequest req = builder.build(); + AtomicReference<Status> ref = new AtomicReference<>(); + RpcResponseClosureAdapter<ErrorResponse> done = new RpcResponseClosureAdapter<>() { @Override public void run(Status status) { - System.out.println(); + ref.set(status); } }; Future<Message> resp = cliClientService.ping(initConf.getPeers().get(0).getEndpoint(), req, done); - Message msg = resp.get(); + ErrorResponse msg = (ErrorResponse) resp.get(); - System.out.println(); + assertEquals(RaftError.SUCCESS.getNumber(), msg.getErrorCode()); + assertTrue(ref.get().isOk()); } @Test @@ -93,10 +114,10 @@ public class RaftClientTest { RouteTable.getInstance().updateConfiguration(groupId, initConf); - final CliClientServiceImpl cliClientService = new CliClientServiceImpl(); - cliClientService.init(new RpcOptions()); + final RaftGroupClientServiceImpl raftGroupClientService = new RaftGroupClientServiceImpl(); + raftGroupClientService.init(new RpcOptions()); - if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) { + if (!RouteTable.getInstance().refreshLeader(raftGroupClientService, groupId, 1000).isOk()) { throw new IllegalStateException("Refresh leader failed"); } diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java new file mode 100644 index 0000000..8f53d99 --- /dev/null +++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomRequest.java @@ -0,0 +1,4 @@ +package org.apache.ignite.raft.rpc; + +public class CustomRequest implements Message { +} diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java new file mode 100644 index 0000000..b795f2c --- /dev/null +++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/CustomResponse.java @@ -0,0 +1,4 @@ +package org.apache.ignite.raft.rpc; + +public class CustomResponse implements Message { +} diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java similarity index 60% copy from modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java copy to modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java index c9e76c2..a5c64a0 100644 --- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java +++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestCustomRequestProcessor.java @@ -17,20 +17,17 @@ package org.apache.ignite.raft.rpc; -import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest; -import org.apache.ignite.raft.rpc.RpcRequests.PingRequest; - /** * Ping request processor. */ -public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> { - @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) { - CliRequests.GetLeaderResponse.Builder resp = MessageBuilderFactory.DEFAULT.createGetLeaderResponse(); - resp.setLeaderId("127.0.0.1:8081"); - rpcCtx.sendResponse(resp.build()); +public class TestCustomRequestProcessor implements RpcProcessor<CustomRequest> { + @Override + public void handleRequest(final RpcContext rpcCtx, final CustomRequest request) { + rpcCtx.sendResponse(new CustomResponse()); } - @Override public String interest() { - return GetLeaderRequest.class.getName(); + @Override + public String interest() { + return CustomRequest.class.getName(); } } diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java index c9e76c2..934a2ec 100644 --- a/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java +++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/rpc/TestGetLeaderRequestProcessor.java @@ -18,10 +18,9 @@ package org.apache.ignite.raft.rpc; import org.apache.ignite.raft.rpc.CliRequests.GetLeaderRequest; -import org.apache.ignite.raft.rpc.RpcRequests.PingRequest; /** - * Ping request processor. + * Get leader request processor. */ public class TestGetLeaderRequestProcessor implements RpcProcessor<GetLeaderRequest> { @Override public void handleRequest(RpcContext rpcCtx, GetLeaderRequest request) {