Repository: hbase
Updated Branches:
  refs/heads/master 3b6199a27 -> 061a31fad


HBASE-20159 Support using separate ZK quorums for client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/061a31fa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/061a31fa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/061a31fa

Branch: refs/heads/master
Commit: 061a31fad1654d9ded96d118e04c14860413fa25
Parents: 3b6199a
Author: Yu Li <l...@apache.org>
Authored: Thu Mar 29 02:37:26 2018 +0800
Committer: Yu Li <l...@apache.org>
Committed: Thu Mar 29 02:37:26 2018 +0800

----------------------------------------------------------------------
 .../hbase/zookeeper/ReadOnlyZKClient.java       |   8 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  17 +-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java |  24 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  47 +++-
 .../hbase/master/zksyncer/ClientZKSyncer.java   | 241 +++++++++++++++++
 .../master/zksyncer/MasterAddressSyncer.java    |  52 ++++
 .../master/zksyncer/MetaLocationSyncer.java     |  46 ++++
 .../hbase/regionserver/HRegionServer.java       |  14 +
 .../regionserver/ReplicationSink.java           |   4 +
 .../client/TestSeparateClientZKCluster.java     | 268 +++++++++++++++++++
 .../hbase/master/TestMasterNoCluster.java       |  37 +++
 .../hbase/zookeeper/MiniZooKeeperCluster.java   |   2 +-
 .../hadoop/hbase/zookeeper/ZKServerTool.java    |   2 +-
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |   2 +-
 .../hadoop/hbase/zookeeper/ZKWatcher.java       |  37 ++-
 15 files changed, 781 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
