Repository: tajo Updated Branches: refs/heads/master 338a2b777 -> 7b78668b7
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index 3b5a747..e4109fe 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -18,17 +18,17 @@ package org.apache.tajo.rpc; -import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; - +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; - import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -57,7 +57,7 @@ public class AsyncRpcServer extends NettyServerBase { } @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { + private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @@ -78,55 +78,46 @@ public class AsyncRpcServer extends NettyServerBase { } @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof RpcRequest) { - try { - final RpcRequest request = (RpcRequest) msg; - - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + protected void channelRead0(final ChannelHandlerContext ctx, final RpcRequest request) throws Exception { - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); - } - - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() - .mergeFrom(request.getRequestMessage()).build(); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - final RpcController controller = new NettyRpcController(); + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } - RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() { + Message paramProto = null; + if (request.hasRequestMessage()) { + try { + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + } - public void run(Message returnValue) { + final RpcController controller = new NettyRpcController(); - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() { - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } + public void run(Message returnValue) { - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - ctx.writeAndFlush(builder.build()); - } - }; + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } - service.callMethod(methodDescriptor, controller, paramProto, callback); + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } - } finally { - ReferenceCountUtil.release(msg); + ctx.writeAndFlush(builder.build()); } - } + }; + + service.callMethod(methodDescriptor, controller, paramProto, callback); } @Override @@ -138,11 +129,6 @@ public class AsyncRpcServer extends NettyServerBase { } else { LOG.error(cause.getMessage()); } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 6a90330..c98f91f 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -18,24 +18,25 @@ package org.apache.tajo.rpc; -import com.google.protobuf.*; +import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.Descriptors.MethodDescriptor; - +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import io.netty.channel.*; -import io.netty.util.concurrent.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.GenericFutureListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; +import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; - import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.*; -import java.util.concurrent.Future; public class BlockingRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(RpcProtos.class); @@ -52,12 +53,17 @@ public class BlockingRpcClient extends NettyClientBase { * new an instance through this constructor. */ BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws NoSuchMethodException, ClassNotFoundException { + this(rpcConnectionKey, retries, 0); + } + + BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds) throws ClassNotFoundException, NoSuchMethodException { super(rpcConnectionKey, retries); stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); rpcChannel = new ProxyRpcChannel(); inboundHandler = new ClientChannelInboundHandler(); - init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds)); } @Override @@ -151,39 +157,30 @@ public class BlockingRpcClient extends NettyClientBase { } @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> { @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - if (msg instanceof RpcResponse) { - try { - RpcResponse rpcResponse = (RpcResponse) msg; - ProtoCallFuture callback = requests.remove(rpcResponse.getId()); + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception { + ProtoCallFuture callback = requests.remove(rpcResponse.getId()); - if (callback == null) { - LOG.warn("Dangling rpc call"); + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + if (rpcResponse.hasErrorMessage()) { + callback.setFailed(rpcResponse.getErrorMessage(), + makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); + throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); + } else { + Message responseMessage; + + if (!rpcResponse.hasResponseMessage()) { + responseMessage = null; } else { - if (rpcResponse.hasErrorMessage()) { - callback.setFailed(rpcResponse.getErrorMessage(), - makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); - throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); - } else { - Message responseMessage; - - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; - } else { - responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) - .build(); - } - - callback.setResponse(responseMessage); - } + responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) + .build(); } - } finally { - ReferenceCountUtil.release(msg); + + callback.setResponse(responseMessage); } } } @@ -200,8 +197,23 @@ public class BlockingRpcClient extends NettyClientBase { } else { LOG.error("RPC Exception:" + cause.getMessage()); } - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.info("Connection established successfully : " + ctx.channel().remoteAddress()); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + /* If all requests is done and event is triggered, channel will be closed. */ + if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) { + ctx.close(); + LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress()); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index 0ce359f..bb31367 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -22,15 +22,12 @@ import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcController; - import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; - import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -62,7 +59,7 @@ public class BlockingRpcServer extends NettyServerBase { } @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { + private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @@ -83,52 +80,43 @@ public class BlockingRpcServer extends NettyServerBase { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { - if (msg instanceof RpcRequest) { + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } + Message paramProto = null; + if (request.hasRequestMessage()) { try { - final RpcRequest request = (RpcRequest) msg; - - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); - } - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() - .mergeFrom(request.getRequestMessage()).build(); - - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } - Message returnValue; - RpcController controller = new NettyRpcController(); - - try { - returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } - - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } - ctx.writeAndFlush(builder.build()); - } finally { - ReferenceCountUtil.release(msg); + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); } } + Message returnValue; + RpcController controller = new NettyRpcController(); + + try { + returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } + + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } + ctx.writeAndFlush(builder.build()); } @Override @@ -137,11 +125,6 @@ public class BlockingRpcServer extends NettyServerBase { RemoteCallException callException = (RemoteCallException) cause; ctx.writeAndFlush(callException.getResponse()); } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java new file mode 100644 index 0000000..29c9772 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +public class ConnectionCloseFutureListener implements GenericFutureListener { + private RpcClientManager.RpcConnectionKey key; + + public ConnectionCloseFutureListener(RpcClientManager.RpcConnectionKey key) { + this.key = key; + } + + @Override + public void operationComplete(Future future) throws Exception { + RpcClientManager.remove(key); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index cdc4cc6..57e436b 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -18,23 +18,18 @@ package org.apache.tajo.rpc; -import io.netty.channel.*; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; - import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.GenericFutureListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import java.io.Closeable; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public abstract class NettyClientBase implements Closeable { @@ -46,13 +41,11 @@ public abstract class NettyClientBase implements Closeable { private Bootstrap bootstrap; private volatile ChannelFuture channelFuture; - private volatile long lastConnected = -1; protected final Class<?> protocol; protected final AtomicInteger sequence = new AtomicInteger(0); private final RpcConnectionKey key; - private final AtomicInteger counter = new AtomicInteger(0); // reference counter public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) throws ClassNotFoundException, NoSuchMethodException { @@ -65,6 +58,7 @@ public abstract class NettyClientBase implements Closeable { protected void init(ChannelInitializer<Channel> initializer) { this.bootstrap = new Bootstrap(); this.bootstrap + .group(RpcChannelFactory.getSharedClientEventloopGroup()) .channel(NioSocketChannel.class) .handler(initializer) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) @@ -74,7 +68,7 @@ public abstract class NettyClientBase implements Closeable { .option(ChannelOption.TCP_NODELAY, true); } - public RpcConnectionPool.RpcConnectionKey getKey() { + public RpcClientManager.RpcConnectionKey getKey() { return key; } @@ -94,21 +88,6 @@ public abstract class NettyClientBase implements Closeable { public abstract <T> T getStub(); - public boolean acquire(long timeout) { - if (!checkConnection(timeout)) { - return false; - } - counter.incrementAndGet(); - return true; - } - - public boolean release() { - return counter.decrementAndGet() == 0; - } - - private boolean checkConnection(long timeout) { - return isConnected() || handleConnectionInternally(key.addr, timeout); - } private InetSocketAddress resolveAddress(InetSocketAddress address) { if (address.isUnresolved()) { @@ -117,83 +96,48 @@ public abstract class NettyClientBase implements Closeable { return address; } - private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { - if (lastConnected > 0) { - LOG.warn("Try to reconnect : " + address); - } - this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) - .connect(address) - .addListener(listener); + private ChannelFuture doConnect(SocketAddress address) { + return this.channelFuture = bootstrap.clone().connect(address); } - // first attendant kicks connection - private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>(); - - private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) { - final CountDownLatch ticket = new CountDownLatch(1); - final CountDownLatch granted = connect.check(ticket); - // basically, it's double checked lock - if (ticket == granted && isConnected()) { - granted.countDown(); - return true; - } + public synchronized void connect() throws ConnectTimeoutException { + if (isConnected()) return; - if (ticket == granted) { - InetSocketAddress address = resolveAddress(addr); - connectUsingNetty(address, new RetryConnectionListener(address, granted)); - } - - try { - granted.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore + final AtomicInteger retries = new AtomicInteger(); + InetSocketAddress address = key.addr; + if (address.isUnresolved()) { + address = resolveAddress(address); } - boolean success = channelFuture.isSuccess(); + /* do not call await() inside handler */ + ChannelFuture f = doConnect(address).awaitUninterruptibly(); + retries.incrementAndGet(); - if (granted.getCount() == 0) { - connect.clear(granted); + if (!f.isSuccess() && numRetries > 0) { + doReconnect(address, f, retries); } - - return success; } - class RetryConnectionListener implements GenericFutureListener<ChannelFuture> { - private final AtomicInteger retryCount = new AtomicInteger(); - private final InetSocketAddress address; - private final CountDownLatch latch; - - RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) { - this.address = address; - this.latch = latch; - } - - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (!channelFuture.isSuccess()) { - channelFuture.channel().close(); - - if (numRetries > retryCount.getAndIncrement()) { + private void doReconnect(final InetSocketAddress address, ChannelFuture future, AtomicInteger retries) + throws ConnectTimeoutException { - RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() { - @Override - public void run() { - connectUsingNetty(address, RetryConnectionListener.this); - } - }, PAUSE, TimeUnit.MILLISECONDS); + for (; ; ) { + if (numRetries >= retries.getAndIncrement()) { - LOG.debug("Connecting to " + address + " has been failed. Retrying to connect."); + LOG.warn(future.cause().getMessage() + " Try to reconnect"); + try { + Thread.sleep(PAUSE); + } catch (InterruptedException e) { } - else { - latch.countDown(); - LOG.error("Max retry count has been exceeded. attempts=" + numRetries); + this.channelFuture = doConnect(address).awaitUninterruptibly(); + if (this.channelFuture.isDone() && this.channelFuture.isSuccess()) { + break; } - } - else { - latch.countDown(); - lastConnected = System.currentTimeMillis(); + } else { + throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + numRetries + + " caused by: " + future.cause()); } } } @@ -217,7 +161,7 @@ public abstract class NettyClientBase implements Closeable { Channel channel = getChannel(); if (channel != null && channel.isOpen()) { LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); - channel.close(); + channel.close().awaitUninterruptibly(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java index 6a340dc..74eb650 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java @@ -18,6 +18,7 @@ package org.apache.tajo.rpc; +import com.google.protobuf.MessageLite; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; @@ -26,16 +27,21 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; - -import com.google.protobuf.MessageLite; +import io.netty.handler.timeout.IdleStateHandler; class ProtoChannelInitializer extends ChannelInitializer<Channel> { private final MessageLite defaultInstance; private final ChannelHandler handler; + private final int idleTimeSeconds; public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) { + this(handler, defaultInstance, 0); + } + + public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, int idleTimeSeconds) { this.handler = handler; this.defaultInstance = defaultInstance; + this.idleTimeSeconds = idleTimeSeconds; } @Override @@ -45,6 +51,7 @@ class ProtoChannelInitializer extends ChannelInitializer<Channel> { pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); + pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeSeconds)); //zero is disabling pipeline.addLast("handler", handler); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java new file mode 100644 index 0000000..f05fb97 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.channel.ConnectTimeoutException; +import io.netty.util.internal.logging.CommonsLoggerFactory; +import io.netty.util.internal.logging.InternalLoggerFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.annotation.concurrent.ThreadSafe; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@ThreadSafe +public class RpcClientManager { + private static final Log LOG = LogFactory.getLog(RpcClientManager.class); + + public static final int RPC_RETRIES = 3; + + /* If all requests is done and client is idle state, client will be removed. */ + public static final int RPC_IDLE_TIMEOUT = 43200; // 12 hour + + /* entries will be removed by ConnectionCloseFutureListener */ + private static final Map<RpcConnectionKey, NettyClientBase> + clients = Collections.synchronizedMap(new HashMap<RpcConnectionKey, NettyClientBase>()); + + private static RpcClientManager instance; + + static { + InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); + instance = new RpcClientManager(); + } + + private RpcClientManager() { + } + + public static RpcClientManager getInstance() { + return instance; + } + + private NettyClientBase makeClient(RpcConnectionKey rpcConnectionKey) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + NettyClientBase client; + if (rpcConnectionKey.asyncMode) { + client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT); + } else { + client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT); + } + return client; + } + + /** + * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol. + * This client will be shared per protocol and address. Client is removed in shared map when a client is closed + * @param addr + * @param protocolClass + * @param asyncMode + * @return + * @throws NoSuchMethodException + * @throws ClassNotFoundException + * @throws ConnectTimeoutException + */ + public NettyClientBase getClient(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); + + NettyClientBase client; + synchronized (clients) { + client = clients.get(key); + if (client == null) { + clients.put(key, client = makeClient(key)); + } + } + + if (!client.isConnected()) { + client.connect(); + client.getChannel().closeFuture().addListener(new ConnectionCloseFutureListener(key)); + } + assert client.isConnected(); + return client; + } + + /** + * Request to close this clients + * After it is closed, it is removed from clients map. + */ + public static void close() { + LOG.info("Closing RPC client manager"); + + for (NettyClientBase eachClient : clients.values()) { + try { + eachClient.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + + /** + * Close client manager and shutdown Netty RPC worker pool + * After it is shutdown it is not possible to reuse it again. + */ + public static void shutdown() { + close(); + RpcChannelFactory.shutdownGracefully(); + } + + protected static NettyClientBase remove(RpcConnectionKey key) { + LOG.debug("Removing shared rpc client :" + key); + return clients.remove(key); + } + + protected static boolean contains(RpcConnectionKey key) { + return clients.containsKey(key); + } + + public static void cleanup(NettyClientBase... clients) { + for (NettyClientBase client : clients) { + if (client != null) { + try { + client.close(); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception in closing " + client.getKey(), e); + } + } + } + } + } + + static class RpcConnectionKey { + final InetSocketAddress addr; + final Class<?> protocolClass; + final boolean asyncMode; + + final String description; + + public RpcConnectionKey(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) { + this.addr = addr; + this.protocolClass = protocolClass; + this.asyncMode = asyncMode; + this.description = "[" + protocolClass + "] " + addr + "," + asyncMode; + } + + @Override + public String toString() { + return description; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RpcConnectionKey)) { + return false; + } + + return toString().equals(obj.toString()); + } + + @Override + public int hashCode() { + return description.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java deleted file mode 100644 index b0ff910..0000000 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ /dev/null @@ -1,191 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import io.netty.channel.ConnectTimeoutException; -import io.netty.util.internal.logging.CommonsLoggerFactory; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; - -public class RpcConnectionPool { - private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class); - - private Map<RpcConnectionKey, NettyClientBase> connections = - new HashMap<RpcConnectionKey, NettyClientBase>(); - - private static RpcConnectionPool instance; - private final Object lockObject = new Object(); - - public final static int RPC_RETRIES = 3; - - private RpcConnectionPool() { - } - - public synchronized static RpcConnectionPool getPool() { - if(instance == null) { - InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); - instance = new RpcConnectionPool(); - } - return instance; - } - - private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - NettyClientBase client; - if(rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES); - } else { - client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES); - } - return client; - } - - public static final long DEFAULT_TIMEOUT = 3000; - public static final long DEFAULT_INTERVAL = 500; - - public NettyClientBase getConnection(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL); - } - - public NettyClientBase getConnection(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode, long timeout, long interval) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); - - RpcUtils.Timer timer = new RpcUtils.Timer(timeout); - for (; !timer.isTimedOut(); timer.elapsed()) { - NettyClientBase client; - synchronized (lockObject) { - client = connections.get(key); - if (client == null) { - connections.put(key, client = makeConnection(key)); - } - } - if (client.acquire(timer.remaining())) { - return client; - } - timer.interval(interval); - } - - throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec"); - } - - public void releaseConnection(NettyClientBase client) { - if (client != null) { - release(client, false); - } - } - - public void closeConnection(NettyClientBase client) { - if (client != null) { - release(client, true); - } - } - - private void release(NettyClientBase client, boolean close) { - try { - if (returnToPool(client, close)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing connection [" + client.getKey() + "]"); - } - client.close(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Current Connections in pool [" + connections.size() + "]"); - } - } catch (Exception e) { - LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); - } - } - - // return true if the connection should be closed - private boolean returnToPool(NettyClientBase client, boolean close) { - synchronized (lockObject) { - if (client.release() && (close || !client.isConnected())) { - connections.remove(client.getKey()); - return true; - } - } - return false; - } - - public void close() { - if(LOG.isDebugEnabled()) { - LOG.debug("Pool Closed"); - } - - synchronized (lockObject) { - for (NettyClientBase eachClient : connections.values()) { - try { - eachClient.close(); - } catch (Exception e) { - LOG.error("close client pool error", e); - } - } - connections.clear(); - } - } - - public void shutdown(){ - close(); - RpcChannelFactory.shutdownGracefully(); - } - - static class RpcConnectionKey { - final InetSocketAddress addr; - final Class<?> protocolClass; - final boolean asyncMode; - - final String description; - - public RpcConnectionKey(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) { - this.addr = addr; - this.protocolClass = protocolClass; - this.asyncMode = asyncMode; - this.description = "["+ protocolClass + "] " + addr + "," + asyncMode; - } - - @Override - public String toString() { - return description; - } - - @Override - public boolean equals(Object obj) { - if(!(obj instanceof RpcConnectionKey)) { - return false; - } - - return toString().equals(obj.toString()); - } - - @Override - public int hashCode() { - return description.hashCode(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java index fb1cec2..2804a03 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java @@ -18,13 +18,11 @@ package org.apache.tajo.rpc; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import com.google.protobuf.ServiceException; public abstract class ServerCallable<T> { protected InetSocketAddress addr; @@ -33,21 +31,16 @@ public abstract class ServerCallable<T> { protected Class<?> protocol; protected boolean asyncMode; protected boolean closeConn; - protected RpcConnectionPool connPool; + protected RpcClientManager manager; public abstract T call(NettyClientBase client) throws Exception; - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) { - this(connPool, addr, protocol, asyncMode, false); - } - - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, - boolean asyncMode, boolean closeConn) { - this.connPool = connPool; + public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?> protocol, + boolean asyncMode) { + this.manager = manager; this.addr = addr; this.protocol = protocol; this.asyncMode = asyncMode; - this.closeConn = closeConn; } public void beforeCall() { @@ -74,26 +67,24 @@ public abstract class ServerCallable<T> { * Run this instance with retries, timed waits, * and refinds of missing regions. * - * @param <T> the type of the return value * @return an object of type T * @throws com.google.protobuf.ServiceException if a remote or network exception occurs */ + public T withRetries() throws ServiceException { //TODO configurable final long pause = 500; //ms final int numRetries = 3; - List<Throwable> exceptions = new ArrayList<Throwable>(); for (int tries = 0; tries < numRetries; tries++) { NettyClientBase client = null; try { beforeCall(); if(addr != null) { - client = connPool.getConnection(addr, protocol, asyncMode); + client = manager.getClient(addr, protocol, asyncMode); } return call(client); } catch (IOException ioe) { - exceptions.add(ioe); if(abort) { throw new ServiceException(ioe.getMessage(), ioe); } @@ -105,9 +96,7 @@ public abstract class ServerCallable<T> { } finally { afterCall(); if(closeConn) { - connPool.closeConnection(client); - } else { - connPool.releaseConnection(client); + RpcClientManager.cleanup(client); } } try { @@ -122,7 +111,6 @@ public abstract class ServerCallable<T> { /** * Run this instance against the server once. - * @param <T> the type of the return value * @return an object of type T * @throws java.io.IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error @@ -131,7 +119,7 @@ public abstract class ServerCallable<T> { NettyClientBase client = null; try { beforeCall(); - client = connPool.getConnection(addr, protocol, asyncMode); + client = manager.getClient(addr, protocol, asyncMode); return call(client); } catch (Throwable t) { Throwable t2 = translateException(t); @@ -143,9 +131,7 @@ public abstract class ServerCallable<T> { } finally { afterCall(); if(closeConn) { - connPool.closeConnection(client); - } else { - connPool.releaseConnection(client); + RpcClientManager.cleanup(client); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index a974a65..68f170c 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -19,6 +19,7 @@ package org.apache.tajo.rpc; import com.google.protobuf.RpcCallback; +import io.netty.channel.ConnectTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.test.DummyProtocol; @@ -123,12 +124,12 @@ public class TestAsyncRpc { public void setUpRpcClient() throws Exception { retries = 1; - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( RpcUtils.getConnectAddress(server.getListenAddress()), DummyProtocol.class, true); client = new AsyncRpcClient(rpcConnectionKey, retries); - client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT); + client.connect(); stub = client.getStub(); } @@ -298,10 +299,11 @@ public class TestAsyncRpc { }); serverThread.start(); - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); client = new AsyncRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.connect(); + assertTrue(client.isConnected()); stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); @@ -313,25 +315,32 @@ public class TestAsyncRpc { @Test public void testConnectionFailure() throws Exception { InetSocketAddress address = new InetSocketAddress("test", 0); + boolean expected = false; try { - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries); - assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.connect(); + fail(); + } catch (ConnectTimeoutException e) { + expected = true; } catch (Throwable throwable) { fail(); } + assertTrue(expected); + } @Test @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); client = new AsyncRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.connect(); + assertTrue(client.isConnected()); Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -342,4 +351,43 @@ public class TestAsyncRpc { assertEquals(future.get(), echoMessage); assertTrue(future.isDone()); } + + @Test + public void testIdleTimeout() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + client.connect(); + assertTrue(client.isConnected()); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + + client.connect(); // try to reconnect + assertTrue(client.isConnected()); + client.close(); + assertFalse(client.isConnected()); + } + + @Test + public void testIdleTimeoutWithActiveRequest() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + client.connect(); + + assertTrue(client.isConnected()); + Interface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.deley(null, echoMessage, future); //3 sec delay + + assertFalse(future.isDone()); + assertEquals(future.get(), echoMessage); + assertTrue(future.isDone()); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 10dd766..c114985 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -18,6 +18,7 @@ package org.apache.tajo.rpc; +import io.netty.channel.ConnectTimeoutException; import org.apache.tajo.rpc.test.DummyProtocol; import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; import org.apache.tajo.rpc.test.TestProtos.EchoMessage; @@ -115,12 +116,13 @@ public class TestBlockingRpc { public void setUpRpcClient() throws Exception { retries = 1; - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( RpcUtils.getConnectAddress(server.getListenAddress()), DummyProtocol.class, false); client = new BlockingRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.connect(); + assertTrue(client.isConnected()); stub = client.getStub(); } @@ -162,7 +164,7 @@ public class TestBlockingRpc { @Test @SetupRpcConnection(setupRpcClient=false) public void testRpcWithServiceCallable() throws Exception { - RpcConnectionPool pool = RpcConnectionPool.getPool(); + RpcClientManager manager = RpcClientManager.getInstance(); final SumRequest request = SumRequest.newBuilder() .setX1(1) .setX2(2) @@ -170,7 +172,7 @@ public class TestBlockingRpc { .setX4(2.0f).build(); SumResponse response = - new ServerCallable<SumResponse>(pool, + new ServerCallable<SumResponse>(manager, server.getListenAddress(), DummyProtocol.class, false) { @Override public SumResponse call(NettyClientBase client) throws Exception { @@ -183,7 +185,7 @@ public class TestBlockingRpc { assertEquals(8.15d, response.getResult(), 1e-15); response = - new ServerCallable<SumResponse>(pool, + new ServerCallable<SumResponse>(manager, server.getListenAddress(), DummyProtocol.class, false) { @Override public SumResponse call(NettyClientBase client) throws Exception { @@ -194,7 +196,7 @@ public class TestBlockingRpc { }.withoutRetries(); assertTrue(8.15d == response.getResult()); - pool.close(); + RpcClientManager.close(); } @Test @@ -241,10 +243,11 @@ public class TestBlockingRpc { }); serverThread.start(); - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); client = new BlockingRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.connect(); + assertTrue(client.isConnected()); stub = client.getStub(); EchoMessage response = stub.echo(null, message); @@ -254,22 +257,22 @@ public class TestBlockingRpc { @Test public void testConnectionFailed() throws Exception { NettyClientBase client = null; - + boolean expected = false; try { int port = server.getListenAddress().getPort() + 1; - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), DummyProtocol.class, false); client = new BlockingRpcClient(rpcConnectionKey, retries); - assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - client.close(); - } catch (Throwable ce){ - if (client != null) { - client.close(); - } + client.connect(); fail(); + } catch (ConnectTimeoutException e) { + expected = true; + } catch (Throwable e) { + fail(e.getMessage()); } + assertTrue(expected); } @Test @@ -334,11 +337,12 @@ public class TestBlockingRpc { @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); client = new BlockingRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.connect(); + assertTrue(client.isConnected()); BlockingInterface stub = client.getStub(); EchoMessage message = EchoMessage.newBuilder() @@ -346,4 +350,41 @@ public class TestBlockingRpc { EchoMessage response2 = stub.echo(null, message); assertEquals(MESSAGE, response2.getMessage()); } + + @Test + public void testIdleTimeout() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + client.connect(); + assertTrue(client.isConnected()); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + + client.connect(); // try to reconnect + assertTrue(client.isConnected()); + client.close(); + assertFalse(client.isConnected()); + } + + @Test + public void testIdleTimeoutWithActiveRequest() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + + client.connect(); + + assertTrue(client.isConnected()); + BlockingInterface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + EchoMessage message = stub.deley(null, echoMessage); //3 sec delay + assertEquals(message, echoMessage); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java new file mode 100644 index 0000000..5f86518 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import org.apache.tajo.rpc.test.DummyProtocol; +import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestRpcClientManager { + + @Test + public void testRaceCondition() throws Exception { + final int parallelCount = 50; + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), parallelCount); + server.start(); + + final InetSocketAddress address = server.getListenAddress(); + final RpcClientManager manager = RpcClientManager.getInstance(); + + ExecutorService executor = Executors.newFixedThreadPool(parallelCount); + List<Future> tasks = new ArrayList<Future>(); + for (int i = 0; i < parallelCount; i++) { + tasks.add(executor.submit(new Runnable() { + @Override + public void run() { + NettyClientBase client = null; + try { + client = manager.getClient(address, DummyProtocol.class, false); + } catch (Throwable e) { + fail(e.getMessage()); + } + assertTrue(client.isConnected()); + } + }) + ); + } + + for (Future future : tasks) { + future.get(); + } + + NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false); + RpcClientManager.cleanup(clientBase); + server.shutdown(); + executor.shutdown(); + } + + @Test + public void testCloseFuture() throws Exception { + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), 3); + server.start(); + + final RpcClientManager manager = RpcClientManager.getInstance(); + + NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); + assertTrue(client.isConnected()); + assertTrue(client.getChannel().isWritable()); + + RpcClientManager.RpcConnectionKey key = client.getKey(); + assertTrue(RpcClientManager.contains(key)); + + client.close(); + assertFalse(RpcClientManager.contains(key)); + server.shutdown(); + } +} \ No newline at end of file
