Repository: hbase Updated Branches: refs/heads/master a7f9668c3 -> e0eea94c9
HBASE-19772 ReadOnlyZKClient improvements Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70515f53 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70515f53 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70515f53 Branch: refs/heads/master Commit: 70515f53112599997348aee1d748838f7a78a7fd Parents: a7f9668 Author: Duo Zhang <palomino...@gmail.com> Authored: Thu Jan 11 11:33:36 2018 -0800 Committer: Michael Stack <st...@apache.org> Committed: Thu Jan 11 11:37:27 2018 -0800 ---------------------------------------------------------------------- .../hbase/zookeeper/ReadOnlyZKClient.java | 58 ++++++++++++-------- 1 file changed, 35 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/70515f53/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java index 82c011b..0a9544d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -116,6 +117,8 @@ public final class ReadOnlyZKClient implements Closeable { private ZooKeeper zookeeper; + private int pendingRequests = 0; + private String getId() { return String.format("0x%08x", System.identityHashCode(this)); } @@ -127,12 +130,12 @@ public final class ReadOnlyZKClient implements Closeable { this.retryIntervalMs = conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS); this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS); - LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString + - ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries + - ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms"); - Thread t = new Thread(this::run, "ReadOnlyZKClient"); - t.setDaemon(true); - t.start(); + LOG.info( + "Start read only zookeeper connection {} to {}, " + "session timeout {} ms, retries {}, " + + "retry interval {} ms, keep alive {} ms", + getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs); + Threads.setDaemonThreadRunning(new Thread(this::run), + "ReadOnlyZKClient-" + connectString + "@" + getId()); } private abstract class ZKTask<T> extends Task { @@ -156,6 +159,7 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void exec(ZooKeeper alwaysNull) { + pendingRequests--; Code code = Code.get(rc); if (code == Code.OK) { future.complete(ret); @@ -169,19 +173,19 @@ public final class ReadOnlyZKClient implements Closeable { future.completeExceptionally(KeeperException.create(code, path)); } else { if (code == Code.SESSIONEXPIRED) { - LOG.warn(getId() + " session expired, close and reconnect"); + LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString); try { zk.close(); } catch (InterruptedException e) { } } if (ZKTask.this.delay(retryIntervalMs, maxRetries)) { - LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + - code + ", retries = " + ZKTask.this.retries); + LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), + connectString, operationType, path, code, ZKTask.this.retries); tasks.add(ZKTask.this); } else { - LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " + - code + ", retries = " + ZKTask.this.retries + ", give up"); + LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), + connectString, operationType, path, code, ZKTask.this.retries); future.completeExceptionally(KeeperException.create(code, path)); } } @@ -205,6 +209,14 @@ public final class ReadOnlyZKClient implements Closeable { return true; } + protected abstract void doExec(ZooKeeper zk); + + @Override + public final void exec(ZooKeeper zk) { + pendingRequests++; + doExec(zk); + } + public boolean delay(long intervalMs, int maxRetries) { if (retries >= maxRetries) { return false; @@ -217,14 +229,12 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void connectFailed(IOException e) { if (delay(retryIntervalMs, maxRetries)) { - LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + - ", retries = " + retries, - e); + LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), + connectString, operationType, path, retries, e); tasks.add(this); } else { - LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path + - ", retries = " + retries + ", give up", - e); + LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), + connectString, operationType, path, retries, e); future.completeExceptionally(e); } } @@ -249,7 +259,7 @@ public final class ReadOnlyZKClient implements Closeable { tasks.add(new ZKTask<byte[]>(path, future, "get") { @Override - public void exec(ZooKeeper zk) { + protected void doExec(ZooKeeper zk) { zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null); } @@ -265,7 +275,7 @@ public final class ReadOnlyZKClient implements Closeable { tasks.add(new ZKTask<Stat>(path, future, "exists") { @Override - public void exec(ZooKeeper zk) { + protected void doExec(ZooKeeper zk) { zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null); } }); @@ -311,9 +321,11 @@ public final class ReadOnlyZKClient implements Closeable { if (task == CLOSE) { break; } - if (task == null) { - LOG.info(getId() + " no activities for " + keepAliveTimeMs + - " ms, close active connection. Will reconnect next time when there are new requests."); + if (task == null && pendingRequests == 0) { + LOG.info( + "{} to {} no activities for {} ms, close active connection. " + + "Will reconnect next time when there are new requests", + getId(), connectString, keepAliveTimeMs); closeZk(); continue; } @@ -339,7 +351,7 @@ public final class ReadOnlyZKClient implements Closeable { @Override public void close() { if (closed.compareAndSet(false, true)) { - LOG.info("Close zookeeper connection " + getId() + " to " + connectString); + LOG.info("Close zookeeper connection {} to {}", getId(), connectString); tasks.add(CLOSE); } }