Repository: hbase Updated Branches: refs/heads/master af9d359b8 -> 9a78d0088
HBASE-17389 Convert all internal usages from ReplicationAdmin to Admin Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a78d008 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a78d008 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a78d008 Branch: refs/heads/master Commit: 9a78d008841726ec2029215cddf0c0b2141771ae Parents: af9d359 Author: Guanghao Zhang <zg...@apache.org> Authored: Tue Feb 7 10:18:59 2017 +0800 Committer: Guanghao Zhang <zg...@apache.org> Committed: Tue Feb 7 10:18:59 2017 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Admin.java | 26 ++++++ .../apache/hadoop/hbase/client/HBaseAdmin.java | 83 +++++++++++++++++++ .../client/replication/ReplicationAdmin.java | 33 ++++++++ .../hbase/client/replication/TableCFs.java | 12 +++ .../org/apache/hadoop/hbase/master/HMaster.java | 2 + .../master/cleaner/ReplicationMetaCleaner.java | 18 ++-- .../regionserver/DumpReplicationQueues.java | 57 ++++++------- .../hbase/util/ServerRegionReplicaUtil.java | 12 +-- .../replication/TestMasterReplication.java | 47 ++++------- ...sibilityLabelReplicationWithExpAsString.java | 4 +- .../TestVisibilityLabelsReplication.java | 6 +- .../src/main/ruby/hbase/replication_admin.rb | 63 +++++++------- .../src/main/ruby/shell/commands/list_peers.rb | 16 ++-- .../shell/commands/list_replicated_tables.rb | 17 ++-- .../test/ruby/hbase/replication_admin_test.rb | 86 ++++++++------------ 15 files changed, 305 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 232dbf4..cc14acd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Future; @@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaRetriever; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; @@ -1907,6 +1909,30 @@ public interface Admin extends Abortable, Closeable { } /** + * Append the replicable table-cf config of the specified peer + * @param id a short that identifies the cluster + * @param tableCfs A map from tableName to column family names + * @throws ReplicationException + * @throws IOException + */ + default void appendReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, + IOException { + } + + /** + * Remove some table-cfs from config of the specified peer + * @param id a short name that identifies the cluster + * @param tableCfs A map from tableName to column family names + * @throws ReplicationException + * @throws IOException + */ + default void removeReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, + IOException { + } + + /** * Return a list of replication peers. * @return a list of replication peers description * @throws IOException http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 4e0a6c7..65070b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -23,10 +23,13 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -81,6 +84,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter; import org.apache.hadoop.hbase.quotas.QuotaRetriever; import org.apache.hadoop.hbase.quotas.QuotaSettings; import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; @@ -197,6 +201,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -3872,6 +3877,84 @@ public class HBaseAdmin implements Admin { } @Override + public void appendReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, + IOException { + if (tableCfs == null) { + throw new ReplicationException("tableCfs is null"); + } + ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); + Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); + if (preTableCfs == null) { + peerConfig.setTableCFsMap(tableCfs); + } else { + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + TableName table = entry.getKey(); + Collection<String> appendCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List<String> cfs = preTableCfs.get(table); + if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + Set<String> cfSet = new HashSet<String>(cfs); + cfSet.addAll(appendCfs); + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else { + if (appendCfs == null || appendCfs.isEmpty()) { + preTableCfs.put(table, null); + } else { + preTableCfs.put(table, Lists.newArrayList(appendCfs)); + } + } + } + } + updateReplicationPeerConfig(id, peerConfig); + } + + @Override + public void removeReplicationPeerTableCFs(String id, + Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, + IOException { + if (tableCfs == null) { + throw new ReplicationException("tableCfs is null"); + } + ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); + Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap(); + if (preTableCfs == null) { + throw new ReplicationException("Table-Cfs for peer" + id + " is null"); + } + for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { + + TableName table = entry.getKey(); + Collection<String> removeCfs = entry.getValue(); + if (preTableCfs.containsKey(table)) { + List<String> cfs = preTableCfs.get(table); + if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { + preTableCfs.remove(table); + } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { + Set<String> cfSet = new HashSet<String>(cfs); + cfSet.removeAll(removeCfs); + if (cfSet.isEmpty()) { + preTableCfs.remove(table); + } else { + preTableCfs.put(table, Lists.newArrayList(cfSet)); + } + } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove cf of table: " + table + + " which doesn't specify cfs from table-cfs config in peer: " + id); + } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) { + throw new ReplicationException("Cannot remove table: " + table + + " which has specified cfs from table-cfs config in peer: " + id); + } + } else { + throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); + } + } + updateReplicationPeerConfig(id, peerConfig); + } + + @Override public List<ReplicationPeerDescription> listReplicationPeers() throws IOException { return listReplicationPeers((Pattern)null); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index c6d580b..706f81e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -194,7 +194,11 @@ public class ReplicationAdmin implements Closeable { * Add a new remote slave cluster for replication. * @param id a short name that identifies the cluster * @param peerConfig configuration for the replication slave cluster + * @deprecated use + * {@link org.apache.hadoop.hbase.client.Admin#addReplicationPeer(String, ReplicationPeerConfig)} + * instead */ + @Deprecated public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException, IOException { checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(), @@ -210,6 +214,12 @@ public class ReplicationAdmin implements Closeable { return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); } + /** + * @deprecated use + * {@link org.apache.hadoop.hbase.client.Admin#updateReplicationPeerConfig(String, ReplicationPeerConfig)} + * instead + */ + @Deprecated public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws IOException { this.admin.updateReplicationPeerConfig(id, peerConfig); } @@ -217,7 +227,9 @@ public class ReplicationAdmin implements Closeable { /** * Removes a peer cluster and stops the replication to it. * @param id a short name that identifies the cluster + * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#removeReplicationPeer(String)} instead */ + @Deprecated public void removePeer(String id) throws IOException { this.admin.removeReplicationPeer(id); } @@ -225,7 +237,10 @@ public class ReplicationAdmin implements Closeable { /** * Restart the replication stream to the specified peer. * @param id a short name that identifies the cluster + * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableReplicationPeer(String)} + * instead */ + @Deprecated public void enablePeer(String id) throws IOException { this.admin.enableReplicationPeer(id); } @@ -233,7 +248,10 @@ public class ReplicationAdmin implements Closeable { /** * Stop the replication stream to the specified peer. * @param id a short name that identifies the cluster + * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableReplicationPeer(String)} + * instead */ + @Deprecated public void disablePeer(String id) throws IOException { this.admin.disableReplicationPeer(id); } @@ -242,11 +260,17 @@ public class ReplicationAdmin implements Closeable { * Get the number of slave clusters the local cluster has. * @return number of slave clusters * @throws IOException + * @deprecated */ + @Deprecated public int getPeersCount() throws IOException { return this.admin.listReplicationPeers().size(); } + /** + * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead + */ + @Deprecated public Map<String, ReplicationPeerConfig> listPeerConfigs() throws IOException { List<ReplicationPeerDescription> peers = this.admin.listReplicationPeers(); Map<String, ReplicationPeerConfig> result = new TreeMap<String, ReplicationPeerConfig>(); @@ -256,6 +280,11 @@ public class ReplicationAdmin implements Closeable { return result; } + /** + * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#getReplicationPeerConfig(String)} + * instead + */ + @Deprecated public ReplicationPeerConfig getPeerConfig(String id) throws IOException { return admin.getReplicationPeerConfig(id); } @@ -294,6 +323,7 @@ public class ReplicationAdmin implements Closeable { * @throws ReplicationException * @throws IOException */ + @Deprecated public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, IOException { if (tableCfs == null) { @@ -350,6 +380,7 @@ public class ReplicationAdmin implements Closeable { * @throws ReplicationException * @throws IOException */ + @Deprecated public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException, IOException { if (tableCfs == null) { @@ -398,6 +429,7 @@ public class ReplicationAdmin implements Closeable { * to indicate replicating all column families. Pass null for replicating all table and column * families */ + @Deprecated public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs) throws IOException { ReplicationPeerConfig peerConfig = getPeerConfig(id); @@ -411,6 +443,7 @@ public class ReplicationAdmin implements Closeable { * an IllegalArgumentException is thrown if it doesn't exist * @return true if replication is enabled to that peer, false if it isn't */ + @Deprecated public boolean getPeerState(String id) throws ReplicationException, IOException { List<ReplicationPeerDescription> peers = admin.listReplicationPeers(id); if (peers.isEmpty() || !id.equals(peers.get(0).getPeerId())) { http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java index fc39087..f293586 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/TableCFs.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client.replication; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -46,4 +47,15 @@ public class TableCFs { public Map<String, Integer> getColumnFamilyMap() { return this.cfs; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(table.getNameAsString()); + if (!cfs.isEmpty()) { + sb.append(":"); + sb.append(StringUtils.join(cfs.keySet(), ',')); + } + return sb.toString(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 3374405..4aff21a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -3177,6 +3177,8 @@ public class HMaster extends HRegionServer implements MasterServices { cpHost.preGetReplicationPeerConfig(peerId); } final ReplicationPeerConfig peerConfig = this.replicationManager.getPeerConfig(peerId); + LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config=" + + peerConfig); if (cpHost != null) { cpHost.postGetReplicationPeerConfig(peerId); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java index b133c56..5c56271 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationMetaCleaner.java @@ -35,11 +35,11 @@ import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Bytes; /** @@ -50,14 +50,14 @@ public class ReplicationMetaCleaner extends ScheduledChore { private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class); - private ReplicationAdmin replicationAdmin; - private MasterServices master; + private final Admin admin; + private final MasterServices master; public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period) throws IOException { super("ReplicationMetaCleaner", stoppable, period); this.master = master; - replicationAdmin = new ReplicationAdmin(master.getConfiguration()); + admin = master.getConnection().getAdmin(); } @Override @@ -81,12 +81,12 @@ public class ReplicationMetaCleaner extends ScheduledChore { return; } - Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs(); - for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) { - for (Map.Entry<TableName, List<String>> map : entry.getValue().getTableCFsMap() + List<ReplicationPeerDescription> peers = admin.listReplicationPeers(); + for (ReplicationPeerDescription peerDesc : peers) { + for (Map.Entry<TableName, List<String>> map : peerDesc.getPeerConfig().getTableCFsMap() .entrySet()) { if (serialTables.containsKey(map.getKey().getNameAsString())) { - serialTables.get(map.getKey().getNameAsString()).add(entry.getKey()); + serialTables.get(map.getKey().getNameAsString()).add(peerDesc.getPeerId()); break; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 766b551..9a1e2bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -21,12 +21,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,15 +37,16 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.io.WALLink; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; @@ -207,8 +208,8 @@ public class DumpReplicationQueues extends Configured implements Tool { Configuration conf = getConf(); HBaseAdmin.available(conf); - ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf); ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(), new WarnOnlyAbortable(), true); @@ -216,26 +217,28 @@ public class DumpReplicationQueues extends Configured implements Tool { try { // Our zk watcher LOG.info("Our Quorum: " + zkw.getQuorum()); - List<HashMap<String, String>> replicatedTables = replicationAdmin.listReplicated(); - if (replicatedTables.isEmpty()) { + List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs(); + if (replicatedTableCFs.isEmpty()) { LOG.info("No tables with a configured replication peer were found."); return(0); } else { - LOG.info("Replicated Tables: " + replicatedTables); + LOG.info("Replicated Tables: " + replicatedTableCFs); } - Map<String, ReplicationPeerConfig> peerConfigs = replicationAdmin.listPeerConfigs(); + List<ReplicationPeerDescription> peers = admin.listReplicationPeers(); - if (peerConfigs.isEmpty()) { + if (peers.isEmpty()) { LOG.info("Replication is enabled but no peer configuration was found."); } System.out.println("Dumping replication peers and configurations:"); - System.out.println(dumpPeersState(replicationAdmin, peerConfigs)); + System.out.println(dumpPeersState(peers)); if (opts.isDistributed()) { LOG.info("Found [--distributed], will poll each RegionServer."); - System.out.println(dumpQueues(connection, zkw, peerConfigs.keySet(), opts.isHdfs())); + Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId()) + .collect(Collectors.toSet()); + System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs())); System.out.println(dumpReplicationSummary()); } else { // use ZK instead @@ -279,28 +282,22 @@ public class DumpReplicationQueues extends Configured implements Tool { return sb.toString(); } - public String dumpPeersState(ReplicationAdmin replicationAdmin, - Map<String, ReplicationPeerConfig> peerConfigs) throws Exception { + public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception { Map<String, String> currentConf; StringBuilder sb = new StringBuilder(); - for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) { - try { - ReplicationPeerConfig peerConfig = peer.getValue(); - sb.append("Peer: " + peer.getKey() + "\n"); - sb.append(" " + "State: " - + (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n"); - sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); - sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n"); - currentConf = peerConfig.getConfiguration(); - // Only show when we have a custom configuration for the peer - if (currentConf.size() > 1) { - sb.append(" " + "Peer Configuration: " + currentConf + "\n"); - } - sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); - sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n"); - } catch (ReplicationException re) { - sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n"); + for (ReplicationPeerDescription peer : peers) { + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + sb.append("Peer: " + peer.getPeerId() + "\n"); + sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n"); + sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); + sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n"); + currentConf = peerConfig.getConfiguration(); + // Only show when we have a custom configuration for the peer + if (currentConf.size() > 1) { + sb.append(" " + "Peer Configuration: " + currentConf + "\n"); } + sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); + sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n"); } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 9ecc9eb..648ccc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.io.HFileLink; @@ -148,10 +150,10 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { if (!isRegionReplicaReplicationEnabled(conf)) { return; } - ReplicationAdmin repAdmin = new ReplicationAdmin(conf); + Admin admin = ConnectionFactory.createConnection(conf).getAdmin(); ReplicationPeerConfig peerConfig = null; try { - peerConfig = repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER); + peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER); } catch (ReplicationPeerNotFoundException e) { LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist", e); @@ -163,12 +165,10 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { peerConfig = new ReplicationPeerConfig(); peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)); peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); - repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null); + admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig); } - } catch (ReplicationException ex) { - throw new IOException(ex); } finally { - repAdmin.close(); + admin.close(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index c1b6f4a..3b6718a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -514,51 +514,34 @@ public class TestMasterReplication { private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber) throws Exception { - ReplicationAdmin replicationAdmin = null; - try { - replicationAdmin = new ReplicationAdmin( - configurations[masterClusterNumber]); - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utilities[slaveClusterNumber].getClusterKey()); - replicationAdmin.addPeer(id, rpc, null); - } finally { - close(replicationAdmin); + try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) + .getAdmin()) { + admin.addReplicationPeer(id, + new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())); } } private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) throws Exception { - ReplicationAdmin replicationAdmin = null; - try { - replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]); - ReplicationPeerConfig replicationPeerConfig = new ReplicationPeerConfig(); - replicationPeerConfig.setClusterKey(utilities[slaveClusterNumber].getClusterKey()); - replicationAdmin.addPeer(id, replicationPeerConfig, - ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); - } finally { - close(replicationAdmin); + try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) + .getAdmin()) { + admin.addReplicationPeer(id, + new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()) + .setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs))); } } private void disablePeer(String id, int masterClusterNumber) throws Exception { - ReplicationAdmin replicationAdmin = null; - try { - replicationAdmin = new ReplicationAdmin( - configurations[masterClusterNumber]); - replicationAdmin.disablePeer(id); - } finally { - close(replicationAdmin); + try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) + .getAdmin()) { + admin.disableReplicationPeer(id); } } private void enablePeer(String id, int masterClusterNumber) throws Exception { - ReplicationAdmin replicationAdmin = null; - try { - replicationAdmin = new ReplicationAdmin( - configurations[masterClusterNumber]); - replicationAdmin.enablePeer(id); - } finally { - close(replicationAdmin); + try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) + .getAdmin()) { + admin.enableReplicationPeer(id); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java index dd1fe2a..13d0e3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelReplicationWithExpAsString.java @@ -113,7 +113,7 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit TEST_UTIL.startMiniZKCluster(); MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster(); zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true); - replicationAdmin = new ReplicationAdmin(conf); + admin = TEST_UTIL.getAdmin(); // Base conf2 on conf1 so it gets the right zk cluster. conf1 = HBaseConfiguration.create(conf); @@ -136,7 +136,7 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(TEST_UTIL1.getClusterKey()); - replicationAdmin.addPeer("2", rpc, null); + admin.addReplicationPeer("2", rpc); HTableDescriptor table = new HTableDescriptor(TABLE_NAME); HColumnDescriptor desc = new HColumnDescriptor(fam); http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 5977c23..2181ddb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -89,7 +89,7 @@ public class TestVisibilityLabelsReplication { protected static Configuration conf; protected static Configuration conf1; protected static TableName TABLE_NAME = TableName.valueOf("TABLE_NAME"); - protected static ReplicationAdmin replicationAdmin; + protected static Admin admin; public static final String TOPSECRET = "topsecret"; public static final String PUBLIC = "public"; public static final String PRIVATE = "private"; @@ -161,7 +161,7 @@ public class TestVisibilityLabelsReplication { TEST_UTIL.startMiniZKCluster(); MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster(); zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true); - replicationAdmin = new ReplicationAdmin(conf); + admin = TEST_UTIL.getAdmin(); // Base conf2 on conf1 so it gets the right zk cluster. conf1 = HBaseConfiguration.create(conf); @@ -185,7 +185,7 @@ public class TestVisibilityLabelsReplication { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(TEST_UTIL1.getClusterKey()); - replicationAdmin.addPeer("2", rpc, null); + admin.addReplicationPeer("2", rpc); Admin hBaseAdmin = TEST_UTIL.getAdmin(); HTableDescriptor table = new HTableDescriptor(TABLE_NAME); http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/main/ruby/hbase/replication_admin.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 3c94db2..b9df821 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -20,6 +20,7 @@ include Java java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin +java_import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig java_import org.apache.hadoop.hbase.util.Bytes java_import org.apache.hadoop.hbase.zookeeper.ZKConfig @@ -34,6 +35,7 @@ module Hbase def initialize(configuration) @replication_admin = ReplicationAdmin.new(configuration) @configuration = configuration + @admin = ConnectionFactory.createConnection(configuration).getAdmin end #---------------------------------------------------------------------------------------------- @@ -100,7 +102,7 @@ module Hbase } replication_peer_config.set_table_cfs_map(map) end - @replication_admin.add_peer(id, replication_peer_config) + @admin.addReplicationPeer(id, replication_peer_config) else raise(ArgumentError, "args must be a Hash") end @@ -109,46 +111,40 @@ module Hbase #---------------------------------------------------------------------------------------------- # Remove a peer cluster, stops the replication def remove_peer(id) - @replication_admin.removePeer(id) + @admin.removeReplicationPeer(id) end - #--------------------------------------------------------------------------------------------- # Show replcated tables/column families, and their ReplicationType def list_replicated_tables(regex = ".*") pattern = java.util.regex.Pattern.compile(regex) - list = @replication_admin.listReplicated() - list.select {|s| pattern.match(s.get(ReplicationAdmin::TNAME))} + list = @admin.listReplicatedTableCFs() + list.select {|t| pattern.match(t.getTable().getNameAsString())} end #---------------------------------------------------------------------------------------------- # List all peer clusters def list_peers - @replication_admin.listPeerConfigs - end - - #---------------------------------------------------------------------------------------------- - # Get peer cluster state - def get_peer_state(id) - @replication_admin.getPeerState(id) ? "ENABLED" : "DISABLED" + @admin.listReplicationPeers end #---------------------------------------------------------------------------------------------- # Restart the replication stream to the specified peer def enable_peer(id) - @replication_admin.enablePeer(id) + @admin.enableReplicationPeer(id) end #---------------------------------------------------------------------------------------------- # Stop the replication stream to the specified peer def disable_peer(id) - @replication_admin.disablePeer(id) + @admin.disableReplicationPeer(id) end #---------------------------------------------------------------------------------------------- # Show the current tableCFs config for the specified peer def show_peer_tableCFs(id) - @replication_admin.getPeerTableCFs(id) + rpc = @admin.getReplicationPeerConfig(id) + ReplicationSerDeHelper.convertToString(rpc.getTableCFsMap()) end #---------------------------------------------------------------------------------------------- @@ -160,8 +156,12 @@ module Hbase tableCFs.each{|key, val| map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) } + rpc = get_peer_config(id) + unless rpc.nil? + rpc.setTableCFsMap(map) + @admin.updateReplicationPeerConfig(id, rpc) + end end - @replication_admin.setPeerTableCFs(id, map) end #---------------------------------------------------------------------------------------------- @@ -174,7 +174,7 @@ module Hbase map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) } end - @replication_admin.appendPeerTableCFs(id, map) + @admin.appendReplicationPeerTableCFs(id, map) end #---------------------------------------------------------------------------------------------- @@ -187,7 +187,7 @@ module Hbase map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val) } end - @replication_admin.removePeerTableCFs(id, map) + @admin.removeReplicationPeerTableCFs(id, map) end # Set new namespaces config for the specified peer @@ -200,7 +200,7 @@ module Hbase rpc = get_peer_config(id) unless rpc.nil? rpc.setNamespaces(ns_set) - @replication_admin.updatePeerConfig(id, rpc) + @admin.updateReplicationPeerConfig(id, rpc) end end end @@ -218,7 +218,7 @@ module Hbase ns_set.add(n) end rpc.setNamespaces(ns_set) - @replication_admin.updatePeerConfig(id, rpc) + @admin.updateReplicationPeerConfig(id, rpc) end end end @@ -235,7 +235,7 @@ module Hbase end end rpc.setNamespaces(ns_set) - @replication_admin.updatePeerConfig(id, rpc) + @admin.updateReplicationPeerConfig(id, rpc) end end end @@ -257,7 +257,7 @@ module Hbase rpc = get_peer_config(id) unless rpc.nil? rpc.setBandwidth(bandwidth) - @replication_admin.updatePeerConfig(id, rpc) + @admin.updateReplicationPeerConfig(id, rpc) end end @@ -265,26 +265,27 @@ module Hbase # Enables a table's replication switch def enable_tablerep(table_name) tableName = TableName.valueOf(table_name) - @replication_admin.enableTableRep(tableName) + @admin.enableTableReplication(tableName) end #---------------------------------------------------------------------------------------------- # Disables a table's replication switch def disable_tablerep(table_name) tableName = TableName.valueOf(table_name) - @replication_admin.disableTableRep(tableName) + @admin.disableTableReplication(tableName) end def list_peer_configs - @replication_admin.list_peer_configs + map = java.util.HashMap.new + peers = @admin.listReplicationPeers + peers.each do |peer| + map.put(peer.getPeerId, peer.getPeerConfig) + end + return map end def get_peer_config(id) - @replication_admin.get_peer_config(id) - end - - def peer_added(id) - @replication_admin.peer_added(id) + @admin.getReplicationPeerConfig(id) end def update_peer_config(id, args={}) @@ -306,7 +307,7 @@ module Hbase } end - @replication_admin.update_peer_config(id, replication_peer_config) + @admin.updateReplicationPeerConfig(id, replication_peer_config) end end end http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/main/ruby/shell/commands/list_peers.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb index 7d53158..2dd8483 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb @@ -35,13 +35,15 @@ EOF formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME", "STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"]) - peers.entrySet().each do |e| - state = replication_admin.get_peer_state(e.key) - namespaces = replication_admin.show_peer_namespaces(e.value) - tableCFs = replication_admin.show_peer_tableCFs(e.key) - formatter.row([ e.key, e.value.getClusterKey, - e.value.getReplicationEndpointImpl, state, namespaces, tableCFs, - e.value.getBandwidth ]) + peers.each do |peer| + id = peer.getPeerId + state = peer.isEnabled ? "ENABLED" : "DISABLED" + config = peer.getPeerConfig + namespaces = replication_admin.show_peer_namespaces(config) + tableCFs = replication_admin.show_peer_tableCFs(id) + formatter.row([ id, config.getClusterKey, + config.getReplicationEndpointImpl, state, namespaces, tableCFs, + config.getBandwidth ]) end formatter.footer() http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb b/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb index 142adfc..4200cae 100644 --- a/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb +++ b/hbase-shell/src/main/ruby/shell/commands/list_replicated_tables.rb @@ -34,12 +34,19 @@ EOF formatter.header([ "TABLE:COLUMNFAMILY", "ReplicationType" ], [ 32 ]) list = replication_admin.list_replicated_tables(regex) list.each do |e| - if e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::REPLICATIONTYPE) == org.apache.hadoop.hbase.client.replication.ReplicationAdmin::REPLICATIONGLOBAL - replicateType = "GLOBAL" - else - replicateType = "unknown" + map = e.getColumnFamilyMap() + map.each do |cf| + if cf[1] == org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_LOCAL + replicateType = "LOCAL" + elsif cf[1] == org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_GLOBAL + replicateType = "GLOBAL" + elsif cf[1] == org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_SERIAL + replicateType = "SERIAL" + else + replicateType = "UNKNOWN" + end + formatter.row([e.getTable().getNameAsString() + ":" + cf[0], replicateType], true, [32]) end - formatter.row([e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME) + ":" + e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::CFNAME), replicateType], true, [32]) end formatter.footer() end http://git-wip-us.apache.org/repos/asf/hbase/blob/9a78d008/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb ---------------------------------------------------------------------- diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index cd1fe35..0d92287 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -73,8 +73,8 @@ module Hbase command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) # cleanup for future tests command(:remove_peer, @peer_id) @@ -86,8 +86,8 @@ module Hbase command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) # cleanup for future tests command(:remove_peer, @peer_id) @@ -100,8 +100,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) # cleanup for future tests command(:remove_peer, @peer_id) @@ -114,8 +114,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) # cleanup for future tests command(:remove_peer, @peer_id) @@ -130,8 +130,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -152,8 +152,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -186,8 +186,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) assert_tablecfs_equal(table_cfs, command(:get_peer_config, @peer_id).getTableCFsMap()) # cleanup for future tests @@ -210,8 +210,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } command(:set_peer_tableCFs, @peer_id, table_cfs) @@ -227,8 +227,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } command(:append_peer_tableCFs, @peer_id, table_cfs) @@ -249,8 +249,8 @@ module Hbase command(:add_peer, @peer_id, args) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey) table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] } command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] }) @@ -268,15 +268,11 @@ module Hbase args = { CLUSTER_KEY => cluster_key } command(:add_peer, @peer_id, args) - # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added - # but here we have to do it ourselves - replication_admin.peer_added(@peer_id) - command(:set_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -292,15 +288,11 @@ module Hbase args = { CLUSTER_KEY => cluster_key } command(:add_peer, @peer_id, args) - # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added - # but here we have to do it ourselves - replication_admin.peer_added(@peer_id) - command(:append_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -309,8 +301,8 @@ module Hbase command(:append_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -318,8 +310,8 @@ module Hbase command(:append_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -334,17 +326,13 @@ module Hbase args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces } command(:add_peer, @peer_id, args) - # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added - # but here we have to do it ourselves - replication_admin.peer_added(@peer_id) - namespaces = ["ns1", "ns2"] namespaces_str = "ns3" command(:remove_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -353,8 +341,8 @@ module Hbase command(:remove_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -362,8 +350,8 @@ module Hbase command(:remove_peer_namespaces, @peer_id, namespaces) assert_equal(1, command(:list_peers).length) - assert(command(:list_peers).key?(@peer_id)) - peer_config = command(:list_peers).fetch(@peer_id) + assert_equal(@peer_id, command(:list_peers).get(0).getPeerId) + peer_config = command(:list_peers).get(0).getPeerConfig assert_equal(namespaces_str, replication_admin.show_peer_namespaces(peer_config)) @@ -375,9 +363,6 @@ module Hbase cluster_key = "localhost:2181:/hbase-test" args = { CLUSTER_KEY => cluster_key } command(:add_peer, @peer_id, args) - # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added - # but here we have to do it ourselves - replication_admin.peer_added(@peer_id) peer_config = command(:get_peer_config, @peer_id) assert_equal(0, peer_config.get_bandwidth) @@ -442,9 +427,6 @@ module Hbase args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params} command(:add_peer, @peer_id, args) - #Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here we have to do it ourselves - replication_admin.peer_added(@peer_id) - new_config_params = { "config1" => "new_value1" } new_data_params = {"data1" => "new_value1"} new_args = {CONFIG => new_config_params, DATA => new_data_params}