TAJO-1050: RPC client does not retry during connecting. (Jihun Kang via jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5b31fc42 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5b31fc42 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5b31fc42 Branch: refs/heads/block_iteration Commit: 5b31fc4207ef1de2418cafbc6d3e714fa03849d0 Parents: 28282b5 Author: jhkim <[email protected]> Authored: Sat Sep 20 16:21:33 2014 +0900 Committer: jhkim <[email protected]> Committed: Sat Sep 20 16:21:33 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../main/java/org/apache/tajo/cli/TajoCli.java | 21 +++-- .../java/org/apache/tajo/client/TajoClient.java | 20 +++-- .../tajo/master/TajoMasterClientService.java | 2 + .../org/apache/tajo/rpc/AsyncRpcClient.java | 11 +-- .../org/apache/tajo/rpc/AsyncRpcServer.java | 4 +- .../org/apache/tajo/rpc/BlockingRpcClient.java | 13 +-- .../org/apache/tajo/rpc/NettyClientBase.java | 86 +++++++++++------- .../org/apache/tajo/rpc/RpcConnectionPool.java | 16 +++- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 91 +++++++++++++++++++- .../org/apache/tajo/rpc/TestBlockingRpc.java | 73 +++++++++++++--- 11 files changed, 261 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 5d92881..7b2f77e 100644 --- a/CHANGES +++ b/CHANGES @@ -143,6 +143,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-1050: RPC client does not retry during connecting. + (Jihun Kang via jinho) + TAJO-948: 'INSERT INTO' statement to non existence table casuses NPE. (Jongyoung Park via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index 7c96e34..d5040bd 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -21,8 +21,6 @@ package org.apache.tajo.cli; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; -import jline.TerminalFactory; -import jline.TerminalFactory.Flavor; import jline.UnsupportedTerminal; import jline.console.ConsoleReader; import org.apache.commons.cli.*; @@ -36,13 +34,15 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.HAServiceUtil; import java.io.*; import java.lang.reflect.Constructor; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import static org.apache.tajo.cli.ParsedResult.StatementType.META; import static org.apache.tajo.cli.ParsedResult.StatementType.STATEMENT; @@ -401,6 +401,7 @@ public class TajoCli { history.addStatement(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : "")); } } + exitCode = executeParsedResults(parsedResults); currentPrompt = updatePrompt(parser.getState()); @@ -494,7 +495,17 @@ public class TajoCli { private int executeQuery(String statement) throws ServiceException, IOException { checkMasterStatus(); long startTime = System.currentTimeMillis(); - ClientProtos.SubmitQueryResponse response = client.executeQuery(statement); + ClientProtos.SubmitQueryResponse response = null; + try{ + response = client.executeQuery(statement); + } catch (ServiceException e){ + displayFormatter.printErrorMessage(sout, e.getMessage()); + wasError = true; + } catch(Throwable te){ + displayFormatter.printErrorMessage(sout, te); + wasError = true; + } + if (response == null) { displayFormatter.printErrorMessage(sout, "response is null"); wasError = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index cc993f3..ab3d874 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -61,6 +61,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet; @@ -82,6 +83,8 @@ public class TajoClient implements Closeable { private volatile TajoIdProtos.SessionIdProto sessionId; + private AtomicBoolean closed = new AtomicBoolean(false); + public TajoClient(TajoConf conf) throws IOException { this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null); } @@ -115,11 +118,14 @@ public class TajoClient implements Closeable { } public boolean isConnected() { - try { - return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected(); - } catch (Exception e) { - return false; + if(!closed.get()){ + try { + return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected(); + } catch (Throwable e) { + return false; + } } + return false; } public TajoClient(InetSocketAddress addr) throws IOException { @@ -152,12 +158,16 @@ public class TajoClient implements Closeable { @Override public void close() { + if(closed.getAndSet(true)){ + return; + } + // remove session try { NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); tajoMaster.removeSession(null, sessionId); - } catch (Exception e) { + } catch (Throwable e) { } if(connPool != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index e69393a..738b643 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -264,6 +264,8 @@ public class TajoMasterClientService extends AbstractService { } catch (Exception e) { LOG.error(e.getMessage(), e); SubmitQueryResponse.Builder responseBuilder = ClientProtos.SubmitQueryResponse.newBuilder(); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setIsForwarded(true); responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME)); responseBuilder.setResultCode(ResultCode.ERROR); if (e.getMessage() != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index c84d6b6..7a416a8 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -57,13 +57,8 @@ public class AsyncRpcClient extends NettyClientBase { * new an instance through this constructor. */ AsyncRpcClient(final Class<?> protocol, - final InetSocketAddress addr) throws Exception { - this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory()); - } - - AsyncRpcClient(final Class<?> protocol, - final InetSocketAddress addr, ClientSocketChannelFactory factory) - throws Exception { + final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries) + throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { this.protocol = protocol; String serviceClassName = protocol.getName() + "$" @@ -74,7 +69,7 @@ public class AsyncRpcClient extends NettyClientBase { this.handler = new ClientChannelUpstreamHandler(); pipeFactory = new ProtoPipelineFactory(handler, RpcResponse.getDefaultInstance()); - super.init(addr, pipeFactory, factory); + super.init(addr, pipeFactory, factory, retries); rpcChannel = new ProxyRpcChannel(); this.key = new RpcConnectionKey(addr, protocol, true); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index b7e3cb6..f9c5d3b 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -126,11 +126,13 @@ public class AsyncRpcServer extends NettyServerBase { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception{ + if (e.getCause() instanceof RemoteCallException) { RemoteCallException callException = (RemoteCallException) e.getCause(); e.getChannel().write(callException.getResponse()); + } else { + LOG.error(e.getCause()); } - throw new RemoteException(serviceName, e.getCause()); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 3d6989a..03d5d3e 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -60,13 +60,8 @@ public class BlockingRpcClient extends NettyClientBase { * new an instance through this constructor. */ BlockingRpcClient(final Class<?> protocol, - final InetSocketAddress addr) throws Exception { - this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory()); - } - - BlockingRpcClient(final Class<?> protocol, - final InetSocketAddress addr, ClientSocketChannelFactory factory) - throws Exception { + final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries) + throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { this.protocol = protocol; String serviceClassName = protocol.getName() + "$" @@ -78,7 +73,7 @@ public class BlockingRpcClient extends NettyClientBase { this.handler = new ClientChannelUpstreamHandler(); pipeFactory = new ProtoPipelineFactory(handler, RpcResponse.getDefaultInstance()); - super.init(addr, pipeFactory, factory); + super.init(addr, pipeFactory, factory, retries); rpcChannel = new ProxyRpcChannel(); this.key = new RpcConnectionKey(addr, protocol, false); @@ -227,7 +222,7 @@ public class BlockingRpcClient extends NettyClientBase { } } - class ProtoCallFuture implements Future<Message> { + static class ProtoCallFuture implements Future<Message> { private Semaphore sem = new Semaphore(0); private Message response = null; private Message returnType; http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 711c527..d0002de 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -18,25 +18,25 @@ package org.apache.tajo.rpc; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.util.NetUtils; import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import java.io.Closeable; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public abstract class NettyClientBase implements Closeable { private static Log LOG = LogFactory.getLog(NettyClientBase.class); private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60; + private static final long PAUSE = 1000; // 1 sec + private int numRetries; protected ClientBootstrap bootstrap; private ChannelFuture channelFuture; @@ -46,40 +46,54 @@ public abstract class NettyClientBase implements Closeable { public abstract <T> T getStub(); public abstract RpcConnectionPool.RpcConnectionKey getKey(); + + public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory, + int numRetries) throws ConnectTimeoutException { + this.numRetries = numRetries; + + init(addr, pipeFactory, factory); + } public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory) - throws IOException { - try { - - this.bootstrap = new ClientBootstrap(factory); - this.bootstrap.setPipelineFactory(pipeFactory); - // TODO - should be configurable - this.bootstrap.setOption("connectTimeoutMillis", 10000); - this.bootstrap.setOption("connectResponseTimeoutMillis", 10000); - this.bootstrap.setOption("receiveBufferSize", 1048576 * 10); - this.bootstrap.setOption("tcpNoDelay", true); - this.bootstrap.setOption("keepAlive", true); - - connect(addr); - } catch (IOException e) { - close(); - throw e; - } catch (Throwable t) { - throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause()); - } + throws ConnectTimeoutException { + this.bootstrap = new ClientBootstrap(factory); + this.bootstrap.setPipelineFactory(pipeFactory); + // TODO - should be configurable + this.bootstrap.setOption("connectTimeoutMillis", 10000); + this.bootstrap.setOption("connectResponseTimeoutMillis", 10000); + this.bootstrap.setOption("receiveBufferSize", 1048576 * 10); + this.bootstrap.setOption("tcpNoDelay", true); + this.bootstrap.setOption("keepAlive", true); + + connect(addr); } - - public void connect(InetSocketAddress addr) throws Exception { - if(addr.isUnresolved()){ - addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort()); - } + + private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException { this.channelFuture = bootstrap.connect(addr); final CountDownLatch latch = new CountDownLatch(1); this.channelFuture.addListener(new ChannelFutureListener() { + private final AtomicInteger retryCount = new AtomicInteger(); + @Override public void operationComplete(ChannelFuture future) throws Exception { - latch.countDown(); + if (!future.isSuccess()) { + if (numRetries > retryCount.getAndIncrement()) { + Thread.sleep(PAUSE); + channelFuture = bootstrap.connect(addr); + channelFuture.addListener(this); + + LOG.debug("Connecting to " + addr + " has been failed. Retrying to connect."); + } + else { + latch.countDown(); + + LOG.error("Max retry count has been exceeded. attempts=" + numRetries); + } + } + else { + latch.countDown(); + } } }); @@ -88,12 +102,20 @@ public abstract class NettyClientBase implements Closeable { } catch (InterruptedException e) { } - if (!channelFuture.isSuccess()) { - throw new RuntimeException(channelFuture.getCause()); + throw new ConnectTimeoutException("Connect error to " + addr + + " caused by " + ExceptionUtils.getMessage(channelFuture.getCause())); } } + public void connect(InetSocketAddress addr) throws ConnectTimeoutException { + if(addr.isUnresolved()){ + addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + } + + handleConnectionInternally(addr); + } + public boolean isConnected() { return getChannel().isConnected(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java index aba9c63..2f3d433 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java @@ -22,9 +22,12 @@ import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.conf.TajoConf; +import org.jboss.netty.channel.ConnectTimeoutException; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.logging.CommonsLoggerFactory; +import org.jboss.netty.logging.InternalLoggerFactory; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; @@ -42,6 +45,8 @@ public class RpcConnectionPool { private final ClientSocketChannelFactory channelFactory; private final TajoConf conf; + public final static int RPC_RETRIES = 3; + private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) { this.conf = conf; this.channelFactory = channelFactory; @@ -49,6 +54,7 @@ public class RpcConnectionPool { public synchronized static RpcConnectionPool getPool(TajoConf conf) { if(instance == null) { + InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory()); } return instance; @@ -58,19 +64,21 @@ public class RpcConnectionPool { return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum)); } - private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) throws Exception { + private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { NettyClientBase client; if(rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory); + client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES); } else { - client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory); + client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES); } accepted.add(client.getChannel()); return client; } public NettyClientBase getConnection(InetSocketAddress addr, - Class protocolClass, boolean asyncMode) throws Exception { + Class protocolClass, boolean asyncMode) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); NettyClientBase client = connections.get(key); http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 72223c1..7c8246a 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -28,6 +28,8 @@ import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; import org.apache.tajo.util.NetUtils; +import org.jboss.netty.channel.ConnectTimeoutException; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,15 +52,20 @@ public class TestAsyncRpc { static AsyncRpcClient client; static Interface stub; static DummyProtocolAsyncImpl service; + ClientSocketChannelFactory clientChannelFactory; + int retries; @Before public void setUp() throws Exception { + retries = 1; + + clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2); service = new DummyProtocolAsyncImpl(); server = new AsyncRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); client = new AsyncRpcClient(DummyProtocol.class, - NetUtils.getConnectAddress(server.getListenAddress())); + NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); stub = client.getStub(); } @@ -67,9 +74,14 @@ public class TestAsyncRpc { if(client != null) { client.close(); } + if(server != null) { server.shutdown(); } + + if (clientChannelFactory != null) { + clientChannelFactory.releaseExternalResources(); + } } boolean calledMarker = false; @@ -170,9 +182,83 @@ public class TestAsyncRpc { } @Test + public void testStubDisconnected() throws Exception { + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + + server.shutdown(); + server = null; + + stub = client.getStub(); + stub.echo(future.getController(), echoMessage, future); + EchoMessage response = future.get(); + + assertNull(response); + assertTrue(future.getController().failed()); + assertTrue(future.getController().errorText() != null); + } + + @Test + public void testConnectionRetry() throws Exception { + retries = 10; + final InetSocketAddress address = server.getListenAddress(); + tearDown(); + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + + //lazy startup + Thread serverThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(100); + server = new AsyncRpcServer(DummyProtocol.class, + service, address, 2); + } catch (Exception e) { + fail(e.getMessage()); + } + server.start(); + } + }); + serverThread.start(); + + clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2); + client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries); + stub = client.getStub(); + stub.echo(future.getController(), echoMessage, future); + + assertFalse(future.isDone()); + assertEquals(echoMessage, future.get()); + assertTrue(future.isDone()); + } + + @Test + public void testConnectionFailure() throws Exception { + InetSocketAddress address = new InetSocketAddress("test", 0); + boolean expected = false; + try { + new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries); + fail(); + } catch (ConnectTimeoutException e) { + expected = true; + } catch (Throwable throwable) { + fail(); + } + assertTrue(expected); + } + + @Test public void testUnresolvedAddress() throws Exception { + client.close(); + client = null; + String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress()); - AsyncRpcClient client = new AsyncRpcClient(DummyProtocol.class, NetUtils.createUnresolved(hostAndPort)); + client = new AsyncRpcClient(DummyProtocol.class, + NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -182,6 +268,5 @@ public class TestAsyncRpc { assertFalse(future.isDone()); assertEquals(future.get(), echoMessage); assertTrue(future.isDone()); - client.close(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 3fc51c6..28a3fad 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -26,11 +26,11 @@ import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl; import org.apache.tajo.util.NetUtils; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; @@ -45,15 +45,21 @@ public class TestBlockingRpc { private BlockingRpcClient client; private BlockingInterface stub; private DummyProtocolBlockingImpl service; + private int retries; + private ClientSocketChannelFactory clientChannelFactory; @Before public void setUp() throws Exception { + retries = 1; + + clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2); + service = new DummyProtocolBlockingImpl(); server = new BlockingRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); client = new BlockingRpcClient(DummyProtocol.class, - NetUtils.getConnectAddress(server.getListenAddress())); + NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); stub = client.getStub(); } @@ -62,9 +68,14 @@ public class TestBlockingRpc { if(client != null) { client.close(); } + if(server != null) { server.shutdown(); } + + if(clientChannelFactory != null){ + clientChannelFactory.releaseExternalResources(); + } } @Test @@ -85,6 +96,7 @@ public class TestBlockingRpc { @Test public void testRpcWithServiceCallable() throws Exception { + RpcConnectionPool pool = RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2); final SumRequest request = SumRequest.newBuilder() .setX1(1) .setX2(2) @@ -92,7 +104,7 @@ public class TestBlockingRpc { .setX4(2.0f).build(); SumResponse response = - new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2), + new ServerCallable<SumResponse>(pool, server.getListenAddress(), DummyProtocol.class, false) { @Override public SumResponse call(NettyClientBase client) throws Exception { @@ -105,7 +117,7 @@ public class TestBlockingRpc { assertEquals(8.15d, response.getResult(), 1e-15); response = - new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2), + new ServerCallable<SumResponse>(pool, server.getListenAddress(), DummyProtocol.class, false) { @Override public SumResponse call(NettyClientBase client) throws Exception { @@ -116,6 +128,7 @@ public class TestBlockingRpc { }.withoutRetries(); assertTrue(8.15d == response.getResult()); + pool.close(); } @Test @@ -137,17 +150,51 @@ public class TestBlockingRpc { } @Test + public void testConnectionRetry() throws Exception { + retries = 10; + final InetSocketAddress address = server.getListenAddress(); + tearDown(); + + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + //lazy startup + Thread serverThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(100); + server = new BlockingRpcServer(DummyProtocol.class, service, address, 2); + } catch (Exception e) { + fail(e.getMessage()); + } + server.start(); + } + }); + serverThread.start(); + + clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2); + client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries); + stub = client.getStub(); + + EchoMessage response = stub.echo(null, message); + assertEquals(MESSAGE, response.getMessage()); + } + + @Test public void testConnectionFailed() throws Exception { + boolean expected = false; try { int port = server.getListenAddress().getPort() + 1; new BlockingRpcClient(DummyProtocol.class, - NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port))); + NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries); fail("Connection should be failed."); - } catch (Throwable t) { - assertTrue(t instanceof IOException); - assertNotNull(t.getCause()); - assertTrue(t.getCause() instanceof ConnectException); + } catch (ConnectException ce) { + expected = true; + } catch (Throwable ce){ + fail(); } + assertTrue(expected); } @Test @@ -167,7 +214,6 @@ public class TestBlockingRpc { .build(); stub.deley(null, message); } catch (Exception e) { - e.printStackTrace(); error.append(e.getMessage()); } synchronized(error) { @@ -211,14 +257,17 @@ public class TestBlockingRpc { @Test public void testUnresolvedAddress() throws Exception { + client.close(); + client = null; + String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress()); - BlockingRpcClient client = new BlockingRpcClient(DummyProtocol.class, NetUtils.createUnresolved(hostAndPort)); + client = new BlockingRpcClient(DummyProtocol.class, + NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); BlockingInterface stub = client.getStub(); EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); EchoMessage response2 = stub.echo(null, message); assertEquals(MESSAGE, response2.getMessage()); - client.close(); } } \ No newline at end of file