index d2f4763..fc2d5f0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -124,7 +124,13 @@ public final class ReadOnlyZKClient implements Closeable {
   }
 
   public ReadOnlyZKClient(Configuration conf) {
-    this.connectString = ZKConfig.getZKQuorumServersString(conf);
+    // We might use a different ZK for client access
+    String clientZkQuorumServers = 
ZKConfig.getClientZKQuorumServersString(conf);
+    if (clientZkQuorumServers != null) {
+      this.connectString = clientZkQuorumServers;
+    } else {
+      this.connectString = ZKConfig.getZKQuorumServersString(conf);
+    }
     this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, 
DEFAULT_ZK_SESSION_TIMEOUT);
     this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
     this.retryIntervalMs =

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 9a43e7c..9241682 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -183,6 +183,19 @@ public final class HConstants {
   /** Name of ZooKeeper quorum configuration parameter. */
   public static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
 
+  /** Name of ZooKeeper quorum configuration parameter for client to locate 
meta. */
+  public static final String CLIENT_ZOOKEEPER_QUORUM = 
"hbase.client.zookeeper.quorum";
+
+  /** Client port of ZooKeeper for client to locate meta */
+  public static final String CLIENT_ZOOKEEPER_CLIENT_PORT =
+      "hbase.client.zookeeper.property.clientPort";
+
+  /** Indicates whether the client ZK nodes are observer nodes of the server ZK */
+  public static final String CLIENT_ZOOKEEPER_OBSERVER_MODE =
+      "hbase.client.zookeeper.observer.mode";
+  /** By default, assume client ZK is not in observer mode and the master needs to synchronize 
information */
+  public static final boolean DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE = false;
+
   /** Common prefix of ZooKeeper configuration properties */
   public static final String ZK_CFG_PROPERTY_PREFIX =
       "hbase.zookeeper.property.";
@@ -201,7 +214,7 @@ public final class HConstants {
       ZK_CFG_PROPERTY_PREFIX + CLIENT_PORT_STR;
 
   /** Default client port that the zookeeper listens on */
-  public static final int DEFAULT_ZOOKEPER_CLIENT_PORT = 2181;
+  public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
 
   /** Parameter name for the root dir in ZK for this cluster */
   public static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
@@ -224,7 +237,7 @@ public final class HConstants {
       ZK_CFG_PROPERTY_PREFIX + "tickTime";
 
   /** Default limit on concurrent client-side zookeeper connections */
-  public static final int DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS = 300;
+  public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 300;
 
   /** Configuration key for ZooKeeper session timeout */
   public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
index 9891726..f324ec6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -85,7 +86,7 @@ public final class ZKConfig {
     // If clientPort is not set, assign the default.
     if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
       zkProperties.put(HConstants.CLIENT_PORT_STR,
-          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+          HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
     }
 
     // Create the server.X properties.
@@ -119,7 +120,7 @@ public final class ZKConfig {
    */
   private static String getZKQuorumServersStringFromHbaseConfig(Configuration 
conf) {
     String defaultClientPort = Integer.toString(
-        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 
HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 
HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT));
 
     // Build the ZK quorum server string with "server:clientport" list, 
separated by ','
     final String[] serverHosts =
@@ -310,4 +311,23 @@ public final class ZKConfig {
       return znodeParent;
     }
   }
+
+  /**
+   * Get the client ZK Quorum servers string
+   * @param conf the configuration to read
+   * @return Client quorum servers, or null if not specified
+   */
+  public static String getClientZKQuorumServersString(Configuration conf) {
+    String clientQuromServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    if (clientQuromServers == null) {
+      return null;
+    }
+    int defaultClientPort =
+        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 
HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
+    String clientZkClientPort =
+        Integer.toString(conf.getInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, 
defaultClientPort));
+    // Build the ZK quorum server string with "server:clientport" list, 
separated by ','
+    final String[] serverHosts = StringUtils.getStrings(clientQuromServers);
+    return buildZKQuorumServerString(serverHosts, clientZkClientPort);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f5bd0de..9dd685d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -138,6 +138,8 @@ import 
org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
 import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
+import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -300,6 +302,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   private DrainingServerTracker drainingServerTracker;
   // Tracker for load balancer state
   LoadBalancerTracker loadBalancerTracker;
+  // Tracker for meta location, if any client ZK quorum specified
+  MetaLocationSyncer metaLocationSyncer;
+  // Tracker for active master location, if any client ZK quorum specified
+  MasterAddressSyncer masterAddressSyncer;
 
   // Tracker for split and merge state
   private SplitOrMergeTracker splitOrMergeTracker;
@@ -556,18 +562,20 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   public void run() {
     try {
       if (!conf.getBoolean("hbase.testing.nocluster", false)) {
-        try {
-          int infoPort = putUpJettyServer();
-          startActiveMasterManager(infoPort);
-        } catch (Throwable t) {
-          // Make sure we log the exception.
-          String error = "Failed to become Active Master";
-          LOG.error(error, t);
-          // Abort should have been called already.
-          if (!isAborted()) {
-            abort(error, t);
+        Threads.setDaemonThreadRunning(new Thread(() -> {
+          try {
+            int infoPort = putUpJettyServer();
+            startActiveMasterManager(infoPort);
+          } catch (Throwable t) {
+            // Make sure we log the exception.
+            String error = "Failed to become Active Master";
+            LOG.error(error, t);
+            // Abort should have been called already.
+            if (!isAborted()) {
+              abort(error, t);
+            }
           }
-        }
+        }));
       }
       // Fall in here even if we have been aborted. Need to run the shutdown 
services and
       // the super run call will do this for us.
@@ -749,6 +757,23 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     this.maintenanceModeTracker = new MasterMaintenanceModeTracker(zooKeeper);
     this.maintenanceModeTracker.start();
 
+    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    boolean clientZkObserverMode = 
conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
+      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
+    if (clientQuorumServers != null && !clientZkObserverMode) {
+      // we need to take care of the ZK information synchronization
+      // if given client ZK are not observer nodes
+      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
+          getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + 
"-clientZK", this,
+          false, true);
+      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, 
clientZkWatcher, this);
+      this.metaLocationSyncer.start();
+      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, 
clientZkWatcher, this);
+      this.masterAddressSyncer.start();
+      // set cluster id is a one-go effort
+      ZKClusterId.setClusterId(clientZkWatcher, 
fileSystemManager.getClusterId());
+    }
+
     // Set the cluster as up.  If new RSs, they'll be waiting on this before
     // going ahead with their startup.
     boolean wasUp = this.clusterStatusTracker.isClusterUp();

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java
new file mode 100644
index 0000000..8f735bd
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.java
@@ -0,0 +1,241 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.zksyncer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKListener;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Tracks the target znode(s) on server ZK cluster and synchronizes them to 
client ZK cluster if
+ * changed
+ * <p/>
+ * The target znode(s) is given through {@link #getNodesToWatch()} method
+ */
+@InterfaceAudience.Private
+public abstract class ClientZKSyncer extends ZKListener {
+  private static final Log LOG = LogFactory.getLog(ClientZKSyncer.class);
+  private final Server server;
+  private final ZKWatcher clientZkWatcher;
+  // We use queues and daemon threads to synchronize the data to client ZK 
cluster
+  // to avoid blocking the single event thread for watchers
+  private final Map<String, BlockingQueue<byte[]>> queues;
+
+  public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server 
server) {
+    super(watcher);
+    this.server = server;
+    this.clientZkWatcher = clientZkWatcher;
+    this.queues = new HashMap<>();
+  }
+
+  /**
+   * Starts the syncer
+   * @throws KeeperException if error occurs when trying to create base nodes 
on client ZK
+   */
+  public void start() throws KeeperException {
+    LOG.debug("Starting " + getClass().getSimpleName());
+    this.watcher.registerListener(this);
+    // create base znode on remote ZK
+    ZKUtil.createWithParents(clientZkWatcher, watcher.znodePaths.baseZNode);
+    // set meta znodes for client ZK
+    Collection<String> nodes = getNodesToWatch();
+    LOG.debug("Znodes to watch: " + nodes);
+    // initialize queues and threads
+    for (String node : nodes) {
+      BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
+      queues.put(node, queue);
+      Thread updater = new ClientZkUpdater(node, queue);
+      updater.setDaemon(true);
+      updater.start();
+      watchAndCheckExists(node);
+    }
+  }
+
+  private void watchAndCheckExists(String node) {
+    try {
+      if (ZKUtil.watchAndCheckExists(watcher, node)) {
+        byte[] data = ZKUtil.getDataAndWatch(watcher, node);
+        if (data != null) {
+          // put the data into queue
+          upsertQueue(node, data);
+        } else {
+          // It existed but now does not, should have been tracked by our 
watcher, ignore
+          LOG.debug("Found no data from " + node);
+          watchAndCheckExists(node);
+        }
+      } else {
+        // cleanup stale ZNodes on client ZK to avoid invalid requests to 
server
+        ZKUtil.deleteNodeFailSilent(clientZkWatcher, node);
+      }
+    } catch (KeeperException e) {
+      server.abort("Unexpected exception during initialization, aborting", e);
+    }
+  }
+
+  /**
+   * Update the value of the single element in queue if any, or else insert.
+   * <p/>
+   * We only need to synchronize the latest znode value to client ZK rather 
than synchronize each
+   * time
+   * @param data the data to write to queue
+   */
+  private void upsertQueue(String node, byte[] data) {
+    BlockingQueue<byte[]> queue = queues.get(node);
+    synchronized (queue) {
+      queue.poll();
+      queue.offer(data);
+    }
+  }
+
+  /**
+   * Set data for client ZK and retry until it succeeds. Be very careful to 
prevent an infinite loop when
+   * modifying this method
+   * @param node the znode to set on client ZK
+   * @param data the data to set to client ZK
+   * @throws InterruptedException if the thread is interrupted during process
+   */
+  private final void setDataForClientZkUntilSuccess(String node, byte[] data)
+      throws InterruptedException {
+    while (!server.isStopped()) {
+      try {
+        LOG.debug("Set data for remote " + node + ", client zk wather: " + 
clientZkWatcher);
+        ZKUtil.setData(clientZkWatcher, node, data);
+        break;
+      } catch (KeeperException.NoNodeException nne) {
+        // Node doesn't exist, create it and set value
+        try {
+          ZKUtil.createNodeIfNotExistsNoWatch(clientZkWatcher, node, data, 
CreateMode.PERSISTENT);
+          break;
+        } catch (KeeperException.ConnectionLossException
+            | KeeperException.SessionExpiredException ee) {
+          reconnectAfterExpiration();
+        } catch (KeeperException e) {
+          LOG.warn(
+            "Failed to create znode " + node + " due to: " + e.getMessage() + 
", will retry later");
+        }
+      } catch (KeeperException.ConnectionLossException
+          | KeeperException.SessionExpiredException ee) {
+        reconnectAfterExpiration();
+      } catch (KeeperException e) {
+        LOG.debug("Failed to set data to client ZK, will retry later", e);
+      }
+      Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
+    }
+  }
+
+  private final void reconnectAfterExpiration() throws InterruptedException {
+    LOG.warn("ZK session expired or lost. Retry a new connection...");
+    try {
+      clientZkWatcher.reconnectAfterExpiration();
+    } catch (IOException | KeeperException e) {
+      LOG.warn("Failed to reconnect to client zk after session expiration, 
will retry later", e);
+    }
+  }
+
+  @Override
+  public void nodeCreated(String path) {
+    if (!validate(path)) {
+      return;
+    }
+    try {
+      byte[] data = ZKUtil.getDataAndWatch(watcher, path);
+      upsertQueue(path, data);
+    } catch (KeeperException e) {
+      LOG.warn("Unexpected exception handling nodeCreated event", e);
+    }
+  }
+
+  @Override
+  public void nodeDataChanged(String path) {
+    if (validate(path)) {
+      nodeCreated(path);
+    }
+  }
+
+  @Override
+  public synchronized void nodeDeleted(String path) {
+    if (validate(path)) {
+      try {
+        if (ZKUtil.watchAndCheckExists(watcher, path)) {
+          nodeCreated(path);
+        }
+      } catch (KeeperException e) {
+        LOG.warn("Unexpected exception handling nodeDeleted event for path: " 
+ path, e);
+      }
+    }
+  }
+
+  /**
+   * Validate whether a znode path is watched by us
+   * @param path the path to validate
+   * @return true if the znode is watched by us
+   */
+  abstract boolean validate(String path);
+
+  /**
+   * @return the znode(s) to watch
+   */
+  abstract Collection<String> getNodesToWatch();
+
+  /**
+   * Thread to synchronize znode data to client ZK cluster
+   */
+  class ClientZkUpdater extends Thread {
+    final String znode;
+    final BlockingQueue<byte[]> queue;
+
+    public ClientZkUpdater(String znode, BlockingQueue<byte[]> queue) {
+      this.znode = znode;
+      this.queue = queue;
+      setName("ClientZKUpdater-" + znode);
+    }
+
+    @Override
+    public void run() {
+      while (!server.isStopped()) {
+        try {
+          byte[] data = queue.take();
+          setDataForClientZkUntilSuccess(znode, data);
+        } catch (InterruptedException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "Interrupted while checking whether need to update meta location 
to client zk");
+          }
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java
new file mode 100644
index 0000000..3da8558
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MasterAddressSyncer.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.zksyncer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Tracks the active master address on server ZK cluster and synchronizes it 
to client ZK cluster
+ * if changed
+ */
+@InterfaceAudience.Private
+public class MasterAddressSyncer extends ClientZKSyncer {
+  private final String masterAddressZNode;
+
+  public MasterAddressSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, 
Server server) {
+    super(watcher, clientZkWatcher, server);
+    masterAddressZNode = watcher.znodePaths.masterAddressZNode;
+  }
+
+  @Override
+  boolean validate(String path) {
+    return path.equals(masterAddressZNode);
+  }
+
+  @Override
+  Collection<String> getNodesToWatch() {
+    ArrayList<String> toReturn = new ArrayList<>();
+    toReturn.add(masterAddressZNode);
+    return toReturn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java
new file mode 100644
index 0000000..68f7fc4
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/zksyncer/MetaLocationSyncer.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.zksyncer;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Tracks the meta region locations on server ZK cluster and synchronizes them 
to client ZK cluster
+ * if changed
+ */
+@InterfaceAudience.Private
+public class MetaLocationSyncer extends ClientZKSyncer {
+  public MetaLocationSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, 
Server server) {
+    super(watcher, clientZkWatcher, server);
+  }
+
+  @Override
+  boolean validate(String path) {
+    return watcher.znodePaths.isAnyMetaReplicaZNode(path);
+  }
+
+  @Override
+  Collection<String> getNodesToWatch() {
+    return watcher.znodePaths.metaReplicaZNodes.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index d29bce7..54f3c8f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -325,6 +325,8 @@ public class HRegionServer extends HasThread implements
 
   volatile boolean killed = false;
 
+  private volatile boolean shutDown = false;
+
   protected final Configuration conf;
 
   private Path rootDir;
@@ -777,6 +779,13 @@ public class HRegionServer extends HasThread implements
    */
   @VisibleForTesting
   protected ClusterConnection createClusterConnection() throws IOException {
+    Configuration conf = this.conf;
+    if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      // Use server ZK cluster for server-issued connections, so we clone
+      // the conf and unset the client ZK related properties
+      conf = new Configuration(this.conf);
+      conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
     // Create a cluster connection that when appropriate, can short-circuit 
and go directly to the
     // local server if the request is to the local server bypassing RPC. Can 
be used for both local
     // and remote invocations.
@@ -1150,6 +1159,7 @@ public class HRegionServer extends HasThread implements
     if (this.zooKeeper != null) {
       this.zooKeeper.close();
     }
+    this.shutDown = true;
     LOG.info("Exiting; stopping=" + this.serverName + "; zookeeper connection 
closed.");
   }
 
@@ -3782,4 +3792,8 @@ public class HRegionServer extends HasThread implements
       }
     }
   }
+
+  public boolean isShutDown() {
+    return shutDown;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 5a05660..fb4e0f9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -146,6 +146,10 @@ public class ReplicationSink {
     if (StringUtils.isNotEmpty(replicationCodec)) {
       this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
     }
+    // use server ZK cluster for replication, so we unset the client ZK 
related properties if any
+    if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
+      this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
+    }
    }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java
new file mode 100644
index 0000000..d7caac6
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSeparateClientZKCluster.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestSeparateClientZKCluster {
+  private static final Log LOG = LogFactory.getLog(TestSeparateClientZKCluster.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final File clientZkDir = new File("/tmp/TestSeparateClientZKCluster");
+  private static final int ZK_SESSION_TIMEOUT = 5000;
+  private static MiniZooKeeperCluster clientZkCluster;
+
+  private final byte[] family = Bytes.toBytes("cf");
+  private final byte[] qualifier = Bytes.toBytes("c1");
+  private final byte[] row = Bytes.toBytes("row");
+  private final byte[] value = Bytes.toBytes("v1");
+  private final byte[] newVal = Bytes.toBytes("v2");
+
+  @Rule
+  public TestName name = new TestName();
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
+
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    int clientZkPort = 21828;
+    clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
+    clientZkCluster.setDefaultClientPort(clientZkPort);
+    clientZkCluster.startup(clientZkDir);
+    // reduce the retry number and start log counter
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
+    TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
+    // core settings for testing client ZK cluster
+    TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
+    // reduce zk session timeout to easier trigger session expiration
+    TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
+    // Start a cluster with 2 masters and 3 regionservers.
+    TEST_UTIL.startMiniCluster(2, 3);
+  }
+
+  @AfterClass
+  public static void afterAllTests() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+    clientZkCluster.shutdown();
+    FileUtils.deleteDirectory(clientZkDir);
+  }
+
+  @Test(timeout = 60000)
+  public void testBasicOperation() throws Exception {
+    TableName tn = TableName.valueOf(name.getMethodName());
+    // create table
+    Connection conn = TEST_UTIL.getConnection();
+    Admin admin = conn.getAdmin();
+    HTable table = (HTable) conn.getTable(tn);
+    try {
+      ColumnFamilyDescriptorBuilder cfDescBuilder =
+          ColumnFamilyDescriptorBuilder.newBuilder(family);
+      TableDescriptorBuilder tableDescBuilder =
+          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+      admin.createTable(tableDescBuilder.build());
+      // test simple get and put
+      Put put = new Put(row);
+      put.addColumn(family, qualifier, value);
+      table.put(put);
+      Get get = new Get(row);
+      Result result = table.get(get);
+      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
+    } finally {
+      admin.close();
+      table.close();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testMasterSwitch() throws Exception {
+    // get an admin instance and issue some request first
+    Connection conn = TEST_UTIL.getConnection();
+    Admin admin = conn.getAdmin();
+    LOG.debug("Tables: " + admin.listTableDescriptors());
+    try {
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      // switch active master
+      HMaster master = cluster.getMaster();
+      master.stopMaster();
+      while (!master.isShutDown()) {
+        Thread.sleep(200);
+      }
+      while (!cluster.getMaster().isInitialized()) {
+        Thread.sleep(200);
+      }
+      // confirm client access still works
+      Assert.assertTrue(admin.balance(false));
+    } finally {
+      admin.close();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testMetaRegionMove() throws Exception {
+    TableName tn = TableName.valueOf(name.getMethodName());
+    // create table
+    Connection conn = TEST_UTIL.getConnection();
+    Admin admin = conn.getAdmin();
+    HTable table = (HTable) conn.getTable(tn);
+    try {
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      ColumnFamilyDescriptorBuilder cfDescBuilder =
+          ColumnFamilyDescriptorBuilder.newBuilder(family);
+      TableDescriptorBuilder tableDescBuilder =
+          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+      admin.createTable(tableDescBuilder.build());
+      // issue some requests to cache the region location
+      Put put = new Put(row);
+      put.addColumn(family, qualifier, value);
+      table.put(put);
+      Get get = new Get(row);
+      Result result = table.get(get);
+      // move meta region and confirm client could detect
+      byte[] destServerName = null;
+      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+        ServerName name = rst.getRegionServer().getServerName();
+        if (!name.equals(cluster.getServerHoldingMeta())) {
+          destServerName = Bytes.toBytes(name.getServerName());
+          break;
+        }
+      }
+      admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), destServerName);
+      LOG.debug("Finished moving meta");
+      // invalidate client cache
+      RegionInfo region =
+          table.getRegionLocator().getRegionLocation(row).getRegion();
+      ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
+      for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
+        ServerName name = rst.getRegionServer().getServerName();
+        if (!name.equals(currentServer)) {
+          destServerName = Bytes.toBytes(name.getServerName());
+          break;
+        }
+      }
+      admin.move(region.getEncodedNameAsBytes(), destServerName);
+      LOG.debug("Finished moving user region");
+      put = new Put(row);
+      put.addColumn(family, qualifier, newVal);
+      table.put(put);
+      result = table.get(get);
+      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+      Assert.assertArrayEquals(newVal, result.getValue(family, qualifier));
+    } finally {
+      admin.close();
+      table.close();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
+    TableName tn = TableName.valueOf(name.getMethodName());
+    // create table
+    ClusterConnection conn = (ClusterConnection) TEST_UTIL.getConnection();
+    Admin admin = conn.getAdmin();
+    HTable table = (HTable) conn.getTable(tn);
+    try {
+      ColumnFamilyDescriptorBuilder cfDescBuilder =
+          ColumnFamilyDescriptorBuilder.newBuilder(family);
+      TableDescriptorBuilder tableDescBuilder =
+          TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+      admin.createTable(tableDescBuilder.build());
+      // put some data
+      Put put = new Put(row);
+      put.addColumn(family, qualifier, value);
+      table.put(put);
+      // invalid connection cache
+      conn.clearRegionCache();
+      // stop client zk cluster
+      clientZkCluster.shutdown();
+      // stop current meta server and confirm the server shutdown process
+      // is not affected by client ZK crash
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      int metaServerId = cluster.getServerWithMeta();
+      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
+      metaServer.stop("Stop current RS holding meta region");
+      while (!metaServer.isShutDown()) {
+        Thread.sleep(200);
+      }
+      // wait for meta region online
+      cluster.getMaster().getAssignmentManager()
+          .waitForAssignment(RegionInfoBuilder.FIRST_META_REGIONINFO);
+      // wait some long time to make sure we will retry sync data to client ZK until data set
+      Thread.sleep(10000);
+      clientZkCluster.startup(clientZkDir);
+      // new request should pass
+      Get get = new Get(row);
+      Result result = table.get(get);
+      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
+    } finally {
+      admin.close();
+      table.close();
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testAsyncTable() throws Exception {
+    TableName tn = TableName.valueOf(name.getMethodName());
+    ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
+    TableDescriptorBuilder tableDescBuilder =
+        TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
+    try (AsyncConnection ASYNC_CONN =
+        ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+      ASYNC_CONN.getAdmin().createTable(tableDescBuilder.build()).get();
+      AsyncTable<?> table = ASYNC_CONN.getTable(tn);
+      // put some data
+      Put put = new Put(row);
+      put.addColumn(family, qualifier, value);
+      table.put(put).get();
+      // get and verify
+      Get get = new Get(row);
+      Result result = table.get(get).get();
+      LOG.debug("Result: " + Bytes.toString(result.getValue(family, qualifier)));
+      Assert.assertArrayEquals(value, result.getValue(family, qualifier));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 83e69d2..87e45c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Ignore;
@@ -318,4 +319,40 @@ public class TestMasterNoCluster {
       master.join();
     }
   }
+
+  @Test(timeout = 60000)
+  public void testMasterInitWithSameClientServerZKQuorum() throws Exception {
+    Configuration conf = new Configuration(TESTUTIL.getConfiguration());
+    conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, TESTUTIL.getZkCluster().getClientPort());
+    HMaster master = new HMaster(conf);
+    master.start();
+    // the master will abort due to IllegalArgumentException so we should finish within 60 seconds
+    master.join();
+  }
+
+  @Test(timeout = 60000)
+  public void testMasterInitWithObserverModeClientZKQuorum() throws Exception {
+    Configuration conf = new Configuration(TESTUTIL.getConfiguration());
+    Assert.assertFalse(Boolean.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE));
+    // set client ZK to some non-existing address and make sure server won't access client ZK
+    // (server start should not be affected)
+    conf.set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    conf.setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT,
+      TESTUTIL.getZkCluster().getClientPort() + 1);
+    // settings to allow us not to start additional RS
+    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    conf.setBoolean(LoadBalancer.TABLES_ON_MASTER, true);
+    // main setting for this test case
+    conf.setBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE, true);
+    HMaster master = new HMaster(conf);
+    master.start();
+    while (!master.isInitialized()) {
+      Threads.sleep(200);
+    }
+    Assert.assertNull(master.metaLocationSyncer);
+    Assert.assertNull(master.masterAddressSyncer);
+    master.stopMaster();
+    master.join();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
index 010ec8c..b264563 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
@@ -238,7 +238,7 @@ public class MiniZooKeeperCluster {
           standaloneServerFactory.configure(
             new InetSocketAddress(currentClientPort),
             configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
-                    HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS));
+                    HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
         } catch (BindException e) {
           LOG.debug("Failed binding ZK Server to client port: " +
               currentClientPort, e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
index dd71dee..0db6205 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
@@ -46,7 +46,7 @@ public final class ZKServerTool {
     for (String value : values) {
       String[] parts = value.split(":");
       String host = parts[0];
-      int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+      int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
       if (parts.length > 1) {
         port = Integer.parseInt(parts[1]);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 75ad0cb..f1cacbf 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -1952,7 +1952,7 @@ public final class ZKUtil {
 
     String host = sp[0];
     int port = sp.length > 1 ? Integer.parseInt(sp[1])
-        : HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+        : HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
 
     InetSocketAddress sockAddr = new InetSocketAddress(host, port);
     try (Socket socket = new Socket()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/061a31fa/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
index 3aac946..c3cac5f 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java
@@ -116,8 +116,43 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
   public ZKWatcher(Configuration conf, String identifier,
                    Abortable abortable, boolean canCreateBaseZNode)
   throws IOException, ZooKeeperConnectionException {
+    this(conf, identifier, abortable, canCreateBaseZNode, false);
+  }
+
+  /**
+   * Instantiate a ZooKeeper connection and watcher.
+   * @param conf the configuration to use
+   * @param identifier string that is passed to RecoverableZookeeper to be used as identifier for
+   *          this instance. Use null for default.
+   * @param abortable Can be null if there is on error there is no host to abort: e.g. client
+   *          context.
+   * @param canCreateBaseZNode true if a base ZNode can be created
+   * @param clientZK whether this watcher is set to access client ZK
+   * @throws IOException if the connection to ZooKeeper fails
+   * @throws ZooKeeperConnectionException if the connection to Zookeeper fails when create base
+   *           ZNodes
+   */
+  public ZKWatcher(Configuration conf, String identifier, Abortable abortable,
+      boolean canCreateBaseZNode, boolean clientZK)
+      throws IOException, ZooKeeperConnectionException {
     this.conf = conf;
-    this.quorum = ZKConfig.getZKQuorumServersString(conf);
+    if (clientZK) {
+      String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
+      String serverZkQuorumServers = ZKConfig.getZKQuorumServersString(conf);
+      if (clientZkQuorumServers != null) {
+        if (clientZkQuorumServers.equals(serverZkQuorumServers)) {
+          // Don't allow same settings to avoid dead loop when master trying
+          // to sync meta information from server ZK to client ZK
+          throw new IllegalArgumentException(
+              "The quorum settings for client ZK should be different from those for server");
+        }
+        this.quorum = clientZkQuorumServers;
+      } else {
+        this.quorum = serverZkQuorumServers;
+      }
+    } else {
+      this.quorum = ZKConfig.getZKQuorumServersString(conf);
+    }
     this.prefix = identifier;
     // Identifier will get the sessionid appended later below down when we
     // handle the syncconnect event.

Reply via email to