Repository: tajo
Updated Branches:
  refs/heads/master 338a2b777 -> 7b78668b7


http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 3b5a747..e4109fe 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -18,17 +18,17 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.protobuf.*;
 import com.google.protobuf.Descriptors.MethodDescriptor;
-
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
 import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
-import io.netty.util.ReferenceCountUtil;
-
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
@@ -57,7 +57,7 @@ public class AsyncRpcServer extends NettyServerBase {
   }
 
   @ChannelHandler.Sharable
-  private class ServerHandler extends ChannelInboundHandlerAdapter {
+  private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
 
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@@ -78,55 +78,46 @@ public class AsyncRpcServer extends NettyServerBase {
     }
 
     @Override
-    public void channelRead(final ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      if (msg instanceof RpcRequest) {
-        try {
-          final RpcRequest request = (RpcRequest) msg;
-
-          String methodName = request.getMethodName();
-          MethodDescriptor methodDescriptor = 
service.getDescriptorForType().findMethodByName(methodName);
+    protected void channelRead0(final ChannelHandlerContext ctx, final 
RpcRequest request) throws Exception {
 
-          if (methodDescriptor == null) {
-            throw new RemoteCallException(request.getId(), new 
NoSuchMethodException(methodName));
-          }
-
-          Message paramProto = null;
-          if (request.hasRequestMessage()) {
-            try {
-              paramProto = 
service.getRequestPrototype(methodDescriptor).newBuilderForType()
-                  .mergeFrom(request.getRequestMessage()).build();
-            } catch (Throwable t) {
-              throw new RemoteCallException(request.getId(), methodDescriptor, 
t);
-            }
-          }
+      String methodName = request.getMethodName();
+      MethodDescriptor methodDescriptor = 
service.getDescriptorForType().findMethodByName(methodName);
 
-          final RpcController controller = new NettyRpcController();
+      if (methodDescriptor == null) {
+        throw new RemoteCallException(request.getId(), new 
NoSuchMethodException(methodName));
+      }
 
-          RpcCallback<Message> callback = !request.hasId() ? null : new 
RpcCallback<Message>() {
+      Message paramProto = null;
+      if (request.hasRequestMessage()) {
+        try {
+          paramProto = 
service.getRequestPrototype(methodDescriptor).newBuilderForType()
+              .mergeFrom(request.getRequestMessage()).build();
+        } catch (Throwable t) {
+          throw new RemoteCallException(request.getId(), methodDescriptor, t);
+        }
+      }
 
-            public void run(Message returnValue) {
+      final RpcController controller = new NettyRpcController();
 
-              RpcResponse.Builder builder = 
RpcResponse.newBuilder().setId(request.getId());
+      RpcCallback<Message> callback = !request.hasId() ? null : new 
RpcCallback<Message>() {
 
-              if (returnValue != null) {
-                builder.setResponseMessage(returnValue.toByteString());
-              }
+        public void run(Message returnValue) {
 
-              if (controller.failed()) {
-                builder.setErrorMessage(controller.errorText());
-              }
+          RpcResponse.Builder builder = 
RpcResponse.newBuilder().setId(request.getId());
 
-              ctx.writeAndFlush(builder.build());
-            }
-          };
+          if (returnValue != null) {
+            builder.setResponseMessage(returnValue.toByteString());
+          }
 
-          service.callMethod(methodDescriptor, controller, paramProto, 
callback);
+          if (controller.failed()) {
+            builder.setErrorMessage(controller.errorText());
+          }
 
-        } finally {
-          ReferenceCountUtil.release(msg);
+          ctx.writeAndFlush(builder.build());
         }
-      }
+      };
+
+      service.callMethod(methodDescriptor, controller, paramProto, callback);
     }
 
     @Override
@@ -138,11 +129,6 @@ public class AsyncRpcServer extends NettyServerBase {
       } else {
         LOG.error(cause.getMessage());
       }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
     }
-    
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 6a90330..c98f91f 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,24 +18,25 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.protobuf.*;
+import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors.MethodDescriptor;
-
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import io.netty.channel.*;
-import io.netty.util.concurrent.*;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
+import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
-import io.netty.util.ReferenceCountUtil;
-
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.*;
-import java.util.concurrent.Future;
 
 public class BlockingRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(RpcProtos.class);
@@ -52,12 +53,17 @@ public class BlockingRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+      throws NoSuchMethodException, ClassNotFoundException {
+    this(rpcConnectionKey, retries, 0);
+  }
+
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int 
idleTimeSeconds)
       throws ClassNotFoundException, NoSuchMethodException {
     super(rpcConnectionKey, retries);
     stubMethod = getServiceClass().getMethod("newBlockingStub", 
BlockingRpcChannel.class);
     rpcChannel = new ProxyRpcChannel();
     inboundHandler = new ClientChannelInboundHandler();
-    init(new ProtoChannelInitializer(inboundHandler, 
RpcResponse.getDefaultInstance()));
+    init(new ProtoChannelInitializer(inboundHandler, 
RpcResponse.getDefaultInstance(), idleTimeSeconds));
   }
 
   @Override
@@ -151,39 +157,30 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   @ChannelHandler.Sharable
-  private class ClientChannelInboundHandler extends 
ChannelInboundHandlerAdapter {
+  private class ClientChannelInboundHandler extends 
SimpleChannelInboundHandler<RpcResponse> {
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-
-      if (msg instanceof RpcResponse) {
-        try {
-          RpcResponse rpcResponse = (RpcResponse) msg;
-          ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse 
rpcResponse) throws Exception {
+      ProtoCallFuture callback = requests.remove(rpcResponse.getId());
 
-          if (callback == null) {
-            LOG.warn("Dangling rpc call");
+      if (callback == null) {
+        LOG.warn("Dangling rpc call");
+      } else {
+        if (rpcResponse.hasErrorMessage()) {
+          callback.setFailed(rpcResponse.getErrorMessage(),
+              makeTajoServiceException(rpcResponse, new 
ServiceException(rpcResponse.getErrorTrace())));
+          throw new 
RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+        } else {
+          Message responseMessage;
+
+          if (!rpcResponse.hasResponseMessage()) {
+            responseMessage = null;
           } else {
-            if (rpcResponse.hasErrorMessage()) {
-              callback.setFailed(rpcResponse.getErrorMessage(),
-                  makeTajoServiceException(rpcResponse, new 
ServiceException(rpcResponse.getErrorTrace())));
-              throw new 
RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
-            } else {
-              Message responseMessage;
-
-              if (!rpcResponse.hasResponseMessage()) {
-                responseMessage = null;
-              } else {
-                responseMessage = 
callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
-                    .build();
-              }
-
-              callback.setResponse(responseMessage);
-            }
+            responseMessage = 
callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
+                .build();
           }
-        } finally {
-          ReferenceCountUtil.release(msg);
+
+          callback.setResponse(responseMessage);
         }
       }
     }
@@ -200,8 +197,23 @@ public class BlockingRpcClient extends NettyClientBase {
       } else {
         LOG.error("RPC Exception:" + cause.getMessage());
       }
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+      super.channelActive(ctx);
+      LOG.info("Connection established successfully : " + 
ctx.channel().remoteAddress());
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
+      if (evt instanceof IdleStateEvent) {
+        IdleStateEvent e = (IdleStateEvent) evt;
+        /* If all requests is done and event is triggered, channel will be 
closed. */
+        if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) {
+          ctx.close();
+          LOG.warn("Idle connection closed successfully :" + 
ctx.channel().remoteAddress());
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 0ce359f..bb31367 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -22,15 +22,12 @@ import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
-
 import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
-import io.netty.util.ReferenceCountUtil;
-
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 
@@ -62,7 +59,7 @@ public class BlockingRpcServer extends NettyServerBase {
   }
 
   @ChannelHandler.Sharable
-  private class ServerHandler extends ChannelInboundHandlerAdapter {
+  private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
 
     @Override
     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
@@ -83,52 +80,43 @@ public class BlockingRpcServer extends NettyServerBase {
     }
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
+    protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) 
throws Exception {
 
-      if (msg instanceof RpcRequest) {
+      String methodName = request.getMethodName();
+      MethodDescriptor methodDescriptor = 
service.getDescriptorForType().findMethodByName(methodName);
+
+      if (methodDescriptor == null) {
+        throw new RemoteCallException(request.getId(), new 
NoSuchMethodException(methodName));
+      }
+      Message paramProto = null;
+      if (request.hasRequestMessage()) {
         try {
-          final RpcRequest request = (RpcRequest) msg;
-
-          String methodName = request.getMethodName();
-          MethodDescriptor methodDescriptor = 
service.getDescriptorForType().findMethodByName(methodName);
-
-          if (methodDescriptor == null) {
-            throw new RemoteCallException(request.getId(), new 
NoSuchMethodException(methodName));
-          }
-          Message paramProto = null;
-          if (request.hasRequestMessage()) {
-            try {
-              paramProto = 
service.getRequestPrototype(methodDescriptor).newBuilderForType()
-                  .mergeFrom(request.getRequestMessage()).build();
-
-            } catch (Throwable t) {
-              throw new RemoteCallException(request.getId(), methodDescriptor, 
t);
-            }
-          }
-          Message returnValue;
-          RpcController controller = new NettyRpcController();
-
-          try {
-            returnValue = service.callBlockingMethod(methodDescriptor, 
controller, paramProto);
-          } catch (Throwable t) {
-            throw new RemoteCallException(request.getId(), methodDescriptor, 
t);
-          }
-
-          RpcResponse.Builder builder = 
RpcResponse.newBuilder().setId(request.getId());
-
-          if (returnValue != null) {
-            builder.setResponseMessage(returnValue.toByteString());
-          }
-
-          if (controller.failed()) {
-            builder.setErrorMessage(controller.errorText());
-          }
-          ctx.writeAndFlush(builder.build());
-        } finally {
-          ReferenceCountUtil.release(msg);
+          paramProto = 
service.getRequestPrototype(methodDescriptor).newBuilderForType()
+              .mergeFrom(request.getRequestMessage()).build();
+
+        } catch (Throwable t) {
+          throw new RemoteCallException(request.getId(), methodDescriptor, t);
         }
       }
+      Message returnValue;
+      RpcController controller = new NettyRpcController();
+
+      try {
+        returnValue = service.callBlockingMethod(methodDescriptor, controller, 
paramProto);
+      } catch (Throwable t) {
+        throw new RemoteCallException(request.getId(), methodDescriptor, t);
+      }
+
+      RpcResponse.Builder builder = 
RpcResponse.newBuilder().setId(request.getId());
+
+      if (returnValue != null) {
+        builder.setResponseMessage(returnValue.toByteString());
+      }
+
+      if (controller.failed()) {
+        builder.setErrorMessage(controller.errorText());
+      }
+      ctx.writeAndFlush(builder.build());
     }
 
     @Override
@@ -137,11 +125,6 @@ public class BlockingRpcServer extends NettyServerBase {
         RemoteCallException callException = (RemoteCallException) cause;
         ctx.writeAndFlush(callException.getResponse());
       }
-      
-      if (ctx != null && ctx.channel().isActive()) {
-        ctx.channel().close();
-      }
     }
-    
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
new file mode 100644
index 0000000..29c9772
--- /dev/null
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+public class ConnectionCloseFutureListener implements GenericFutureListener {
+  private RpcClientManager.RpcConnectionKey key;
+
+  public ConnectionCloseFutureListener(RpcClientManager.RpcConnectionKey key) {
+    this.key = key;
+  }
+
+  @Override
+  public void operationComplete(Future future) throws Exception {
+    RpcClientManager.remove(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index cdc4cc6..57e436b 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,23 +18,18 @@
 
 package org.apache.tajo.rpc;
 
-import io.netty.channel.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.*;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 
 import java.io.Closeable;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class NettyClientBase implements Closeable {
@@ -46,13 +41,11 @@ public abstract class NettyClientBase implements Closeable {
 
   private Bootstrap bootstrap;
   private volatile ChannelFuture channelFuture;
-  private volatile long lastConnected = -1;
 
   protected final Class<?> protocol;
   protected final AtomicInteger sequence = new AtomicInteger(0);
 
   private final RpcConnectionKey key;
-  private final AtomicInteger counter = new AtomicInteger(0);   // reference 
counter
 
   public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
       throws ClassNotFoundException, NoSuchMethodException {
@@ -65,6 +58,7 @@ public abstract class NettyClientBase implements Closeable {
   protected void init(ChannelInitializer<Channel> initializer) {
     this.bootstrap = new Bootstrap();
     this.bootstrap
+        .group(RpcChannelFactory.getSharedClientEventloopGroup())
       .channel(NioSocketChannel.class)
       .handler(initializer)
       .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
@@ -74,7 +68,7 @@ public abstract class NettyClientBase implements Closeable {
       .option(ChannelOption.TCP_NODELAY, true);
   }
 
-  public RpcConnectionPool.RpcConnectionKey getKey() {
+  public RpcClientManager.RpcConnectionKey getKey() {
     return key;
   }
 
@@ -94,21 +88,6 @@ public abstract class NettyClientBase implements Closeable {
 
   public abstract <T> T getStub();
 
-  public boolean acquire(long timeout) {
-    if (!checkConnection(timeout)) {
-      return false;
-    }
-    counter.incrementAndGet();
-    return true;
-  }
-
-  public boolean release() {
-    return counter.decrementAndGet() == 0;
-  }
-
-  private boolean checkConnection(long timeout) {
-    return isConnected() || handleConnectionInternally(key.addr, timeout);
-  }
 
   private InetSocketAddress resolveAddress(InetSocketAddress address) {
     if (address.isUnresolved()) {
@@ -117,83 +96,48 @@ public abstract class NettyClientBase implements Closeable 
{
     return address;
   }
 
-  private void connectUsingNetty(InetSocketAddress address, 
GenericFutureListener<ChannelFuture> listener) {
-    if (lastConnected > 0) {
-      LOG.warn("Try to reconnect : " + address);
-    }
-    this.channelFuture = 
bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
-            .connect(address)
-            .addListener(listener);
+  private ChannelFuture doConnect(SocketAddress address) {
+    return this.channelFuture = bootstrap.clone().connect(address);
   }
 
-  // first attendant kicks connection
-  private final RpcUtils.Scrutineer<CountDownLatch> connect = new 
RpcUtils.Scrutineer<CountDownLatch>();
-
-  private boolean handleConnectionInternally(final InetSocketAddress addr, 
long timeout) {
-    final CountDownLatch ticket = new CountDownLatch(1);
-    final CountDownLatch granted = connect.check(ticket);
 
-    // basically, it's double checked lock
-    if (ticket == granted && isConnected()) {
-      granted.countDown();
-      return true;
-    }
+  public synchronized void connect() throws ConnectTimeoutException {
+    if (isConnected()) return;
 
-    if (ticket == granted) {
-      InetSocketAddress address = resolveAddress(addr);
-      connectUsingNetty(address, new RetryConnectionListener(address, 
granted));
-    }
-
-    try {
-      granted.await(timeout, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      // ignore
+    final AtomicInteger retries = new AtomicInteger();
+    InetSocketAddress address = key.addr;
+    if (address.isUnresolved()) {
+      address = resolveAddress(address);
     }
 
-    boolean success = channelFuture.isSuccess();
+    /* do not call await() inside handler */
+    ChannelFuture f = doConnect(address).awaitUninterruptibly();
+    retries.incrementAndGet();
 
-    if (granted.getCount() == 0) {
-      connect.clear(granted);
+    if (!f.isSuccess() && numRetries > 0) {
+      doReconnect(address, f, retries);
     }
-
-    return success;
   }
 
-  class RetryConnectionListener implements 
GenericFutureListener<ChannelFuture> {
-    private final AtomicInteger retryCount = new AtomicInteger();
-    private final InetSocketAddress address;
-    private final CountDownLatch latch;
-
-    RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
-      this.address = address;
-      this.latch = latch;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture channelFuture) throws 
Exception {
-      if (!channelFuture.isSuccess()) {
-        channelFuture.channel().close();
-
-        if (numRetries > retryCount.getAndIncrement()) {
+  private void doReconnect(final InetSocketAddress address, ChannelFuture 
future, AtomicInteger retries)
+      throws ConnectTimeoutException {
 
-          RpcChannelFactory.getSharedClientEventloopGroup().schedule(new 
Runnable() {
-            @Override
-            public void run() {
-              connectUsingNetty(address, RetryConnectionListener.this);
-            }
-          }, PAUSE, TimeUnit.MILLISECONDS);
+    for (; ; ) {
+      if (numRetries >= retries.getAndIncrement()) {
 
-          LOG.debug("Connecting to " + address + " has been failed. Retrying 
to connect.");
+        LOG.warn(future.cause().getMessage() + " Try to reconnect");
+        try {
+          Thread.sleep(PAUSE);
+        } catch (InterruptedException e) {
         }
-        else {
-          latch.countDown();
 
-          LOG.error("Max retry count has been exceeded. attempts=" + 
numRetries);
+        this.channelFuture = doConnect(address).awaitUninterruptibly();
+        if (this.channelFuture.isDone() && this.channelFuture.isSuccess()) {
+          break;
         }
-      }
-      else {
-        latch.countDown();
-        lastConnected = System.currentTimeMillis();
+      } else {
+        throw new ConnectTimeoutException("Max retry count has been exceeded. 
attempts=" + numRetries
+            + " caused by: " + future.cause());
       }
     }
   }
@@ -217,7 +161,7 @@ public abstract class NettyClientBase implements Closeable {
     Channel channel = getChannel();
     if (channel != null && channel.isOpen()) {
       LOG.debug("Proxy will be disconnected from remote " + 
channel.remoteAddress());
-      channel.close();
+      channel.close().awaitUninterruptibly();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
index 6a340dc..74eb650 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.MessageLite;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
@@ -26,16 +27,21 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufEncoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
-import com.google.protobuf.MessageLite;
+import io.netty.handler.timeout.IdleStateHandler;
 
 class ProtoChannelInitializer extends ChannelInitializer<Channel> {
   private final MessageLite defaultInstance;
   private final ChannelHandler handler;
+  private final int idleTimeSeconds;
 
   public ProtoChannelInitializer(ChannelHandler handler, MessageLite 
defaultInstance) {
+    this(handler, defaultInstance, 0);
+  }
+
+  public ProtoChannelInitializer(ChannelHandler handler, MessageLite 
defaultInstance, int idleTimeSeconds) {
     this.handler = handler;
     this.defaultInstance = defaultInstance;
+    this.idleTimeSeconds = idleTimeSeconds;
   }
 
   @Override
@@ -45,6 +51,7 @@ class ProtoChannelInitializer extends 
ChannelInitializer<Channel> {
     pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
     pipeline.addLast("frameEncoder", new 
ProtobufVarint32LengthFieldPrepender());
     pipeline.addLast("protobufEncoder", new ProtobufEncoder());
+    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, 
idleTimeSeconds)); //zero is disabling
     pipeline.addLast("handler", handler);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
new file mode 100644
index 0000000..f05fb97
--- /dev/null
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@ThreadSafe
+public class RpcClientManager {
+  private static final Log LOG = LogFactory.getLog(RpcClientManager.class);
+
+  public static final int RPC_RETRIES = 3;
+
+  /* If all requests is done and client is idle state, client will be removed. 
*/
+  public static final int RPC_IDLE_TIMEOUT = 43200; // 12 hour
+
+  /* entries will be removed by ConnectionCloseFutureListener */
+  private static final Map<RpcConnectionKey, NettyClientBase>
+      clients = Collections.synchronizedMap(new HashMap<RpcConnectionKey, 
NettyClientBase>());
+
+  private static RpcClientManager instance;
+
+  static {
+    InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
+    instance = new RpcClientManager();
+  }
+
+  private RpcClientManager() {
+  }
+
+  public static RpcClientManager getInstance() {
+    return instance;
+  }
+
+  private NettyClientBase makeClient(RpcConnectionKey rpcConnectionKey)
+      throws NoSuchMethodException, ClassNotFoundException, 
ConnectTimeoutException {
+    NettyClientBase client;
+    if (rpcConnectionKey.asyncMode) {
+      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES, 
RPC_IDLE_TIMEOUT);
+    } else {
+      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES, 
RPC_IDLE_TIMEOUT);
+    }
+    return client;
+  }
+
+  /**
+   * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, 
and returns rpc client by protocol.
+   * This client will be shared per protocol and address. Client is removed in 
shared map when a client is closed
+   * @param addr
+   * @param protocolClass
+   * @param asyncMode
+   * @return
+   * @throws NoSuchMethodException
+   * @throws ClassNotFoundException
+   * @throws ConnectTimeoutException
+   */
+  public NettyClientBase getClient(InetSocketAddress addr,
+                                   Class<?> protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, 
ConnectTimeoutException {
+    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, 
asyncMode);
+
+    NettyClientBase client;
+    synchronized (clients) {
+      client = clients.get(key);
+      if (client == null) {
+        clients.put(key, client = makeClient(key));
+      }
+    }
+
+    if (!client.isConnected()) {
+      client.connect();
+      client.getChannel().closeFuture().addListener(new 
ConnectionCloseFutureListener(key));
+    }
+    assert client.isConnected();
+    return client;
+  }
+
+  /**
+   * Request to close this clients
+   * After it is closed, it is removed from clients map.
+   */
+  public static void close() {
+    LOG.info("Closing RPC client manager");
+
+    for (NettyClientBase eachClient : clients.values()) {
+      try {
+        eachClient.close();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Close client manager and shutdown Netty RPC worker pool
+   * After it is shutdown it is not possible to reuse it again.
+   */
+  public static void shutdown() {
+    close();
+    RpcChannelFactory.shutdownGracefully();
+  }
+
+  protected static NettyClientBase remove(RpcConnectionKey key) {
+    LOG.debug("Removing shared rpc client :" + key);
+    return clients.remove(key);
+  }
+
+  protected static boolean contains(RpcConnectionKey key) {
+    return clients.containsKey(key);
+  }
+
+  public static void cleanup(NettyClientBase... clients) {
+    for (NettyClientBase client : clients) {
+      if (client != null) {
+        try {
+          client.close();
+        } catch (Exception e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Exception in closing " + client.getKey(), e);
+          }
+        }
+      }
+    }
+  }
+
+  static class RpcConnectionKey {
+    final InetSocketAddress addr;
+    final Class<?> protocolClass;
+    final boolean asyncMode;
+
+    final String description;
+
+    public RpcConnectionKey(InetSocketAddress addr,
+                            Class<?> protocolClass, boolean asyncMode) {
+      this.addr = addr;
+      this.protocolClass = protocolClass;
+      this.asyncMode = asyncMode;
+      this.description = "[" + protocolClass + "] " + addr + "," + asyncMode;
+    }
+
+    @Override
+    public String toString() {
+      return description;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof RpcConnectionKey)) {
+        return false;
+      }
+
+      return toString().equals(obj.toString());
+    }
+
+    @Override
+    public int hashCode() {
+      return description.hashCode();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
deleted file mode 100644
index b0ff910..0000000
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import io.netty.channel.ConnectTimeoutException;
-import io.netty.util.internal.logging.CommonsLoggerFactory;
-import io.netty.util.internal.logging.InternalLoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-public class RpcConnectionPool {
-  private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
-
-  private Map<RpcConnectionKey, NettyClientBase> connections =
-      new HashMap<RpcConnectionKey, NettyClientBase>();
-
-  private static RpcConnectionPool instance;
-  private final Object lockObject = new Object();
-
-  public final static int RPC_RETRIES = 3;
-
-  private RpcConnectionPool() {
-  }
-
-  public synchronized static RpcConnectionPool getPool() {
-    if(instance == null) {
-      InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
-      instance = new RpcConnectionPool();
-    }
-    return instance;
-  }
-
-  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
-      throws NoSuchMethodException, ClassNotFoundException, 
ConnectTimeoutException {
-    NettyClientBase client;
-    if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
-    } else {
-      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
-    }
-    return client;
-  }
-
-  public static final long DEFAULT_TIMEOUT = 3000;
-  public static final long DEFAULT_INTERVAL = 500;
-
-  public NettyClientBase getConnection(InetSocketAddress addr,
-                                       Class<?> protocolClass, boolean 
asyncMode)
-      throws NoSuchMethodException, ClassNotFoundException, 
ConnectTimeoutException {
-    return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, 
DEFAULT_INTERVAL);
-  }
-
-  public NettyClientBase getConnection(InetSocketAddress addr,
-      Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
-      throws NoSuchMethodException, ClassNotFoundException, 
ConnectTimeoutException {
-    RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, 
asyncMode);
-
-    RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
-    for (; !timer.isTimedOut(); timer.elapsed()) {
-      NettyClientBase client;
-      synchronized (lockObject) {
-        client = connections.get(key);
-        if (client == null) {
-          connections.put(key, client = makeConnection(key));
-        }
-      }
-      if (client.acquire(timer.remaining())) {
-        return client;
-      }
-      timer.interval(interval);
-    }
-
-    throw new ConnectTimeoutException("Failed to get connection for " + 
timeout + " msec");
-  }
-
-  public void releaseConnection(NettyClientBase client) {
-    if (client != null) {
-      release(client, false);
-    }
-  }
-
-  public void closeConnection(NettyClientBase client) {
-    if (client != null) {
-      release(client, true);
-    }
-  }
-
-  private void release(NettyClientBase client, boolean close) {
-    try {
-      if (returnToPool(client, close)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing connection [" + client.getKey() + "]");
-        }
-        client.close();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Current Connections in pool [" + connections.size() + "]");
-      }
-    } catch (Exception e) {
-      LOG.error("Can't close connection:" + client.getKey() + ":" + 
e.getMessage(), e);
-    }
-  }
-
-  // return true if the connection should be closed
-  private boolean returnToPool(NettyClientBase client, boolean close) {
-    synchronized (lockObject) {
-      if (client.release() && (close || !client.isConnected())) {
-        connections.remove(client.getKey());
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void close() {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Pool Closed");
-    }
-
-    synchronized (lockObject) {
-      for (NettyClientBase eachClient : connections.values()) {
-        try {
-          eachClient.close();
-        } catch (Exception e) {
-          LOG.error("close client pool error", e);
-        }
-      }
-      connections.clear();
-    }
-  }
-
-  public void shutdown(){
-    close();
-    RpcChannelFactory.shutdownGracefully();
-  }
-
-  static class RpcConnectionKey {
-    final InetSocketAddress addr;
-    final Class<?> protocolClass;
-    final boolean asyncMode;
-
-    final String description;
-
-    public RpcConnectionKey(InetSocketAddress addr,
-                            Class<?> protocolClass, boolean asyncMode) {
-      this.addr = addr;
-      this.protocolClass = protocolClass;
-      this.asyncMode = asyncMode;
-      this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
-    }
-
-    @Override
-    public String toString() {
-      return description;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if(!(obj instanceof RpcConnectionKey)) {
-        return false;
-      }
-
-      return toString().equals(obj.toString());
-    }
-
-    @Override
-    public int hashCode() {
-      return description.hashCode();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index fb1cec2..2804a03 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,13 +18,11 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.protobuf.ServiceException;
 
 public abstract class ServerCallable<T> {
   protected InetSocketAddress addr;
@@ -33,21 +31,16 @@ public abstract class ServerCallable<T> {
   protected Class<?> protocol;
   protected boolean asyncMode;
   protected boolean closeConn;
-  protected RpcConnectionPool connPool;
+  protected RpcClientManager manager;
 
   public abstract T call(NettyClientBase client) throws Exception;
 
-  public ServerCallable(RpcConnectionPool connPool,  InetSocketAddress addr, 
Class<?> protocol, boolean asyncMode) {
-    this(connPool, addr, protocol, asyncMode, false);
-  }
-
-  public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, 
Class<?> protocol,
-                        boolean asyncMode, boolean closeConn) {
-    this.connPool = connPool;
+  public ServerCallable(RpcClientManager manager, InetSocketAddress addr, 
Class<?> protocol,
+                        boolean asyncMode) {
+    this.manager = manager;
     this.addr = addr;
     this.protocol = protocol;
     this.asyncMode = asyncMode;
-    this.closeConn = closeConn;
   }
 
   public void beforeCall() {
@@ -74,26 +67,24 @@ public abstract class ServerCallable<T> {
    * Run this instance with retries, timed waits,
    * and refinds of missing regions.
    *
-   * @param <T> the type of the return value
    * @return an object of type T
    * @throws com.google.protobuf.ServiceException if a remote or network 
exception occurs
    */
+
   public T withRetries() throws ServiceException {
     //TODO configurable
     final long pause = 500; //ms
     final int numRetries = 3;
-    List<Throwable> exceptions = new ArrayList<Throwable>();
 
     for (int tries = 0; tries < numRetries; tries++) {
       NettyClientBase client = null;
       try {
         beforeCall();
         if(addr != null) {
-          client = connPool.getConnection(addr, protocol, asyncMode);
+          client = manager.getClient(addr, protocol, asyncMode);
         }
         return call(client);
       } catch (IOException ioe) {
-        exceptions.add(ioe);
         if(abort) {
           throw new ServiceException(ioe.getMessage(), ioe);
         }
@@ -105,9 +96,7 @@ public abstract class ServerCallable<T> {
       } finally {
         afterCall();
         if(closeConn) {
-          connPool.closeConnection(client);
-        } else {
-          connPool.releaseConnection(client);
+          RpcClientManager.cleanup(client);
         }
       }
       try {
@@ -122,7 +111,6 @@ public abstract class ServerCallable<T> {
 
   /**
    * Run this instance against the server once.
-   * @param <T> the type of the return value
    * @return an object of type T
    * @throws java.io.IOException if a remote or network exception occurs
    * @throws RuntimeException other unspecified error
@@ -131,7 +119,7 @@ public abstract class ServerCallable<T> {
     NettyClientBase client = null;
     try {
       beforeCall();
-      client = connPool.getConnection(addr, protocol, asyncMode);
+      client = manager.getClient(addr, protocol, asyncMode);
       return call(client);
     } catch (Throwable t) {
       Throwable t2 = translateException(t);
@@ -143,9 +131,7 @@ public abstract class ServerCallable<T> {
     } finally {
       afterCall();
       if(closeConn) {
-        connPool.closeConnection(client);
-      } else {
-        connPool.releaseConnection(client);
+        RpcClientManager.cleanup(client);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index a974a65..68f170c 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcCallback;
+import io.netty.channel.ConnectTimeoutException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.rpc.test.DummyProtocol;
@@ -123,12 +124,12 @@ public class TestAsyncRpc {
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(
               RpcUtils.getConnectAddress(server.getListenAddress()),
               DummyProtocol.class, true);
     client = new AsyncRpcClient(rpcConnectionKey, retries);
-    client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT);
+    client.connect();
     stub = client.getStub();
   }
 
@@ -298,10 +299,11 @@ public class TestAsyncRpc {
     });
     serverThread.start();
 
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, 
true);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, 
true);
     client = new AsyncRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    client.connect();
+    assertTrue(client.isConnected());
     stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
 
@@ -313,25 +315,32 @@ public class TestAsyncRpc {
   @Test
   public void testConnectionFailure() throws Exception {
     InetSocketAddress address = new InetSocketAddress("test", 0);
+    boolean expected = false;
     try {
-      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, 
true);
+      RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, 
true);
       NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
-      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+      client.connect();
+      fail();
+    } catch (ConnectTimeoutException e) {
+      expected = true;
     } catch (Throwable throwable) {
       fail();
     }
+    assertTrue(expected);
+
   }
 
   @Test
   @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
     String hostAndPort = 
RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(
               RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, 
true);
     client = new AsyncRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    client.connect();
+    assertTrue(client.isConnected());
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -342,4 +351,43 @@ public class TestAsyncRpc {
     assertEquals(future.get(), echoMessage);
     assertTrue(future.isDone());
   }
+
+  @Test
+  public void testIdleTimeout() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), 
DummyProtocol.class, true);
+    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); 
//1 sec idle timeout
+    client.connect();
+    assertTrue(client.isConnected());
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+
+    client.connect(); // try to reconnect
+    assertTrue(client.isConnected());
+    client.close();
+    assertFalse(client.isConnected());
+  }
+
+  @Test
+  public void testIdleTimeoutWithActiveRequest() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), 
DummyProtocol.class, true);
+    AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); 
//1 sec idle timeout
+    client.connect();
+
+    assertTrue(client.isConnected());
+    Interface stub = client.getStub();
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+    stub.deley(null, echoMessage, future); //3 sec delay
+
+    assertFalse(future.isDone());
+    assertEquals(future.get(), echoMessage);
+    assertTrue(future.isDone());
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 10dd766..c114985 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.rpc;
 
+import io.netty.channel.ConnectTimeoutException;
 import org.apache.tajo.rpc.test.DummyProtocol;
 import 
org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface;
 import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
@@ -115,12 +116,13 @@ public class TestBlockingRpc {
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(
               RpcUtils.getConnectAddress(server.getListenAddress()),
               DummyProtocol.class, false);
     client = new BlockingRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    client.connect();
+    assertTrue(client.isConnected());
     stub = client.getStub();
   }
 
@@ -162,7 +164,7 @@ public class TestBlockingRpc {
   @Test
   @SetupRpcConnection(setupRpcClient=false)
   public void testRpcWithServiceCallable() throws Exception {
-    RpcConnectionPool pool = RpcConnectionPool.getPool();
+    RpcClientManager manager = RpcClientManager.getInstance();
     final SumRequest request = SumRequest.newBuilder()
         .setX1(1)
         .setX2(2)
@@ -170,7 +172,7 @@ public class TestBlockingRpc {
         .setX4(2.0f).build();
 
     SumResponse response =
-    new ServerCallable<SumResponse>(pool,
+    new ServerCallable<SumResponse>(manager,
         server.getListenAddress(), DummyProtocol.class, false) {
       @Override
       public SumResponse call(NettyClientBase client) throws Exception {
@@ -183,7 +185,7 @@ public class TestBlockingRpc {
     assertEquals(8.15d, response.getResult(), 1e-15);
 
     response =
-        new ServerCallable<SumResponse>(pool,
+        new ServerCallable<SumResponse>(manager,
             server.getListenAddress(), DummyProtocol.class, false) {
           @Override
           public SumResponse call(NettyClientBase client) throws Exception {
@@ -194,7 +196,7 @@ public class TestBlockingRpc {
         }.withoutRetries();
 
     assertTrue(8.15d == response.getResult());
-    pool.close();
+    RpcClientManager.close();
   }
 
   @Test
@@ -241,10 +243,11 @@ public class TestBlockingRpc {
     });
     serverThread.start();
 
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, 
false);
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, 
false);
     client = new BlockingRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    client.connect();
+    assertTrue(client.isConnected());
     stub = client.getStub();
 
     EchoMessage response = stub.echo(null, message);
@@ -254,22 +257,22 @@ public class TestBlockingRpc {
   @Test
   public void testConnectionFailed() throws Exception {
     NettyClientBase client = null;
-    
+    boolean expected = false;
     try {
       int port = server.getListenAddress().getPort() + 1;
-      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
+      RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(
               RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", 
port)),
               DummyProtocol.class, false);
       client = new BlockingRpcClient(rpcConnectionKey, retries);
-      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
-      client.close();
-    } catch (Throwable ce){
-      if (client != null) {
-        client.close();
-      }
+      client.connect();
       fail();
+    } catch (ConnectTimeoutException e) {
+      expected = true;
+    } catch (Throwable e) {
+      fail(e.getMessage());
     }
+    assertTrue(expected);
   }
 
   @Test
@@ -334,11 +337,12 @@ public class TestBlockingRpc {
   @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
     String hostAndPort = 
RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
-          new RpcConnectionPool.RpcConnectionKey(
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+          new RpcClientManager.RpcConnectionKey(
               RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, 
false);
     client = new BlockingRpcClient(rpcConnectionKey, retries);
-    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
+    client.connect();
+    assertTrue(client.isConnected());
     BlockingInterface stub = client.getStub();
 
     EchoMessage message = EchoMessage.newBuilder()
@@ -346,4 +350,41 @@ public class TestBlockingRpc {
     EchoMessage response2 = stub.echo(null, message);
     assertEquals(MESSAGE, response2.getMessage());
   }
+
+  @Test
+  public void testIdleTimeout() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), 
DummyProtocol.class, false);
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, 
retries, 1); //1 sec idle timeout
+    client.connect();
+    assertTrue(client.isConnected());
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+
+    client.connect(); // try to reconnect
+    assertTrue(client.isConnected());
+    client.close();
+    assertFalse(client.isConnected());
+  }
+
+  @Test
+  public void testIdleTimeoutWithActiveRequest() throws Exception {
+    RpcClientManager.RpcConnectionKey rpcConnectionKey =
+        new RpcClientManager.RpcConnectionKey(server.getListenAddress(), 
DummyProtocol.class, false);
+    BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, 
retries, 1); //1 sec idle timeout
+
+    client.connect();
+
+    assertTrue(client.isConnected());
+    BlockingInterface stub = client.getStub();
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    EchoMessage message = stub.deley(null, echoMessage); //3 sec delay
+    assertEquals(message, echoMessage);
+
+    Thread.sleep(2000);
+    assertFalse(client.isConnected());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7b78668b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
new file mode 100644
index 0000000..5f86518
--- /dev/null
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import org.apache.tajo.rpc.test.DummyProtocol;
+import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestRpcClientManager {
+
+  @Test
+  public void testRaceCondition() throws Exception {
+    final int parallelCount = 50;
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), parallelCount);
+    server.start();
+
+    final InetSocketAddress address = server.getListenAddress();
+    final RpcClientManager manager = RpcClientManager.getInstance();
+
+    ExecutorService executor = Executors.newFixedThreadPool(parallelCount);
+    List<Future> tasks = new ArrayList<Future>();
+    for (int i = 0; i < parallelCount; i++) {
+      tasks.add(executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              NettyClientBase client = null;
+              try {
+                client = manager.getClient(address, DummyProtocol.class, 
false);
+              } catch (Throwable e) {
+                fail(e.getMessage());
+              }
+              assertTrue(client.isConnected());
+            }
+          })
+      );
+    }
+
+    for (Future future : tasks) {
+      future.get();
+    }
+
+    NettyClientBase clientBase = manager.getClient(address, 
DummyProtocol.class, false);
+    RpcClientManager.cleanup(clientBase);
+    server.shutdown();
+    executor.shutdown();
+  }
+
+  @Test
+  public void testCloseFuture() throws Exception {
+    final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl();
+    NettyServerBase server = new AsyncRpcServer(DummyProtocol.class,
+        service, new InetSocketAddress("127.0.0.1", 0), 3);
+    server.start();
+
+    final RpcClientManager manager = RpcClientManager.getInstance();
+
+    NettyClientBase client = manager.getClient(server.getListenAddress(), 
DummyProtocol.class, true);
+    assertTrue(client.isConnected());
+    assertTrue(client.getChannel().isWritable());
+
+    RpcClientManager.RpcConnectionKey key = client.getKey();
+    assertTrue(RpcClientManager.contains(key));
+
+    client.close();
+    assertFalse(RpcClientManager.contains(key));
+    server.shutdown();
+  }
+}
\ No newline at end of file

Reply via email to