HBASE-19661 Replace ReplicationStateZKBase with ZKReplicationStorageBase
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/abe180d3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/abe180d3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/abe180d3 Branch: refs/heads/HBASE-19397-branch-2 Commit: abe180d34d79d5e56eddce357a9a97a2d527ae8b Parents: af8ad6a Author: huzheng <open...@gmail.com> Authored: Fri Dec 29 15:55:28 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Jan 30 09:27:47 2018 +0800 ---------------------------------------------------------------------- .../hbase/replication/ReplicationFactory.java | 5 +- .../replication/ReplicationStateZKBase.java | 153 ------------------- .../replication/ReplicationTrackerZKImpl.java | 21 +-- .../replication/ZKReplicationPeerStorage.java | 24 ++- .../replication/ZKReplicationStorageBase.java | 13 +- .../org/apache/hadoop/hbase/master/HMaster.java | 4 +- .../master/ReplicationPeerConfigUpgrader.java | 128 ++++++++-------- .../regionserver/DumpReplicationQueues.java | 18 +-- .../replication/regionserver/Replication.java | 3 +- .../org/apache/hadoop/hbase/util/HBaseFsck.java | 3 +- .../TestReplicationTrackerZKImpl.java | 3 +- .../replication/master/TestTableCFsUpdater.java | 41 ++--- .../TestReplicationSourceManager.java | 6 +- 13 files changed, 136 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 6c66aff..2a970ba 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -33,9 +33,8 @@ public class ReplicationFactory { return new ReplicationPeers(zk, conf); } - public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, - final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, + public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) { - return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper); + return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java deleted file mode 100644 index f49537c..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; - -/** - * This is a base class for maintaining replication state in zookeeper. - */ -@InterfaceAudience.Private -public abstract class ReplicationStateZKBase { - - /** - * The name of the znode that contains the replication status of a remote slave (i.e. peer) - * cluster. - */ - protected final String peerStateNodeName; - /** The name of the base znode that contains all replication state. */ - protected final String replicationZNode; - /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */ - protected final String peersZNode; - /** The name of the znode that contains all replication queues */ - protected final String queuesZNode; - /** The name of the znode that contains queues of hfile references to be replicated */ - protected final String hfileRefsZNode; - /** The cluster key of the local cluster */ - protected final String ourClusterKey; - /** The name of the znode that contains tableCFs */ - protected final String tableCFsNodeName; - - protected final ZKWatcher zookeeper; - protected final Configuration conf; - protected final Abortable abortable; - - public static final byte[] ENABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); - public static final byte[] DISABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); - public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = - "zookeeper.znode.replication.hfile.refs"; - public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; - - public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf, - Abortable abortable) { - this.zookeeper = zookeeper; - this.conf = conf; - this.abortable = abortable; - - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); - String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, - ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); - this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); - this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs"); - this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); - this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, - replicationZNodeName); - this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); - this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); - this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); - } - - public List<String> getListOfReplicators() { - List<String> result = null; - try { - result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode); - } catch (KeeperException e) { - this.abortable.abort("Failed to get list of replicators", e); - } - return result; - } - - /** - * @param state - * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for - * use as content of a peer-state znode under a peer cluster id as in - * /hbase/replication/peers/PEER_ID/peer-state. - */ - protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { - ReplicationProtos.ReplicationState msg = - ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); - // There is no toByteArray on this pb Message? - // 32 bytes is default which seems fair enough here. - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); - msg.writeTo(cos); - cos.flush(); - baos.flush(); - return ProtobufUtil.prependPBMagic(baos.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - protected boolean peerExists(String id) throws KeeperException { - return ZKUtil.checkExists(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)) >= 0; - } - - /** - * Determine if a ZK path points to a peer node. - * @param path path to be checked - * @return true if the path points to a peer node, otherwise false - */ - protected boolean isPeerPath(String path) { - return path.split("/").length == peersZNode.split("/").length + 1; - } - - @VisibleForTesting - protected String getTableCFsNode(String id) { - return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName)); - } - - @VisibleForTesting - protected String getPeerStateNode(String id) { - return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.peerStateNodeName)); - } - @VisibleForTesting - protected String getPeerNode(String id) { - return ZNodePaths.joinZNode(this.peersZNode, id); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java index 5659e4b..16a1668 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -20,14 +20,12 @@ package org.apache.hadoop.hbase.replication; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.hadoop.hbase.zookeeper.ZKListener; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +36,14 @@ import org.slf4j.LoggerFactory; * interface. */ @InterfaceAudience.Private -public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker { +public class ReplicationTrackerZKImpl implements ReplicationTracker { private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class); + + // Zookeeper + private final ZKWatcher zookeeper; + // Server to abort. + private final Abortable abortable; // All about stopping private final Stoppable stopper; // listeners to be notified @@ -48,9 +51,9 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements // List of all the other region servers in this cluster private final ArrayList<String> otherRegionServers = new ArrayList<>(); - public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers, - Configuration conf, Abortable abortable, Stoppable stopper) { - super(zookeeper, conf, abortable); + public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) { + this.zookeeper = zookeeper; + this.abortable = abortable; this.stopper = stopper; this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); // watch the changes http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index 42d4b3f..a53500a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @@ -36,7 +37,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; * ZK based replication peer storage. */ @InterfaceAudience.Private -class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { +public class ZKReplicationPeerStorage extends ZKReplicationStorageBase + implements ReplicationPeerStorage { + + public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers"; + public static final String PEERS_ZNODE_DEFAULT = "peers"; + + public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state"; + public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state"; public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); @@ -56,16 +64,18 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { super(zookeeper, conf); - this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); - String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT); + String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT); this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); } - private String getPeerStateNode(String peerId) { + @VisibleForTesting + public String getPeerStateNode(String peerId) { return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); } - private String getPeerNode(String peerId) { + @VisibleForTesting + public String getPeerNode(String peerId) { return ZNodePaths.joinZNode(peersZNode, peerId); } @@ -82,8 +92,8 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), false); } catch (KeeperException e) { - throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" + - peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index 2321e4f..7190aeb 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.io.ByteArrayOutputStream; import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; @@ -34,7 +35,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; * zookeeper. */ @InterfaceAudience.Private -class ZKReplicationStorageBase { +public class ZKReplicationStorageBase { + + public static final String REPLICATION_ZNODE = "zookeeper.znode.replication"; + public static final String REPLICATION_ZNODE_DEFAULT = "replication"; /** The name of the base znode that contains all replication state. */ protected final String replicationZNode; @@ -45,10 +49,9 @@ class ZKReplicationStorageBase { protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) { this.zookeeper = zookeeper; this.conf = conf; - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - this.replicationZNode = - ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); + this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, + conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT)); } /** @@ -58,7 +61,7 @@ class ZKReplicationStorageBase { */ protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { ReplicationProtos.ReplicationState msg = - ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); + ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); // There is no toByteArray on this pb Message? // 32 bytes is default which seems fair enough here. try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 2b978dd..650bcae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -829,8 +829,8 @@ public class HMaster extends HRegionServer implements MasterServices { // This is for backwards compatibility // See HBASE-11393 status.setStatus("Update TableCFs node in ZNode"); - ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper, - conf, this.clusterConnection); + ReplicationPeerConfigUpgrader tableCFsUpdater = + new ReplicationPeerConfigUpgrader(zooKeeper, conf); tableCFsUpdater.copyTableCFs(); // Add the Observer to delete space quotas on table deletion before starting all CPs by http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java index ea5509f..b6e8862 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java @@ -18,96 +18,107 @@ */ package org.apache.hadoop.hbase.replication.master; +import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE; +import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT; +import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE; +import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT; + import java.io.IOException; -import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; -import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** - * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. - * It will be removed in HBase 3.x. See HBASE-11393 + * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will + * be removed in HBase 3.x. See HBASE-11393 */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { +public class ReplicationPeerConfigUpgrader{ + + private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs"; + private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs"; private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class); + private final Configuration conf; + private final ZKWatcher zookeeper; + private final ReplicationPeerStorage peerStorage; - public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, - Configuration conf, Abortable abortable) { - super(zookeeper, conf, abortable); + public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; + this.conf = conf; + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); } public void upgrade() throws Exception { try (Connection conn = ConnectionFactory.createConnection(conf)) { Admin admin = conn.getAdmin(); - admin.listReplicationPeers().forEach( - (peerDesc) -> { - String peerId = peerDesc.getPeerId(); - ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); - if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) - || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { - peerConfig.setReplicateAllUserTables(false); - try { - admin.updateReplicationPeerConfig(peerId, peerConfig); - } catch (Exception e) { - LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); - } + admin.listReplicationPeers().forEach((peerDesc) -> { + String peerId = peerDesc.getPeerId(); + ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); + if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) + || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { + peerConfig.setReplicateAllUserTables(false); + try { + admin.updateReplicationPeerConfig(peerId, peerConfig); + } catch (Exception e) { + LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); } - }); + } + }); } } - public void copyTableCFs() { - List<String> znodes = null; - try { - znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - } catch (KeeperException e) { - LOG.error("Failed to get peers znode", e); - } - if (znodes != null) { - for (String peerId : znodes) { - if (!copyTableCFs(peerId)) { - LOG.error("upgrade tableCFs failed for peerId=" + peerId); - } + public void copyTableCFs() throws ReplicationException { + for (String peerId : peerStorage.listPeerIds()) { + if (!copyTableCFs(peerId)) { + LOG.error("upgrade tableCFs failed for peerId=" + peerId); } } } - public boolean copyTableCFs(String peerId) { + @VisibleForTesting + protected String getTableCFsNode(String peerId) { + String replicationZNode = ZNodePaths.joinZNode(zookeeper.znodePaths.baseZNode, + conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT)); + String peersZNode = + ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT)); + return ZNodePaths.joinZNode(peersZNode, + ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT))); + } + + public boolean copyTableCFs(String peerId) throws ReplicationException { String tableCFsNode = getTableCFsNode(peerId); try { if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { - String peerNode = getPeerNode(peerId); - ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode); + ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId); // We only need to copy data from tableCFs node to rpc Node the first time hmaster start. if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) { // we copy TableCFs node into PeerNode LOG.info("copy tableCFs into peerNode:" + peerId); ReplicationProtos.TableCF[] tableCFs = - ReplicationPeerConfigUtil.parseTableCFs( - ZKUtil.getData(this.zookeeper, tableCFsNode)); + ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode)); if (tableCFs != null && tableCFs.length > 0) { rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs)); - ZKUtil.setData(this.zookeeper, peerNode, - ReplicationPeerConfigUtil.toByteArray(rpc)); + peerStorage.updatePeerConfig(peerId, rpc); } } else { LOG.info("No tableCFs in peerNode:" + peerId); @@ -126,23 +137,6 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { return true; } - private ReplicationPeerConfig getReplicationPeerConig(String peerNode) - throws KeeperException, InterruptedException { - byte[] data = null; - data = ZKUtil.getData(this.zookeeper, peerNode); - if (data == null) { - LOG.error("Could not get configuration for " + - "peer because it doesn't exist. peer=" + peerNode); - return null; - } - try { - return ReplicationPeerConfigUtil.parsePeerFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed to parse cluster key from peer=" + peerNode); - return null; - } - } - private static void printUsageAndExit() { System.err.printf( "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader" @@ -163,19 +157,17 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { printUsageAndExit(); } else if (args[0].equals("copyTableCFs")) { Configuration conf = HBaseConfiguration.create(); - ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); - try { - ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw, - conf, null); + try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) { + ReplicationPeerConfigUpgrader tableCFsUpdater = + new ReplicationPeerConfigUpgrader(zkw, conf); tableCFsUpdater.copyTableCFs(); - } finally { - zkw.close(); } } else if (args[0].equals("upgrade")) { Configuration conf = HBaseConfiguration.create(); - ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); - ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null); - upgrader.upgrade(); + try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) { + ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf); + upgrader.upgrade(); + } } else { printUsageAndExit(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 27bda2d..22e8628 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; -import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; @@ -237,7 +236,7 @@ public class DumpReplicationQueues extends Configured implements Tool { LOG.info("Found [--distributed], will poll each RegionServer."); Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId()) .collect(Collectors.toSet()); - System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs())); + System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs())); System.out.println(dumpReplicationSummary()); } else { // use ZK instead @@ -301,18 +300,15 @@ public class DumpReplicationQueues extends Configured implements Tool { return sb.toString(); } - public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds, + public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { ReplicationQueueStorage queueStorage; - ReplicationPeers replicationPeers; ReplicationTracker replicationTracker; StringBuilder sb = new StringBuilder(); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - replicationPeers = - ReplicationFactory.getReplicationPeers(zkw, getConf()); - replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), - new WarnOnlyAbortable(), new WarnOnlyStoppable()); + replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(), + new WarnOnlyStoppable()); Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); // Loops each peer on each RS and dumps the queues @@ -330,11 +326,9 @@ public class DumpReplicationQueues extends Configured implements Tool { List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); if (!peerIds.contains(queueInfo.getPeerId())) { deletedQueues.add(regionserver + "/" + queueId); - sb.append( - formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); + sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); } else { - sb.append( - formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); + sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index a8991a0..ce056a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -133,8 +133,7 @@ public class Replication implements ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf); this.replicationPeers.init(); this.replicationTracker = - ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, - this.conf, this.server, this.server); + ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server); } catch (Exception e) { throw new IOException("Failed replication handler create", e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 4c0630d..f6b6501 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -330,8 +330,7 @@ public class HBaseFsck extends Configured implements Closeable { * @throws MasterNotRunningException if the master is not running * @throws ZooKeeperConnectionException if unable to connect to ZooKeeper */ - public HBaseFsck(Configuration conf) throws MasterNotRunningException, - ZooKeeperConnectionException, IOException, ClassNotFoundException { + public HBaseFsck(Configuration conf) throws IOException, ClassNotFoundException { this(conf, createThreadPool(conf)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index 9f0c670..52b914e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -94,7 +94,8 @@ public class TestReplicationTrackerZKImpl { ZKClusterId.setClusterId(zkw, new ClusterId()); rp = ReplicationFactory.getReplicationPeers(zkw, conf); rp.init(); - rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); + rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1), + new DummyServer(fakeRs1)); } catch (Exception e) { fail("Exception during test setup: " + e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java index b1fe2e9..dcb6bb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java @@ -29,14 +29,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; -import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -59,12 +58,19 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { private static ZKWatcher zkw = null; private static Abortable abortable = null; + private static ZKStorageUtil zkStorageUtil = null; + + private static class ZKStorageUtil extends ZKReplicationPeerStorage { + public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + } + } @Rule public TestName name = new TestName(); public TestTableCFsUpdater() { - super(zkw, TEST_UTIL.getConfiguration(), abortable); + super(zkw, TEST_UTIL.getConfiguration()); } @BeforeClass @@ -83,6 +89,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { } }; zkw = new ZKWatcher(conf, "TableCFs", abortable, true); + zkStorageUtil = new ZKStorageUtil(zkw, conf); } @AfterClass @@ -91,8 +98,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { } @Test - public void testUpgrade() throws KeeperException, InterruptedException, - DeserializationException { + public void testUpgrade() throws Exception { String peerId = "1"; final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); @@ -100,13 +106,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); - String peerNode = getPeerNode(peerId); + String peerNode = zkStorageUtil.getPeerNode(peerId); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3; String tableCFsNode = getTableCFsNode(peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); - ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); + ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); ReplicationPeerConfig actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); @@ -119,13 +125,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { peerId = "2"; rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2"; tableCFsNode = getTableCFsNode(peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); - ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); + ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); @@ -137,13 +143,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { peerId = "3"; rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); tableCFs = ""; tableCFsNode = getTableCFsNode(peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); - ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); + ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); @@ -155,7 +161,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { peerId = "4"; rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); tableCFsNode = getTableCFsNode(peerId); @@ -169,7 +175,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { copyTableCFs(); peerId = "1"; - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap(); @@ -184,9 +190,8 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { assertEquals("cf3", tableNameListMap.get(tableName2).get(0)); assertNull(tableNameListMap.get(tableName3)); - peerId = "2"; - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); tableNameListMap = actualRpc.getTableCFsMap(); @@ -200,19 +205,17 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { assertEquals("cf2", tableNameListMap.get(tableName2).get(0)); peerId = "3"; - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); tableNameListMap = actualRpc.getTableCFsMap(); assertNull(tableNameListMap); peerId = "4"; - peerNode = getPeerNode(peerId); + peerNode = zkStorageUtil.getPeerNode(peerId); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); tableNameListMap = actualRpc.getTableCFsMap(); assertNull(tableNameListMap); } - - } http://git-wip-us.apache.org/repos/asf/hbase/blob/abe180d3/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 1401470..ebf0a27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -69,8 +69,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -169,9 +169,9 @@ public abstract class TestReplicationSourceManager { + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", - ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKUtil.createWithParents(zkw, "/hbase/replication/state"); - ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); ZKClusterId.setClusterId(zkw, new ClusterId()); FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());