HBASE-16653 Backport HBASE-11393 to branches which support namespace Signed-off-by: chenheng <chenh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66941910 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66941910 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66941910 Branch: refs/heads/branch-1 Commit: 66941910bd07462fe496c5bbb591f4071f77b8fb Parents: 6df7554 Author: Guanghao Zhang <zghao...@gmail.com> Authored: Mon Sep 26 19:33:43 2016 +0800 Committer: chenheng <chenh...@apache.org> Committed: Tue Oct 18 09:12:47 2016 +0800 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 84 +- .../replication/ReplicationPeerConfig.java | 16 +- .../replication/ReplicationPeerZKImpl.java | 80 +- .../hbase/replication/ReplicationPeers.java | 15 +- .../replication/ReplicationPeersZKImpl.java | 60 +- .../replication/ReplicationSerDeHelper.java | 189 +++ .../replication/ReplicationStateZKBase.java | 17 + .../protobuf/generated/ZooKeeperProtos.java | 1155 +++++++++++++++++- .../src/main/protobuf/ZooKeeper.proto | 8 +- .../org/apache/hadoop/hbase/master/HMaster.java | 8 + .../replication/master/TableCFsUpdater.java | 120 ++ .../hbase/client/TestReplicaWithCluster.java | 5 +- .../replication/TestReplicationAdmin.java | 193 +-- .../cleaner/TestReplicationHFileCleaner.java | 2 +- .../replication/TestMasterReplication.java | 9 +- .../replication/TestMultiSlaveReplication.java | 8 +- .../replication/TestPerTableCFReplication.java | 153 ++- .../hbase/replication/TestReplicationBase.java | 4 +- .../replication/TestReplicationSmallTests.java | 4 +- .../replication/TestReplicationStateBasic.java | 20 +- .../replication/TestReplicationSyncUpTool.java | 4 +- .../TestReplicationTrackerZKImpl.java | 10 +- .../replication/TestReplicationWithTags.java | 4 +- .../replication/master/TestTableCFsUpdater.java | 164 +++ .../TestReplicationSourceManager.java | 2 +- ...sibilityLabelReplicationWithExpAsString.java | 5 +- 
.../TestVisibilityLabelsReplication.java | 5 +- .../apache/hadoop/hbase/util/TestHBaseFsck.java | 5 +- .../src/main/ruby/hbase/replication_admin.rb | 49 +- .../src/main/ruby/shell/commands/add_peer.rb | 4 +- .../ruby/shell/commands/append_peer_tableCFs.rb | 2 +- .../src/main/ruby/shell/commands/list_peers.rb | 6 +- .../ruby/shell/commands/remove_peer_tableCFs.rb | 4 +- .../ruby/shell/commands/set_peer_tableCFs.rb | 4 +- .../hbase/client/TestReplicationShell.java | 2 +- .../test/ruby/hbase/replication_admin_test.rb | 118 +- 36 files changed, 2167 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 1304396..9fca28b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationSerDeHelper; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -184,8 +185,8 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void addPeer(String id, String clusterKey, String tableCFs) throws ReplicationException { - this.replicationPeers.addPeer(id, - new 
ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs); + this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), + parseTableCFsFromConfig(tableCFs)); } /** @@ -199,7 +200,19 @@ public class ReplicationAdmin implements Closeable { */ public void addPeer(String id, ReplicationPeerConfig peerConfig, Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException { - this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs)); + if (tableCfs != null) { + peerConfig.setTableCFsMap(tableCfs); + } + this.replicationPeers.addPeer(id, peerConfig); + } + + /** + * Add a new remote slave cluster for replication. + * @param id a short name that identifies the cluster + * @param peerConfig configuration for the replication slave cluster + */ + public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { + this.replicationPeers.addPeer(id, peerConfig); } public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) @@ -208,52 +221,7 @@ public class ReplicationAdmin implements Closeable { } public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { - if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { - return null; - } - - Map<TableName, List<String>> tableCFsMap = null; - // TODO: This should be a PB object rather than a String to be parsed!! 
See HBASE-11393 - // parse out (table, cf-list) pairs from tableCFsConfig - // format: "table1:cf1,cf2;table2:cfA,cfB" - String[] tables = tableCFsConfig.split(";"); - for (String tab : tables) { - // 1 ignore empty table config - tab = tab.trim(); - if (tab.length() == 0) { - continue; - } - // 2 split to "table" and "cf1,cf2" - // for each table: "table:cf1,cf2" or "table" - String[] pair = tab.split(":"); - String tabName = pair[0].trim(); - if (pair.length > 2 || tabName.length() == 0) { - LOG.error("ignore invalid tableCFs setting: " + tab); - continue; - } - - // 3 parse "cf1,cf2" part to List<cf> - List<String> cfs = null; - if (pair.length == 2) { - String[] cfsList = pair[1].split(","); - for (String cf : cfsList) { - String cfName = cf.trim(); - if (cfName.length() > 0) { - if (cfs == null) { - cfs = new ArrayList<String>(); - } - cfs.add(cfName); - } - } - } - - // 4 put <table, List<cf>> to map - if (tableCFsMap == null) { - tableCFsMap = new HashMap<TableName, List<String>>(); - } - tableCFsMap.put(TableName.valueOf(tabName), cfs); - } - return tableCFsMap; + return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); } @VisibleForTesting @@ -338,7 +306,7 @@ public class ReplicationAdmin implements Closeable { * @param id a short name that identifies the cluster */ public String getPeerTableCFs(String id) throws ReplicationException { - return this.replicationPeers.getPeerTableCFsConfig(id); + return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id)); } /** @@ -348,7 +316,7 @@ public class ReplicationAdmin implements Closeable { */ @Deprecated public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException { - this.replicationPeers.setPeerTableCFsConfig(id, tableCFs); + this.setPeerTableCFs(id, parseTableCFsFromConfig(tableCFs)); } /** @@ -357,7 +325,7 @@ public class ReplicationAdmin implements Closeable { * @param tableCfs table-cfs config str */ public void appendPeerTableCFs(String 
id, String tableCfs) throws ReplicationException { - appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs)); + appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); } /** @@ -370,7 +338,7 @@ public class ReplicationAdmin implements Closeable { if (tableCfs == null) { throw new ReplicationException("tableCfs is null"); } - Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id)); + Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); if (preTableCfs == null) { setPeerTableCFs(id, tableCfs); return; @@ -406,7 +374,7 @@ public class ReplicationAdmin implements Closeable { * @throws ReplicationException */ public void removePeerTableCFs(String id, String tableCf) throws ReplicationException { - removePeerTableCFs(id, parseTableCFsFromConfig(tableCf)); + removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf)); } /** @@ -421,7 +389,7 @@ public class ReplicationAdmin implements Closeable { throw new ReplicationException("tableCfs is null"); } - Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id)); + Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id); if (preTableCfs == null) { throw new ReplicationException("Table-Cfs for peer" + id + " is null"); } @@ -464,7 +432,7 @@ public class ReplicationAdmin implements Closeable { */ public void setPeerTableCFs(String id, Map<TableName, ? 
extends Collection<String>> tableCfs) throws ReplicationException { - this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs)); + this.replicationPeers.setPeerTableCFsConfig(id, tableCfs); } /** @@ -658,8 +626,8 @@ public class ReplicationAdmin implements Closeable { try { Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId); Configuration peerConf = pair.getSecond(); - ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(), - parseTableCFsFromConfig(this.getPeerTableCFs(peerId))); + ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(), + peerId, pair.getFirst(), this.connection); listOfPeers.add(peer); } catch (ReplicationException e) { LOG.warn("Failed to get valid replication peers. " http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 043b38f..e2c7bc7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -18,10 +18,13 @@ package org.apache.hadoop.hbase.replication; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.Bytes; @@ -37,7 +40,7 @@ public class ReplicationPeerConfig { private String replicationEndpointImpl; private final Map<byte[], byte[]> peerData; private final 
Map<String, String> configuration; - + private Map<TableName, ? extends Collection<String>> tableCFsMap = null; public ReplicationPeerConfig() { this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR); @@ -78,10 +81,21 @@ public class ReplicationPeerConfig { return configuration; } + public Map<TableName, List<String>> getTableCFsMap() { + return (Map<TableName, List<String>>) tableCFsMap; + } + + public void setTableCFsMap(Map<TableName, ? extends Collection<String>> tableCFsMap) { + this.tableCFsMap = tableCFsMap; + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); builder.append("replicationEndpointImpl=").append(replicationEndpointImpl); + if (tableCFsMap != null) { + builder.append(tableCFsMap.toString()); + } return builder.toString(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index 6b10015..382545d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -42,7 +42,8 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; @InterfaceAudience.Private -public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable { +public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements ReplicationPeer, + Abortable, Closeable { private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class); private ReplicationPeerConfig peerConfig; @@ -52,8 +53,8 @@ public class 
ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea private final Configuration conf; private PeerStateTracker peerStateTracker; - private TableCFsTracker tableCFsTracker; private PeerConfigTracker peerConfigTracker; + /** * Constructor that takes all the objects required to communicate with the specified peer, except * for the region server addresses. @@ -61,39 +62,23 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer */ - public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig) - throws ReplicationException { - this.conf = conf; - this.peerConfig = peerConfig; - this.id = id; - } - - /** - * Constructor that takes all the objects required to communicate with the specified peer, except - * for the region server addresses. - * @param conf configuration object to this peer - * @param id string representation of this peer's identifier - * @param peerConfig configuration for the replication peer - * @param tableCFs table-cf configuration for this peer - */ - public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig, - Map<TableName, List<String>> tableCFs) throws ReplicationException { + public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf, String id, + ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException { + super(zkWatcher, conf, abortable); this.conf = conf; this.peerConfig = peerConfig; this.id = id; - this.tableCFs = tableCFs; } /** * start a state tracker to check whether this peer is enabled or not * - * @param zookeeper zk watcher for the local cluster * @param peerStateNode path to zk node which stores peer state * @throws KeeperException */ - public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode) + public void startStateTracker(String 
peerStateNode) throws KeeperException { - ensurePeerEnabled(zookeeper, peerStateNode); + ensurePeerEnabled(peerStateNode); this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); this.peerStateTracker.start(); try { @@ -112,25 +97,6 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea /** * start a table-cfs tracker to listen the (table, cf-list) map change - * - * @param zookeeper zk watcher for the local cluster - * @param tableCFsNode path to zk node which stores table-cfs - * @throws KeeperException - */ - public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode) - throws KeeperException { - this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper, - this); - this.tableCFsTracker.start(); - this.readTableCFsZnode(); - } - - private void readTableCFsZnode() { - String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false)); - this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs); - } - /** - * start a table-cfs tracker to listen the (table, cf-list) map change * @param zookeeper * @param peerConfigNode path to zk node which stores table-cfs * @throws KeeperException @@ -154,6 +120,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea } return this.peerConfig; } + @Override public PeerState getPeerState() { return peerState; @@ -192,6 +159,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea */ @Override public Map<TableName, List<String>> getTableCFs() { + this.tableCFs = peerConfig.getTableCFsMap(); return this.tableCFs; } @@ -260,7 +228,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea * @throws NodeExistsException * @throws KeeperException */ - private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path) + private boolean ensurePeerEnabled(final String path) throws NodeExistsException, KeeperException { if 
(ZKUtil.checkExists(zookeeper, path) == -1) { // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the @@ -297,32 +265,6 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea } /** - * Tracker for (table, cf-list) map of this peer - */ - public class TableCFsTracker extends ZooKeeperNodeTracker { - - public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher, - Abortable abortable) { - super(watcher, tableCFsZNode, abortable); - } - - @Override - public synchronized void nodeCreated(String path) { - if (path.equals(node)) { - super.nodeCreated(path); - readTableCFsZnode(); - } - } - - @Override - public synchronized void nodeDataChanged(String path) { - if (path.equals(node)) { - super.nodeDataChanged(path); - } - } - } - - /** * Tracker for PeerConfigNode of this peer */ public class PeerConfigTracker extends ZooKeeperNodeTracker { http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index b8d04b4..37d157a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -49,10 +50,8 @@ public interface ReplicationPeers { * Add a new remote slave cluster for replication. 
* @param peerId a short that identifies the cluster * @param peerConfig configuration for the replication slave cluster - * @param tableCFs the table and column-family list which will be replicated for this peer or null - * for all table and column families */ - void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs) + void addPeer(String peerId, ReplicationPeerConfig peerConfig) throws ReplicationException; /** @@ -78,17 +77,19 @@ public interface ReplicationPeers { void disablePeer(String peerId) throws ReplicationException; /** - * Get the table and column-family list string of the peer from ZK. + * Get the table and column-family list of the peer from ZK. * @param peerId a short that identifies the cluster */ - public String getPeerTableCFsConfig(String peerId) throws ReplicationException; + public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId) + throws ReplicationException; /** - * Set the table and column-family list string of the peer to ZK. + * Set the table and column-family list of the peer to ZK. * @param peerId a short that identifies the cluster * @param tableCFs the table and column-family list which will be replicated for this peer */ - public void setPeerTableCFsConfig(String peerId, String tableCFs) throws ReplicationException; + public void setPeerTableCFsConfig(String peerId, + Map<TableName, ? extends Collection<String>> tableCFs) throws ReplicationException; /** * Get the table and column-family-list map of the peer. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index d717b0b..bb9842b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -78,15 +79,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re // Map of peer clusters keyed by their id private Map<String, ReplicationPeerZKImpl> peerClusters; - private final String tableCFsNodeName; private final ReplicationQueuesClient queuesClient; + private Abortable abortable; private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class); public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf, final ReplicationQueuesClient queuesClient, Abortable abortable) { super(zk, conf, abortable); - this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); + this.abortable = abortable; this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>(); this.queuesClient = queuesClient; } @@ -104,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs) + public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException { try { if (peerExists(id)) { 
@@ -129,18 +130,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re ZKUtil.createWithParents(this.zookeeper, this.peersZNode); List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>(); - ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id), + ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), ReplicationSerDeHelper.toByteArray(peerConfig)); // There is a race (if hbase.zookeeper.useMulti is false) // b/w PeerWatcher and ReplicationZookeeper#add method to create the // peer-state znode. This happens while adding a peer // The peer state data is set as "ENABLED" by default. ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES); - String tableCFsStr = (tableCFs == null) ? "" : tableCFs; - ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr)); listOfOps.add(op1); listOfOps.add(op2); - listOfOps.add(op3); ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); // A peer is enabled by default } catch (KeeperException e) { @@ -175,13 +173,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public String getPeerTableCFsConfig(String id) throws ReplicationException { + public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException { try { if (!peerExists(id)) { throw new IllegalArgumentException("peer " + id + " doesn't exist"); } try { - return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id))); + ReplicationPeerConfig rpc = getReplicationPeerConfig(id); + if (rpc == null) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); + } + return rpc.getTableCFsMap(); } catch (Exception e) { throw new ReplicationException(e); } @@ -191,20 +193,22 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public void setPeerTableCFsConfig(String id, String 
tableCFsStr) throws ReplicationException { + public void setPeerTableCFsConfig(String id, + Map<TableName, ? extends Collection<String>> tableCFs) throws ReplicationException { try { if (!peerExists(id)) { throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id + " does not exist."); } - String tableCFsZKNode = getTableCFsNode(id); - byte[] tableCFs = Bytes.toBytes(tableCFsStr); - if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) { - ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs); - } else { - ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs); + ReplicationPeerConfig rpc = getReplicationPeerConfig(id); + if (rpc == null) { + throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); } - LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr); + rpc.setTableCFsMap(tableCFs); + ZKUtil.setData(this.zookeeper, getPeerNode(id), + ReplicationSerDeHelper.toByteArray(rpc)); + LOG.info("Peer tableCFs with id= " + id + " is now " + + ReplicationSerDeHelper.convertToString(tableCFs)); } catch (KeeperException e) { throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e); } @@ -289,7 +293,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re @Override public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException { - String znode = ZKUtil.joinZNode(this.peersZNode, peerId); + String znode = getPeerNode(peerId); byte[] data = null; try { data = ZKUtil.getData(this.zookeeper, znode); @@ -458,14 +462,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return true; } - private String getTableCFsNode(String id) { - return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName)); - } - - private String getPeerStateNode(String id) { - return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName)); - } - /** * 
Update the state znode of a peer cluster. * @param id @@ -506,22 +502,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } Configuration peerConf = pair.getSecond(); - ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); + ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, + pair.getFirst(), abortable); try { - peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId)); + peer.startStateTracker(getPeerStateNode(peerId)); } catch (KeeperException e) { throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId, e); } try { - peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + - peerId, e); - } - - try { peer.startPeerConfigTracker(this.zookeeper, this.getPeerNode(peerId)); } catch(KeeperException e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java index 05f909d..cdb95f7f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSerDeHelper.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.replication; import com.google.protobuf.ByteString; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import 
org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -28,8 +30,13 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; @InterfaceAudience.Private @@ -39,6 +46,175 @@ public final class ReplicationSerDeHelper { private ReplicationSerDeHelper() {} + /** convert map to TableCFs Object */ + public static ZooKeeperProtos.TableCF[] convert( + Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return null; + } + List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>(); + ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder(); + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + tableCFBuilder.clear(); + tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey())); + Collection<String> v = entry.getValue(); + if (v != null && !v.isEmpty()) { + for (String value : entry.getValue()) { + tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value)); + } + } + tableCFList.add(tableCFBuilder.build()); + } + return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]); + } + + public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) { + if (tableCfs == null) { + return null; + } + return convert(convert(tableCfs)); + } + + /** + * Convert string to TableCFs Object. + * This is only for read TableCFs information from TableCF node. + * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3. 
+ * */ + public static ZooKeeperProtos.TableCF[] convert(String tableCFsConfig) { + if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) { + return null; + } + List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>(); + ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder(); + + String[] tables = tableCFsConfig.split(";"); + for (String tab : tables) { + // 1 ignore empty table config + tab = tab.trim(); + if (tab.length() == 0) { + continue; + } + // 2 split to "table" and "cf1,cf2" + // for each table: "table:cf1,cf2" or "table" + String[] pair = tab.split(":"); + String tabName = pair[0].trim(); + if (pair.length > 2 || tabName.length() == 0) { + LOG.info("incorrect format:" + tableCFsConfig); + continue; + } + + tableCFBuilder.clear(); + // split namespace from tableName + String ns = "default"; + String tName = tabName; + String[] dbs = tabName.split("\\."); + if (dbs != null && dbs.length == 2) { + ns = dbs[0]; + tName = dbs[1]; + } + tableCFBuilder.setTableName( + ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName))); + + // 3 parse "cf1,cf2" part to List<cf> + if (pair.length == 2) { + String[] cfsList = pair[1].split(","); + for (String cf : cfsList) { + String cfName = cf.trim(); + if (cfName.length() > 0) { + tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName)); + } + } + } + tableCFList.add(tableCFBuilder.build()); + } + return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]); + } + + /** + * Convert TableCFs Object to String. + * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3 + * */ + public static String convert(ZooKeeperProtos.TableCF[] tableCFs) { + StringBuilder sb = new StringBuilder(); + for (int i = 0, n = tableCFs.length; i < n; i++) { + ZooKeeperProtos.TableCF tableCF = tableCFs[i]; + String namespace = tableCF.getTableName().getNamespace().toStringUtf8(); + if (!Strings.isEmpty(namespace)) { + sb.append(namespace).append("."). 
+ append(tableCF.getTableName().getQualifier().toStringUtf8()) + .append(":"); + } else { + sb.append(tableCF.getTableName().toString()).append(":"); + } + for (int j = 0; j < tableCF.getFamiliesCount(); j++) { + sb.append(tableCF.getFamilies(j).toStringUtf8()).append(","); + } + sb.deleteCharAt(sb.length() - 1).append(";"); + } + if (sb.length() > 0) { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + /** + * Get TableCF in TableCFs, if not exist, return null. + * */ + public static ZooKeeperProtos.TableCF getTableCF(ZooKeeperProtos.TableCF[] tableCFs, + String table) { + for (int i = 0, n = tableCFs.length; i < n; i++) { + ZooKeeperProtos.TableCF tableCF = tableCFs[i]; + if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) { + return tableCF; + } + } + return null; + } + + /** + * Parse bytes into TableCFs. + * It is used for backward compatibility. + * Old format bytes have no PB_MAGIC Header + * */ + public static ZooKeeperProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException { + if (bytes == null) { + return null; + } + return ReplicationSerDeHelper.convert(Bytes.toString(bytes)); + } + + /** + * Convert tableCFs string into Map. + * */ + public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) { + ZooKeeperProtos.TableCF[] tableCFs = convert(tableCFsConfig); + return convert2Map(tableCFs); + } + + /** + * Convert tableCFs Object to Map. 
+   * Returns null when tableCFs is null or empty.
+   * @param tableCFs TableCF messages to convert
+   * @return map from table name to its family names, or null
+   * */
+  public static Map<TableName, List<String>> convert2Map(ZooKeeperProtos.TableCF[] tableCFs) {
+    if (tableCFs == null || tableCFs.length == 0) {
+      return null;
+    }
+    Map<TableName, List<String>> result = new HashMap<TableName, List<String>>();
+    for (ZooKeeperProtos.TableCF tableCF : tableCFs) {
+      List<String> cfNames = new ArrayList<>();
+      for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
+        cfNames.add(tableCF.getFamilies(j).toStringUtf8());
+      }
+      // A table that lists no families is stored with a null value rather than an empty list.
+      result.put(ProtobufUtil.toTableName(tableCF.getTableName()),
+        cfNames.isEmpty() ? null : cfNames);
+    }
+
+    return result;
+  }
+
   /**
    * @param bytes Content of a peer znode.
    * @return ClusterKey parsed from the passed bytes.
@@ -82,6 +258,12 @@ public final class ReplicationSerDeHelper {
     for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
       peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
     }
+
+    Map<TableName, ? 
extends Collection<String>> tableCFsMap = convert2Map( + peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()])); + if (tableCFsMap != null) { + peerConfig.setTableCFsMap(tableCFsMap); + } return peerConfig; } @@ -119,6 +301,13 @@ public final class ReplicationSerDeHelper { .build()); } + ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap()); + if (tableCFs != null) { + for (int i = 0; i < tableCFs.length; i++) { + builder.addTableCfs(tableCFs[i]); + } + } + return builder.build(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/66941910/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index ed9359d..d0c3513 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; +import com.google.common.annotations.VisibleForTesting; + /** * This is a base class for maintaining replication state in zookeeper. 
@@ -52,6 +54,9 @@ public abstract class ReplicationStateZKBase {
   protected final String hfileRefsZNode;
   /** The cluster key of the local cluster */
   protected final String ourClusterKey;
+  /** The name of the znode that contains tableCFs */
+  protected final String tableCFsNodeName;
+
   protected final ZooKeeperWatcher zookeeper;
   protected final Configuration conf;
   protected final Abortable abortable;
@@ -77,6 +82,7 @@ public abstract class ReplicationStateZKBase {
     String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
       ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+    this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
     this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
@@ -119,6 +125,25 @@ public abstract class ReplicationStateZKBase {
     return path.split("/").length == peersZNode.split("/").length + 1;
   }
 
+  /**
+   * @param id peer id
+   * @return the peer's znode path (see getPeerNode) joined with tableCFsNodeName
+   */
+  @VisibleForTesting
+  protected String getTableCFsNode(String id) {
+    return ZKUtil.joinZNode(getPeerNode(id), this.tableCFsNodeName);
+  }
+
+  /**
+   * @param id peer id
+   * @return the peer's znode path (see getPeerNode) joined with peerStateNodeName
+   */
+  @VisibleForTesting
+  protected String getPeerStateNode(String id) {
+    return ZKUtil.joinZNode(getPeerNode(id), this.peerStateNodeName);
+  }
+
+  @VisibleForTesting
   protected String getPeerNode(String id) {
     return ZKUtil.joinZNode(this.peersZNode, id);
   }