HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1e0fcc2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1e0fcc2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1e0fcc2 Branch: refs/heads/hbase-12439 Commit: c1e0fcc26d7e7b10f6ce609e1ff0e4e9378dcf4b Parents: ba3aa9a Author: Gary Helmling <ga...@apache.org> Authored: Wed Dec 9 15:52:27 2015 -0800 Committer: Gary Helmling <ga...@apache.org> Committed: Wed Dec 9 15:52:27 2015 -0800 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 14 +- .../replication/ReplicationPeersZKImpl.java | 7 +- .../replication/ReplicationStateZKBase.java | 3 +- .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 155 ---------- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 124 -------- .../hadoop/hbase/zookeeper/TestZKUtil.java | 11 - .../apache/hadoop/hbase/HBaseConfiguration.java | 78 ++++- .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 301 +++++++++++++++++++ .../hadoop/hbase/TestHBaseConfiguration.java | 10 +- .../hadoop/hbase/zookeeper/TestZKConfig.java | 126 ++++++++ .../hadoop/hbase/mapreduce/SyncTable.java | 15 +- .../hbase/mapreduce/TableMapReduceUtil.java | 34 ++- .../hbase/mapreduce/TableOutputFormat.java | 22 +- .../replication/VerifyReplication.java | 25 +- .../hbase/util/ServerRegionReplicaUtil.java | 4 +- .../org/apache/hadoop/hbase/TestZooKeeper.java | 65 ---- .../replication/TestReplicationAdmin.java | 36 ++- .../replication/TestReplicationEndpoint.java | 10 +- .../replication/TestReplicationStateBasic.java | 4 +- .../replication/TestReplicationStateZKImpl.java | 5 +- .../TestRegionReplicaReplicationEndpoint.java | 8 +- .../hadoop/hbase/zookeeper/TestZKConfig.java | 45 --- 22 files changed, 620 insertions(+), 482 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 8bd1267..a0bea8b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -626,7 +625,8 @@ public class ReplicationAdmin implements Closeable { } } - private List<ReplicationPeer> listValidReplicationPeers() { + @VisibleForTesting + List<ReplicationPeer> listValidReplicationPeers() { Map<String, ReplicationPeerConfig> peers = listPeerConfigs(); if (peers == null || peers.size() <= 0) { return null; @@ -634,18 +634,16 @@ public class ReplicationAdmin implements Closeable { List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size()); for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) { String peerId = peerEntry.getKey(); - String clusterKey = peerEntry.getValue().getClusterKey(); - Configuration peerConf = new Configuration(this.connection.getConfiguration()); Stat s = null; try { - ZKUtil.applyClusterKeyToConf(peerConf, clusterKey); Pair<ReplicationPeerConfig, Configuration> pair = 
this.replicationPeers.getPeerConf(peerId); + Configuration peerConf = pair.getSecond(); ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); s = zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT), null); if (null == s) { - LOG.info(peerId + ' ' + clusterKey + " is invalid now."); + LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now."); continue; } validPeers.add(peer); @@ -664,10 +662,6 @@ public class ReplicationAdmin implements Closeable { LOG.debug("Failure details to get valid replication peers.", e); Thread.currentThread().interrupt(); continue; - } catch (IOException e) { - LOG.warn("Failed to get valid replication peers due to IOException."); - LOG.debug("Failure details to get valid replication peers.", e); - continue; } } return validPeers; http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 1884469..63f9ac3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -318,11 +319,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return null; } - Configuration otherConf = new 
Configuration(this.conf); + Configuration otherConf; try { - if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) { - ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey()); - } + otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); } catch (IOException e) { LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e); return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 1691b3f..4fbac0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase { String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); - this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); + this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); this.replicationZNode = 
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java deleted file mode 100644 index a8f1182..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.zookeeper; - -import java.util.Map.Entry; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Utility methods for reading, and building the ZooKeeper configuration. - * - * The order and priority for reading the config are as follows: - * (1). 
Property with "hbase.zookeeper.property." prefix from HBase XML - * (2). other zookeeper related properties in HBASE XML - */ -@InterfaceAudience.Private -public class ZKConfig { - - private static final String VARIABLE_START = "${"; - - /** - * Make a Properties object holding ZooKeeper config. - * Parses the corresponding config options from the HBase XML configs - * and generates the appropriate ZooKeeper properties. - * @param conf Configuration to read from. - * @return Properties holding mappings representing ZooKeeper config file. - */ - public static Properties makeZKProps(Configuration conf) { - return makeZKPropsFromHbaseConfig(conf); - } - - /** - * Make a Properties object holding ZooKeeper config. - * Parses the corresponding config options from the HBase XML configs - * and generates the appropriate ZooKeeper properties. - * - * @param conf Configuration to read from. - * @return Properties holding mappings representing ZooKeeper config file. - */ - private static Properties makeZKPropsFromHbaseConfig(Configuration conf) { - Properties zkProperties = new Properties(); - - // Directly map all of the hbase.zookeeper.property.KEY properties. - // Synchronize on conf so no loading of configs while we iterate - synchronized (conf) { - for (Entry<String, String> entry : conf) { - String key = entry.getKey(); - if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { - String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); - String value = entry.getValue(); - // If the value has variables substitutions, need to do a get. - if (value.contains(VARIABLE_START)) { - value = conf.get(key); - } - zkProperties.setProperty(zkKey, value); - } - } - } - - // If clientPort is not set, assign the default. - if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) { - zkProperties.put(HConstants.CLIENT_PORT_STR, - HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); - } - - // Create the server.X properties. 
- int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); - int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); - - final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, - HConstants.LOCALHOST); - String serverHost; - String address; - String key; - for (int i = 0; i < serverHosts.length; ++i) { - if (serverHosts[i].contains(":")) { - serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':')); - } else { - serverHost = serverHosts[i]; - } - address = serverHost + ":" + peerPort + ":" + leaderPort; - key = "server." + i; - zkProperties.put(key, address); - } - - return zkProperties; - } - - /** - * Return the ZK Quorum servers string given the specified configuration - * - * @param conf - * @return Quorum servers String - */ - private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) { - String defaultClientPort = Integer.toString( - conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT)); - - // Build the ZK quorum server string with "server:clientport" list, separated by ',' - final String[] serverHosts = - conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); - return buildQuorumServerString(serverHosts, defaultClientPort); - } - - /** - * Build the ZK quorum server string with "server:clientport" list, separated by ',' - * - * @param serverHosts a list of servers for ZK quorum - * @param clientPort the default client port - * @return the string for a list of "server:port" separated by "," - */ - public static String buildQuorumServerString(String[] serverHosts, String clientPort) { - StringBuilder quorumStringBuilder = new StringBuilder(); - String serverHost; - for (int i = 0; i < serverHosts.length; ++i) { - if (serverHosts[i].contains(":")) { - serverHost = serverHosts[i]; // just use the port specified from the input - } else { - serverHost = serverHosts[i] + ":" + clientPort; - } - if (i > 0) { - quorumStringBuilder.append(','); - } - 
quorumStringBuilder.append(serverHost); - } - return quorumStringBuilder.toString(); - } - - /** - * Return the ZK Quorum servers string given the specified configuration. - * @return Quorum servers - */ - public static String getZKQuorumServersString(Configuration conf) { - return getZKQuorumServersStringFromHbaseConfig(conf); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 633525f..c268268 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -76,7 +76,6 @@ import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.ZooKeeperSaslServer; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -96,25 +95,6 @@ public class ZKUtil { public static final char ZNODE_PATH_SEPARATOR = '/'; private static int zkDumpConnectionTimeOut; - // The Quorum for the ZK cluster can have one the following format (see examples below): - // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort) - // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, - // in this case, the clientPort would be ignored) - // (3). 
s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use - // the clientPort; otherwise, it would use the specified port) - @VisibleForTesting - public static class ZKClusterKey { - public String quorumString; - public int clientPort; - public String znodeParent; - - ZKClusterKey(String quorumString, int clientPort, String znodeParent) { - this.quorumString = quorumString; - this.clientPort = clientPort; - this.znodeParent = znodeParent; - } - } - /** * Creates a new connection to ZooKeeper, pulling settings and ensemble config * from the specified configuration object using methods from {@link ZKConfig}. @@ -361,110 +341,6 @@ public class ZKUtil { return path.substring(path.lastIndexOf("/")+1); } - /** - * Get the key to the ZK ensemble for this configuration without - * adding a name at the end - * @param conf Configuration to use to build the key - * @return ensemble key without a name - */ - public static String getZooKeeperClusterKey(Configuration conf) { - return getZooKeeperClusterKey(conf, null); - } - - /** - * Get the key to the ZK ensemble for this configuration and append - * a name at the end - * @param conf Configuration to use to build the key - * @param name Name that should be appended at the end if not empty or null - * @return ensemble key with a name (if any) - */ - public static String getZooKeeperClusterKey(Configuration conf, String name) { - String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll( - "[\\t\\n\\x0B\\f\\r]", ""); - StringBuilder builder = new StringBuilder(ensemble); - builder.append(":"); - builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); - builder.append(":"); - builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - if (name != null && !name.isEmpty()) { - builder.append(","); - builder.append(name); - } - return builder.toString(); - } - - /** - * Apply the settings in the given key to the given configuration, this is - * used to communicate with distant clusters - 
* @param conf configuration object to configure - * @param key string that contains the 3 required configuratins - * @throws IOException - */ - public static void applyClusterKeyToConf(Configuration conf, String key) - throws IOException{ - ZKClusterKey zkClusterKey = transformClusterKey(key); - conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString); - conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent); - } - - /** - * Separate the given key into the three configurations it should contain: - * hbase.zookeeper.quorum, hbase.zookeeper.client.port - * and zookeeper.znode.parent - * @param key - * @return the three configuration in the described order - * @throws IOException - */ - public static ZKClusterKey transformClusterKey(String key) throws IOException { - String[] parts = key.split(":"); - - if (parts.length == 3) { - return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]); - } - - if (parts.length > 3) { - // The quorum could contain client port in server:clientport format, try to transform more. - String zNodeParent = parts [parts.length - 1]; - String clientPort = parts [parts.length - 2]; - - // The first part length is the total length minus the lengths of other parts and minus 2 ":" - int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2; - String quorumStringInput = key.substring(0, endQuorumIndex); - String[] serverHosts = quorumStringInput.split(","); - - // The common case is that every server has its own client port specified - this means - // that (total parts - the ZNodeParent part - the ClientPort part) is equal to - // (the number of "," + 1) - "+ 1" because the last server has no ",". 
- if ((parts.length - 2) == (serverHosts.length + 1)) { - return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent); - } - - // For the uncommon case that some servers has no port specified, we need to build the - // server:clientport list using default client port for servers without specified port. - return new ZKClusterKey( - ZKConfig.buildQuorumServerString(serverHosts, clientPort), - Integer.parseInt(clientPort), - zNodeParent); - } - - throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + - HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":" - + HConstants.ZOOKEEPER_ZNODE_PARENT); - } - - /** - * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ',' - * @param quorumStringInput a string contains a list of servers for ZK quorum - * @param clientPort the default client port - * @return the string for a list of "server:port" separated by "," - */ - @VisibleForTesting - public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) { - String[] serverHosts = quorumStringInput.split(","); - return ZKConfig.buildQuorumServerString(serverHosts, clientPort); - } - // // Existence checks and watches // http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java index 72de935..eb629f2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java @@ -41,17 +41,6 @@ import org.junit.experimental.categories.Category; public class TestZKUtil { @Test - public void testGetZooKeeperClusterKey() { - 
Configuration conf = HBaseConfiguration.create(); - conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n"); - conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333"); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase"); - String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test"); - Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n")); - Assert.assertEquals("localhost:3333:hbase,test", clusterKey); - } - - @Test public void testCreateACL() throws ZooKeeperConnectionException, IOException { Configuration conf = HBaseConfiguration.create(); conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3"); http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java index 7a037f4..7b94c3d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Map.Entry; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; /** * Adds HBase configuration files to a Configuration @@ -115,7 +116,7 @@ public class HBaseConfiguration extends Configuration { * @param srcConf the source configuration **/ 
public static void merge(Configuration destConf, Configuration srcConf) { - for (Entry<String, String> e : srcConf) { + for (Map.Entry<String, String> e : srcConf) { destConf.set(e.getKey(), e.getValue()); } } @@ -129,7 +130,7 @@ public class HBaseConfiguration extends Configuration { */ public static Configuration subset(Configuration srcConf, String prefix) { Configuration newConf = new Configuration(false); - for (Entry<String, String> entry : srcConf) { + for (Map.Entry<String, String> entry : srcConf) { if (entry.getKey().startsWith(prefix)) { String newKey = entry.getKey().substring(prefix.length()); // avoid entries that would produce an empty key @@ -142,6 +143,18 @@ public class HBaseConfiguration extends Configuration { } /** + * Sets all the entries in the provided {@code Map<String, String>} as properties in the + * given {@code Configuration}. Each property will have the specified prefix prepended, + * so that the configuration entries are keyed by {@code prefix + entry.getKey()}. + */ + public static void setWithPrefix(Configuration conf, String prefix, + Iterable<Map.Entry<String, String>> properties) { + for (Map.Entry<String, String> entry : properties) { + conf.set(prefix + entry.getKey(), entry.getValue()); + } + } + + /** * @return whether to show HBase Configuration in servlet */ public static boolean isShowConfInServlet() { @@ -236,6 +249,65 @@ public class HBaseConfiguration extends Configuration { } /** + * Generates a {@link Configuration} instance by applying the ZooKeeper cluster key + * to the base Configuration. Note that additional configuration properties may be needed + * for a remote cluster, so it is preferable to use + * {@link #createClusterConf(Configuration, String, String)}. 
+ * + * @param baseConf the base configuration to use, containing prefixed override properties + * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none + * + * @return the merged configuration with override properties and cluster key applied + * + * @see #createClusterConf(Configuration, String, String) + */ + public static Configuration createClusterConf(Configuration baseConf, String clusterKey) + throws IOException { + return createClusterConf(baseConf, clusterKey, null); + } + + /** + * Generates a {@link Configuration} instance by applying property overrides prefixed by + * a cluster profile key to the base Configuration. Override properties are extracted by + * the {@link #subset(Configuration, String)} method, then merged on top of the base + * Configuration and returned. + * + * @param baseConf the base configuration to use, containing prefixed override properties + * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none + * @param overridePrefix the property key prefix to match for override properties, + * or {@code null} if none + * @return the merged configuration with override properties and cluster key applied + */ + public static Configuration createClusterConf(Configuration baseConf, String clusterKey, + String overridePrefix) throws IOException { + Configuration clusterConf = HBaseConfiguration.create(baseConf); + if (clusterKey != null && !clusterKey.isEmpty()) { + applyClusterKeyToConf(clusterConf, clusterKey); + } + + if (overridePrefix != null && !overridePrefix.isEmpty()) { + Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix); + HBaseConfiguration.merge(clusterConf, clusterSubset); + } + return clusterConf; + } + + /** + * Apply the settings in the given key to the given configuration, this is + * used to communicate with distant clusters + * @param conf configuration object to configure + * @param key string that contains the 3 required 
configurations + * @throws IOException + */ + private static void applyClusterKeyToConf(Configuration conf, String key) + throws IOException{ + ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key); + conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString()); + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort()); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent()); + } + + /** * For debugging. Dump configurations to system output as xml format. * Master and RS configurations can also be dumped using * http services. e.g. "curl http://master:16010/dump" http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java new file mode 100644 index 0000000..fe7396a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -0,0 +1,301 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Map.Entry; +import java.util.Properties; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Utility methods for reading, and building the ZooKeeper configuration. + * + * The order and priority for reading the config are as follows: + * (1). Property with "hbase.zookeeper.property." prefix from HBase XML + * (2). other zookeeper related properties in HBASE XML + */ +@InterfaceAudience.Private +public final class ZKConfig { + + private static final String VARIABLE_START = "${"; + + private ZKConfig() { + } + + /** + * Make a Properties object holding ZooKeeper config. + * Parses the corresponding config options from the HBase XML configs + * and generates the appropriate ZooKeeper properties. + * @param conf Configuration to read from. + * @return Properties holding mappings representing ZooKeeper config file. + */ + public static Properties makeZKProps(Configuration conf) { + return makeZKPropsFromHbaseConfig(conf); + } + + /** + * Make a Properties object holding ZooKeeper config. + * Parses the corresponding config options from the HBase XML configs + * and generates the appropriate ZooKeeper properties. + * + * @param conf Configuration to read from. + * @return Properties holding mappings representing ZooKeeper config file. + */ + private static Properties makeZKPropsFromHbaseConfig(Configuration conf) { + Properties zkProperties = new Properties(); + + // Directly map all of the hbase.zookeeper.property.KEY properties. 
+ // Synchronize on conf so no loading of configs while we iterate + synchronized (conf) { + for (Entry<String, String> entry : conf) { + String key = entry.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); + String value = entry.getValue(); + // If the value has variables substitutions, need to do a get. + if (value.contains(VARIABLE_START)) { + value = conf.get(key); + } + zkProperties.setProperty(zkKey, value); + } + } + } + + // If clientPort is not set, assign the default. + if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) { + zkProperties.put(HConstants.CLIENT_PORT_STR, + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + } + + // Create the server.X properties. + int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); + int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); + + final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, + HConstants.LOCALHOST); + String serverHost; + String address; + String key; + for (int i = 0; i < serverHosts.length; ++i) { + if (serverHosts[i].contains(":")) { + serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':')); + } else { + serverHost = serverHosts[i]; + } + address = serverHost + ":" + peerPort + ":" + leaderPort; + key = "server." 
+ i; + zkProperties.put(key, address); + } + + return zkProperties; + } + + /** + * Return the ZK Quorum servers string given the specified configuration + * + * @param conf + * @return Quorum servers String + */ + private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) { + String defaultClientPort = Integer.toString( + conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT)); + + // Build the ZK quorum server string with "server:clientport" list, separated by ',' + final String[] serverHosts = + conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + return buildZKQuorumServerString(serverHosts, defaultClientPort); + } + + /** + * Return the ZK Quorum servers string given the specified configuration. + * @return Quorum servers + */ + public static String getZKQuorumServersString(Configuration conf) { + return getZKQuorumServersStringFromHbaseConfig(conf); + } + + /** + * Build the ZK quorum server string with "server:clientport" list, separated by ',' + * + * @param serverHosts a list of servers for ZK quorum + * @param clientPort the default client port + * @return the string for a list of "server:port" separated by "," + */ + public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) { + StringBuilder quorumStringBuilder = new StringBuilder(); + String serverHost; + for (int i = 0; i < serverHosts.length; ++i) { + if (serverHosts[i].contains(":")) { + serverHost = serverHosts[i]; // just use the port specified from the input + } else { + serverHost = serverHosts[i] + ":" + clientPort; + } + if (i > 0) { + quorumStringBuilder.append(','); + } + quorumStringBuilder.append(serverHost); + } + return quorumStringBuilder.toString(); + } + + /** + * Verifies that the given key matches the expected format for a ZooKeeper cluster key. 
+ * The Quorum for the ZK cluster can have one of the following formats (see examples below): + * + * <ol> + * <li>s1,s2,s3 (no client port in the list, the client port could be obtained from + * clientPort)</li> + * <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, + * in this case, the clientPort would be ignored)</li> + * <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use + * the clientPort; otherwise, it would use the specified port)</li> + * </ol> + * + * @param key the cluster key to validate + * @throws IOException if the key could not be parsed + */ + public static void validateClusterKey(String key) throws IOException { + transformClusterKey(key); + } + + /** + * Separate the given key into the three configurations it should contain: + * hbase.zookeeper.quorum, hbase.zookeeper.client.port + * and zookeeper.znode.parent + * @param key + * @return the three configurations in the described order + * @throws IOException + */ + public static ZKClusterKey transformClusterKey(String key) throws IOException { + String[] parts = key.split(":"); + + if (parts.length == 3) { + return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]); + } + + if (parts.length > 3) { + // The quorum could contain client port in server:clientport format, try to transform more. 
+ String zNodeParent = parts [parts.length - 1]; + String clientPort = parts [parts.length - 2]; + + // The first part length is the total length minus the lengths of other parts and minus 2 ":" + int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2; + String quorumStringInput = key.substring(0, endQuorumIndex); + String[] serverHosts = quorumStringInput.split(","); + + // The common case is that every server has its own client port specified - this means + // that (total parts - the ZNodeParent part - the ClientPort part) is equal to + // (the number of "," + 1) - "+ 1" because the last server has no ",". + if ((parts.length - 2) == (serverHosts.length + 1)) { + return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent); + } + + // For the uncommon case that some servers have no port specified, we need to build the + // server:clientport list using default client port for servers without specified port. + return new ZKClusterKey( + buildZKQuorumServerString(serverHosts, clientPort), + Integer.parseInt(clientPort), + zNodeParent); + } + + throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + + HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":" + + HConstants.ZOOKEEPER_ZNODE_PARENT); + } + + /** + * Get the key to the ZK ensemble for this configuration without + * adding a name at the end + * @param conf Configuration to use to build the key + * @return ensemble key without a name + */ + public static String getZooKeeperClusterKey(Configuration conf) { + return getZooKeeperClusterKey(conf, null); + } + + /** + * Get the key to the ZK ensemble for this configuration and append + * a name at the end + * @param conf Configuration to use to build the key + * @param name Name that should be appended at the end if not empty or null + * @return ensemble key with a name (if any) + */ + public static String getZooKeeperClusterKey(Configuration conf, String 
name) { + String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll( + "[\\t\\n\\x0B\\f\\r]", ""); + StringBuilder builder = new StringBuilder(ensemble); + builder.append(":"); + builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); + builder.append(":"); + builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + if (name != null && !name.isEmpty()) { + builder.append(","); + builder.append(name); + } + return builder.toString(); + } + + /** + * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ',' + * @param quorumStringInput a string contains a list of servers for ZK quorum + * @param clientPort the default client port + * @return the string for a list of "server:port" separated by "," + */ + @VisibleForTesting + public static String standardizeZKQuorumServerString(String quorumStringInput, + String clientPort) { + String[] serverHosts = quorumStringInput.split(","); + return buildZKQuorumServerString(serverHosts, clientPort); + } + + // The Quorum for the ZK cluster can have one the following format (see examples below): + // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort) + // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, + // in this case, the clientPort would be ignored) + // (3). 
s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use + // the clientPort; otherwise, it would use the specified port) + @VisibleForTesting + public static class ZKClusterKey { + private String quorumString; + private int clientPort; + private String znodeParent; + + ZKClusterKey(String quorumString, int clientPort, String znodeParent) { + this.quorumString = quorumString; + this.clientPort = clientPort; + this.znodeParent = znodeParent; + } + + public String getQuorumString() { + return quorumString; + } + + public int getClientPort() { + return clientPort; + } + + public String getZnodeParent() { + return znodeParent; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java index c11916f..6c14ef9 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -27,11 +28,13 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import 
org.junit.Test; import org.junit.experimental.categories.Category; @@ -73,8 +76,11 @@ public class TestHBaseConfiguration { String prefix = "hbase.mapred.output."; conf.set("hbase.security.authentication", "kerberos"); conf.set("hbase.regionserver.kerberos.principal", "hbasesource"); - conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest"); - conf.set(prefix, "shouldbemissing"); + HBaseConfiguration.setWithPrefix(conf, prefix, + ImmutableMap.of( + "hbase.regionserver.kerberos.principal", "hbasedest", + "", "shouldbemissing") + .entrySet()); Configuration subsetConf = HBaseConfiguration.subset(conf, prefix); assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal")); http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java new file mode 100644 index 0000000..7879aea --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MiscTests.class, SmallTests.class}) +public class TestZKConfig { + + @Test + public void testZKConfigLoading() throws Exception { + Configuration conf = HBaseConfiguration.create(); + // Test that we read only from the config instance + // (i.e. via hbase-default.xml and hbase-site.xml) + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181); + Properties props = ZKConfig.makeZKProps(conf); + assertEquals("Property client port should have been default from the HBase config", + "2181", + props.getProperty("clientPort")); + } + + @Test + public void testGetZooKeeperClusterKey() { + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n"); + conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333"); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase"); + String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test"); + assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n")); + assertEquals("localhost:3333:hbase,test", clusterKey); + } + + @Test + public void testClusterKey() throws Exception { + testKey("server", 2181, "hbase"); + testKey("server1,server2,server3", 2181, "hbase"); + try { + ZKConfig.validateClusterKey("2181:hbase"); + } catch (IOException ex) { + // OK + } + } + + @Test + public void 
testClusterKeyWithMultiplePorts() throws Exception { + // server has different port than the default port + testKey("server1:2182", 2181, "hbase", true); + // multiple servers have their own port + testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true); + // one server has no specified port, should use default port + testKey("server1:2182,server2,server3:2184", 2181, "hbase", true); + // the last server has no specified port, should use default port + testKey("server1:2182,server2:2183,server3", 2181, "hbase", true); + // multiple servers have no specified port, should use default port for those servers + testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true); + // same server, different ports + testKey("server1:2182,server1:2183,server1", 2181, "hbase", true); + // mix of same server/different port and different server + testKey("server1:2182,server2:2183,server1", 2181, "hbase", true); + } + + private void testKey(String ensemble, int port, String znode) + throws IOException { + testKey(ensemble, port, znode, false); // not support multiple client ports + } + + private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport) + throws IOException { + Configuration conf = new Configuration(); + String key = ensemble+":"+port+":"+znode; + String ensemble2 = null; + ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key); + if (multiplePortSupport) { + ensemble2 = ZKConfig.standardizeZKQuorumServerString(ensemble, + Integer.toString(port)); + assertEquals(ensemble2, zkClusterKey.getQuorumString()); + } + else { + assertEquals(ensemble, zkClusterKey.getQuorumString()); + } + assertEquals(port, zkClusterKey.getClientPort()); + assertEquals(znode, zkClusterKey.getZnodeParent()); + + conf = HBaseConfiguration.createClusterConf(conf, key); + assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM)); + assertEquals(zkClusterKey.getClientPort(), 
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1)); + assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + + String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf); + if (multiplePortSupport) { + String key2 = ensemble2 + ":" + port + ":" + znode; + assertEquals(key2, reconstructedKey); + } + else { + assertEquals(key, reconstructedKey); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 20d6e24..1658ba4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; @@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool { Configuration conf = context.getConfiguration(); sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); - sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY); - targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY); + sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); + targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, + TableOutputFormat.OUTPUT_CONF_PREFIX); sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); targetTable = 
openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); @@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool { targetHasher = new HashTable.ResultHasher(); } - private static Connection openConnection(Configuration conf, String zkClusterConfKey) + private static Connection openConnection(Configuration conf, String zkClusterConfKey, + String configPrefix) throws IOException { - Configuration clusterConf = new Configuration(conf); String zkCluster = conf.get(zkClusterConfKey); - if (zkCluster != null) { - ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster); - } + Configuration clusterConf = HBaseConfiguration.createClusterConf(conf, + zkCluster, configPrefix); return ConnectionFactory.createConnection(clusterConf); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index cc8a35c..a48871f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -43,12 +43,11 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; -import com.google.protobuf.InvalidProtocolBufferException; import java.io.File; import 
java.io.IOException; @@ -485,12 +484,8 @@ public class TableMapReduceUtil { String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); User user = userProvider.getCurrent(); if (quorumAddress != null) { - Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); - ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); - // apply any "hbase.mapred.output." configuration overrides - Configuration outputOverrides = - HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX); - HBaseConfiguration.merge(peerConf, outputOverrides); + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); Connection peerConn = ConnectionFactory.createConnection(peerConf); try { TokenUtil.addTokenForJob(peerConn, user, job); @@ -523,15 +518,30 @@ public class TableMapReduceUtil { * @param job The job that requires the permission. * @param quorumAddress string that contains the 3 required configuratins * @throws IOException When the authentication token cannot be obtained. + * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead. */ + @Deprecated public static void initCredentialsForCluster(Job job, String quorumAddress) throws IOException { + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress); + initCredentialsForCluster(job, peerConf); + } + + /** + * Obtain an authentication token, for the specified cluster, on behalf of the current user + * and add it to the credentials for the given map reduce job. + * + * @param job The job that requires the permission. + * @param conf The configuration to use in connecting to the peer cluster + * @throws IOException When the authentication token cannot be obtained. 
+ */ + public static void initCredentialsForCluster(Job job, Configuration conf) + throws IOException { UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); if (userProvider.isHBaseSecurityEnabled()) { try { - Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); - ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); - Connection peerConn = ConnectionFactory.createConnection(peerConf); + Connection peerConn = ConnectionFactory.createConnection(conf); try { TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); } finally { @@ -680,7 +690,7 @@ public class TableMapReduceUtil { // If passed a quorum/ensemble address, pass it on to TableOutputFormat. if (quorumAddress != null) { // Calling this will validate the format - ZKUtil.transformClusterKey(quorumAddress); + ZKConfig.validateClusterKey(quorumAddress); conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); } if (serverClass != null && serverImpl != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index 201e78f..998d700 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import 
org.apache.hadoop.mapreduce.OutputFormat; @@ -195,22 +194,19 @@ implements Configurable { @Override public void setConf(Configuration otherConf) { - this.conf = HBaseConfiguration.create(otherConf); - - String tableName = this.conf.get(OUTPUT_TABLE); + String tableName = otherConf.get(OUTPUT_TABLE); if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } - String address = this.conf.get(QUORUM_ADDRESS); - int zkClientPort = this.conf.getInt(QUORUM_PORT, 0); - String serverClass = this.conf.get(REGION_SERVER_CLASS); - String serverImpl = this.conf.get(REGION_SERVER_IMPL); + String address = otherConf.get(QUORUM_ADDRESS); + int zkClientPort = otherConf.getInt(QUORUM_PORT, 0); + String serverClass = otherConf.get(REGION_SERVER_CLASS); + String serverImpl = otherConf.get(REGION_SERVER_IMPL); try { - if (address != null) { - ZKUtil.applyClusterKeyToConf(this.conf, address); - } + this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX); + if (serverClass != null) { this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); } @@ -221,9 +217,5 @@ implements Configurable { LOG.error(e); throw new RuntimeException(e); } - - // finally apply any remaining "hbase.mapred.output." 
configuration overrides - Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX); - HBaseConfiguration.merge(this.conf, outputOverrides); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 76ac541..e6b4802 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; @@ -70,6 +69,7 @@ public class VerifyReplication extends Configured implements Tool { LogFactory.getLog(VerifyReplication.class); public final static String NAME = "verifyrep"; + private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; static long startTime = 0; static long endTime = Long.MAX_VALUE; static int versions = -1; @@ -130,8 +130,8 @@ public class VerifyReplication extends Configured implements Tool { final TableSplit tableSplit = (TableSplit)(context.getInputSplit()); String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); - Configuration peerConf = HBaseConfiguration.create(conf); - ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); + Configuration peerConf = HBaseConfiguration.createClusterConf(conf, + 
zkClusterKey, PEER_CONFIG_PREFIX); TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); connection = ConnectionFactory.createConnection(peerConf); @@ -211,7 +211,8 @@ public class VerifyReplication extends Configured implements Tool { } } - private static String getPeerQuorumAddress(final Configuration conf) throws IOException { + private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( + final Configuration conf) throws IOException { ZooKeeperWatcher localZKW = null; ReplicationPeerZKImpl peer = null; try { @@ -228,8 +229,8 @@ public class VerifyReplication extends Configured implements Tool { if (pair == null) { throw new IOException("Couldn't get peer conf!"); } - Configuration peerConf = rp.getPeerConf(peerId).getSecond(); - return ZKUtil.getZooKeeperClusterKey(peerConf); + + return pair; } catch (ReplicationException e) { throw new IOException( "An error occured while trying to connect to the remove peer cluster", e); @@ -268,9 +269,14 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".families", families); } - String peerQuorumAddress = getPeerQuorumAddress(conf); + Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf); + ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); + String peerQuorumAddress = peerConfig.getClusterKey(); + LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + + peerConfig.getConfiguration()); conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - LOG.info("Peer Quorum Address: " + peerQuorumAddress); + HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, + peerConfig.getConfiguration().entrySet()); conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); @@ -293,8 +299,9 @@ public class VerifyReplication extends Configured implements Tool { TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); + Configuration 
peerClusterConf = peerConfigPair.getSecond(); // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress); + TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 5c61afb..2ba1b47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; /** * Similar to {@link RegionReplicaUtil} but for the server side @@ -148,7 +148,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { try { if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) { ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); - peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf)); + peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)); peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null); } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index 4e1599a..77d01e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -343,71 +343,6 @@ public class TestZooKeeper { assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null)); } - @Test - public void testClusterKey() throws Exception { - testKey("server", 2181, "hbase"); - testKey("server1,server2,server3", 2181, "hbase"); - try { - ZKUtil.transformClusterKey("2181:hbase"); - } catch (IOException ex) { - // OK - } - } - - @Test - public void testClusterKeyWithMultiplePorts() throws Exception { - // server has different port than the default port - testKey("server1:2182", 2181, "hbase", true); - // multiple servers have their own port - testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true); - // one server has no specified port, should use default port - testKey("server1:2182,server2,server3:2184", 2181, "hbase", true); - // the last server has no specified port, should use default port - testKey("server1:2182,server2:2183,server3", 2181, "hbase", true); - // multiple servers have no specified port, should use default port for those servers - testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true); - // same server, different ports - testKey("server1:2182,server1:2183,server1", 2181, "hbase", true); - // mix of same server/different port and different server - testKey("server1:2182,server2:2183,server1", 2181, "hbase", true); - } - - private void testKey(String ensemble, int port, String znode) - throws IOException { - testKey(ensemble, port, znode, false); // not support 
multiple client ports - } - - private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport) - throws IOException { - Configuration conf = new Configuration(); - String key = ensemble+":"+port+":"+znode; - String ensemble2 = null; - ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key); - if (multiplePortSupport) { - ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port)); - assertEquals(ensemble2, zkClusterKey.quorumString); - } - else { - assertEquals(ensemble, zkClusterKey.quorumString); - } - assertEquals(port, zkClusterKey.clientPort); - assertEquals(znode, zkClusterKey.znodeParent); - - ZKUtil.applyClusterKeyToConf(conf, key); - assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM)); - assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1)); - assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - - String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf); - if (multiplePortSupport) { - String key2 = ensemble2 + ":" + port + ":" + znode; - assertEquals(key2, reconstructedKey); - } - else { - assertEquals(key, reconstructedKey); - } - } - /** * A test for HBASE-3238 * @throws IOException A connection attempt to zk failed http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index e187b9b..e18220d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -24,9 +24,13 @@ import 
java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -38,10 +42,12 @@ import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; -import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Unit testing of ReplicationAdmin @@ -117,7 +123,29 @@ public class TestReplicationAdmin { admin.removePeer(ID_SECOND); assertEquals(0, admin.getPeersCount()); } - + + /** + * Tests that the peer configuration used by ReplicationAdmin contains all + * the peer's properties. 
+ */ + @Test + public void testPeerConfig() throws Exception { + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(KEY_ONE); + config.getConfiguration().put("key1", "value1"); + config.getConfiguration().put("key2", "value2"); + admin.addPeer(ID_ONE, config, null); + + List<ReplicationPeer> peers = admin.listValidReplicationPeers(); + assertEquals(1, peers.size()); + ReplicationPeer peerOne = peers.get(0); + assertNotNull(peerOne); + assertEquals("value1", peerOne.getConfiguration().get("key1")); + assertEquals("value2", peerOne.getConfiguration().get("key2")); + + admin.removePeer(ID_ONE); + } + @Test public void testAddPeerWithUnDeletedQueues() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 52fb41c..a5a4e73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -115,7 +115,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { public void testCustomReplicationEndpoint() throws Exception { // test installing a custom replication 
endpoint other than the default one. admin.addPeer("testCustomReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null); // check whether the class has been constructed and started @@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { int peerCount = admin.getPeersCount(); final String id = "testReplicationEndpointReturnsFalseOnReplicate"; admin.addPeer(id, - new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null); // This test is flakey and then there is so much stuff flying around in here its, hard to // debug. Peer needs to be up for the edit to make it across. This wait on @@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } admin.addPeer(id, - new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2)) .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), null); @@ -234,7 +234,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { @Test (timeout=120000) public void testWALEntryFilterFromReplicationEndpoint() throws Exception { admin.addPeer("testWALEntryFilterFromReplicationEndpoint", - new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1)) + new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1)) .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null); // now replicate some data. 
try (Connection connection = ConnectionFactory.createConnection(conf1)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index f05eceb..696c130 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -27,7 +27,7 @@ import java.util.SortedSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.zookeeper.KeeperException; import org.junit.Before; import org.junit.Test; @@ -202,7 +202,7 @@ public abstract class TestReplicationStateBasic { fail("There are no connected peers, should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException e) { } - assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); + assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); rp.removePeer(ID_ONE); rp.peerRemoved(ID_ONE); assertNumberOfPeers(1); http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index 
fff6c9d..4587c61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -79,7 +80,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234"); ZKUtil.createWithParents(zkw1, fakeRs); ZKClusterId.setClusterId(zkw1, new ClusterId()); - return ZKUtil.getZooKeeperClusterKey(testConf); + return ZKConfig.getZooKeeperClusterKey(testConf); } @Before @@ -94,7 +95,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3); rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1); rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw); - OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf); + OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 2231f0e..65600ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.log4j.Level; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -129,7 +129,8 @@ public class TestRegionReplicaReplicationEndpoint { // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); - assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration())); + assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( + HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); @@ -162,7 +163,8 @@ public class TestRegionReplicaReplicationEndpoint { // assert peer configuration is correct peerConfig = admin.getPeerConfig(peerId); assertNotNull(peerConfig); - assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration())); + assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( + HTU.getConfiguration())); assertEquals(peerConfig.getReplicationEndpointImpl(), RegionReplicaReplicationEndpoint.class.getName()); admin.close(); 
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java deleted file mode 100644 index 8f5961f..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.zookeeper; - -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({MiscTests.class, SmallTests.class}) -public class TestZKConfig { - - @Test - public void testZKConfigLoading() throws Exception { - Configuration conf = HBaseConfiguration.create(); - // Test that we read only from the config instance - // (i.e. via hbase-default.xml and hbase-site.xml) - conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181); - Properties props = ZKConfig.makeZKProps(conf); - Assert.assertEquals("Property client port should have been default from the HBase config", - "2181", - props.getProperty("clientPort")); - } -}