Repository: hbase
Updated Branches:
  refs/heads/branch-1 e1f972170 -> d0af30ea3

HBASE-13093 Local mode HBase instance doesn't shut down.

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0af30ea
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0af30ea
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0af30ea

Branch: refs/heads/branch-1
Commit: d0af30ea3cc33c970c323917d6d522b4536071da
Parents: e1f9721
Author: Andrey Stepachev <oct...@gmail.com>
Authored: Tue Mar 17 14:47:11 2015 +0000
Committer: Andrey Stepachev <oct...@gmail.com>
Committed: Tue Mar 17 14:47:11 2015 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java | 8 ++++----
 .../java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java | 11 +++++++++--
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0af30ea/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 8414290..3fb547b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -262,7 +262,7 @@ public class AsyncRpcChannel {
             handleSaslConnectionFailure(retryCount, cause, realTicket);
 
             // Try to reconnect
-            AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
+            client.newTimeout(new TimerTask() {
               @Override
               public void run(Timeout timeout) throws Exception {
                 connect(bootstrap);
@@ -289,7 +289,7 @@ public class AsyncRpcChannel {
    */
   private void retryOrClose(final Bootstrap bootstrap, int connectCounter, Throwable e) {
     if (connectCounter < client.maxRetries) {
-      AsyncRpcClient.WHEEL_TIMER.newTimeout(new TimerTask() {
+      client.newTimeout(new TimerTask() {
         @Override public void run(Timeout timeout) throws Exception {
           connect(bootstrap);
         }
@@ -339,7 +339,7 @@ public class AsyncRpcChannel {
       // Add timeout for cleanup if none is present
       if (cleanupTimer == null && call.getRpcTimeout() > 0) {
         cleanupTimer =
-            AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, call.getRpcTimeout(),
+            client.newTimeout(timeoutTask, call.getRpcTimeout(),
                 TimeUnit.MILLISECONDS);
       }
       if (!connected) {
@@ -601,7 +601,7 @@ public class AsyncRpcChannel {
       }
       if (nextCleanupTaskDelay > 0) {
         cleanupTimer =
-            AsyncRpcClient.WHEEL_TIMER.newTimeout(timeoutTask, nextCleanupTaskDelay,
+            client.newTimeout(timeoutTask, nextCleanupTaskDelay,
                 TimeUnit.MILLISECONDS);
       } else {
         cleanupTimer = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0af30ea/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 3bf9aad..e55a7eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -28,6 +28,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
@@ -70,8 +72,9 @@ public class AsyncRpcClient extends AbstractRpcClient {
   public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
   public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";
 
-  public static final HashedWheelTimer WHEEL_TIMER =
-      new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
+  private static final HashedWheelTimer WHEEL_TIMER =
+      new HashedWheelTimer(Threads.newDaemonThreadFactory("AsyncRpcChannel-timer"),
+          100, TimeUnit.MILLISECONDS);
 
   private static final ChannelInitializer<SocketChannel> DEFAULT_CHANNEL_INITIALIZER =
       new ChannelInitializer<SocketChannel>() {
@@ -458,4 +461,8 @@ public class AsyncRpcClient extends AbstractRpcClient {
       this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
     }
   }
+
+  Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
+    return WHEEL_TIMER.newTimeout(task, delay, unit);
+  }
 }