HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1104d16a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1104d16a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1104d16a Branch: refs/heads/HBASE-19397 Commit: 1104d16a49132a0e1703efb8728fae9874e84879 Parents: d72f6f1 Author: huzheng <open...@gmail.com> Authored: Tue Dec 26 16:46:10 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Jan 4 09:22:35 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerConfigUtil.java | 10 +- .../replication/VerifyReplication.java | 9 +- .../hbase/replication/ReplicationFactory.java | 10 +- .../hbase/replication/ReplicationPeerImpl.java | 60 +- .../replication/ReplicationPeerStorage.java | 3 +- .../hbase/replication/ReplicationPeers.java | 235 ++++---- .../replication/ReplicationPeersZKImpl.java | 543 ------------------- .../replication/ZKReplicationPeerStorage.java | 12 +- .../replication/ZKReplicationStorageBase.java | 3 +- .../replication/TestReplicationStateBasic.java | 125 ++--- .../replication/TestReplicationStateZKImpl.java | 2 +- .../TestZKReplicationPeerStorage.java | 12 +- .../cleaner/ReplicationZKNodeCleaner.java | 57 +- .../replication/ReplicationPeerManager.java | 6 +- .../regionserver/DumpReplicationQueues.java | 2 +- .../regionserver/PeerProcedureHandlerImpl.java | 49 +- .../replication/regionserver/Replication.java | 2 +- .../regionserver/ReplicationSource.java | 6 +- .../regionserver/ReplicationSourceManager.java | 45 +- .../cleaner/TestReplicationHFileCleaner.java | 7 +- .../replication/TestMultiSlaveReplication.java | 2 - .../TestReplicationTrackerZKImpl.java | 36 +- .../TestReplicationSourceManager.java | 17 +- .../hadoop/hbase/HBaseZKTestingUtility.java | 3 +- 24 files changed, 304 insertions(+), 952 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 022bf64..a234a9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -247,22 +247,22 @@ public final class ReplicationPeerConfigUtil { public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes) throws DeserializationException { if (ProtobufUtil.isPBMagicPrefix(bytes)) { - int pblen = ProtobufUtil.lengthOfPBMagic(); + int pbLen = ProtobufUtil.lengthOfPBMagic(); ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder(); ReplicationProtos.ReplicationPeer peer; try { - ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); peer = builder.build(); } catch (IOException e) { throw new DeserializationException(e); } return convert(peer); } else { - if (bytes.length > 0) { - return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); + if (bytes == null || bytes.length <= 0) { + throw new DeserializationException("Bytes to deserialize should not be empty."); } - return ReplicationPeerConfig.newBuilder().setClusterKey("").build(); + return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java 
---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 64ef279..15ac2ab 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -336,15 +336,10 @@ public class VerifyReplication extends Configured implements Tool { @Override public boolean isAborted() {return false;} }); - ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW); + ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf); rp.init(); - Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId); - if (pair == null) { - throw new IOException("Couldn't get peer conf!"); - } - - return pair; + return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId)); } catch (ReplicationException e) { throw new IOException( "An error occurred while trying to connect to the remove peer cluster", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java index 5e70e57..6c66aff 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java @@ -29,14 +29,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public 
class ReplicationFactory { - public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, - Abortable abortable) { - return getReplicationPeers(zk, conf, null, abortable); - } - - public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf, - ReplicationQueueStorage queueStorage, Abortable abortable) { - return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable); + public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf) { + return new ReplicationPeers(zk, conf); } public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java index ed9fe14..9771f3d 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -26,21 +25,10 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @InterfaceAudience.Private public class 
ReplicationPeerImpl implements ReplicationPeer { - private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class); - - private final ReplicationPeerStorage peerStorage; - private final Configuration conf; private final String id; @@ -58,21 +46,21 @@ public class ReplicationPeerImpl implements ReplicationPeer { * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer */ - public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id, + public ReplicationPeerImpl(Configuration conf, String id, boolean peerState, ReplicationPeerConfig peerConfig) { - this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf); this.conf = conf; - this.peerConfig = peerConfig; this.id = id; + this.peerState = peerState ? PeerState.ENABLED : PeerState.DISABLED; + this.peerConfig = peerConfig; this.peerConfigListeners = new ArrayList<>(); } - public void refreshPeerState() throws ReplicationException { - this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED; + void setPeerState(boolean enabled) { + this.peerState = enabled ? PeerState.ENABLED : PeerState.DISABLED; } - public void refreshPeerConfig() throws ReplicationException { - this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig); + void setPeerConfig(ReplicationPeerConfig peerConfig) { + this.peerConfig = peerConfig; peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); } @@ -135,36 +123,4 @@ public class ReplicationPeerImpl implements ReplicationPeer { public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { this.peerConfigListeners.add(listener); } - - /** - * Parse the raw data from ZK to get a peer's state - * @param bytes raw ZK data - * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state. 
- * @throws DeserializationException - */ - public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { - ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes); - return ReplicationProtos.ReplicationState.State.ENABLED == state; - } - - /** - * @param bytes Content of a state znode. - * @return State parsed from the passed bytes. - * @throws DeserializationException - */ - private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) - throws DeserializationException { - ProtobufUtil.expectPBMagicPrefix(bytes); - int pbLen = ProtobufUtil.lengthOfPBMagic(); - ReplicationProtos.ReplicationState.Builder builder = - ReplicationProtos.ReplicationState.newBuilder(); - ReplicationProtos.ReplicationState state; - try { - ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); - state = builder.build(); - return state.getState(); - } catch (IOException e) { - throw new DeserializationException(e); - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java index e00cd0d..1adda02 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication; import java.util.List; -import java.util.Optional; import org.apache.yetus.audience.InterfaceAudience; @@ -70,5 +69,5 @@ public interface ReplicationPeerStorage { * Get the peer config of a replication peer. 
* @throws ReplicationException if there are errors accessing the storage service. */ - Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException; + ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index afc19bd..8393469 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,58 +17,54 @@ */ package org.apache.hadoop.hbase.replication; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.io.IOException; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; /** - * This provides an interface for maintaining a set of peer clusters. These peers are remote slave - * clusters that data is replicated to. A peer cluster can be in three different states: - * - * 1. Not-Registered - There is no notion of the peer cluster. - * 2. Registered - The peer has an id and is being tracked but there is no connection. - * 3. Connected - There is an active connection to the remote peer. - * - * In the registered or connected state, a peer cluster can either be enabled or disabled. + * This provides a class for maintaining a set of peer clusters. These peers are remote slave + * clusters that data is replicated to. */ @InterfaceAudience.Private -public interface ReplicationPeers { +public class ReplicationPeers { - /** - * Initialize the ReplicationPeers interface. - */ - void init() throws ReplicationException; + private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class); - /** - * Add a new remote slave cluster for replication. 
- * @param peerId a short that identifies the cluster - * @param peerConfig configuration for the replication slave cluster - */ - default void registerPeer(String peerId, ReplicationPeerConfig peerConfig) - throws ReplicationException { - registerPeer(peerId, peerConfig, true); + private final Configuration conf; + + // Map of peer clusters keyed by their id + private final ConcurrentMap<String, ReplicationPeerImpl> peerCache; + private final ReplicationPeerStorage peerStorage; + + protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) { + this.conf = conf; + this.peerCache = new ConcurrentHashMap<>(); + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); } - /** - * Add a new remote slave cluster for replication. - * @param peerId a short that identifies the cluster - * @param peerConfig configuration for the replication slave cluster - * @param enabled peer state, true if ENABLED and false if DISABLED - */ - void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException; + public void init() throws ReplicationException { + // Loading all existing peerIds into peer cache. + for (String peerId : this.peerStorage.listPeerIds()) { + addPeer(peerId); + } + } - /** - * Removes a remote slave cluster and stops the replication to it. - * @param peerId a short that identifies the cluster - */ - void unregisterPeer(String peerId) throws ReplicationException; + @VisibleForTesting + public ReplicationPeerStorage getPeerStorage() { + return this.peerStorage; + } /** * Method called after a peer has been connected. It will create a ReplicationPeer to track the @@ -78,111 +73,109 @@ public interface ReplicationPeers { * @return whether a ReplicationPeer was successfully created * @throws ReplicationException */ - boolean peerConnected(String peerId) throws ReplicationException; - - /** - * Method called after a peer has been disconnected. 
It will remove the ReplicationPeer that - * tracked the disconnected cluster. - * @param peerId a short that identifies the cluster - */ - void peerDisconnected(String peerId); + public boolean addPeer(String peerId) throws ReplicationException { + if (this.peerCache.containsKey(peerId)) { + return false; + } - /** - * Restart the replication to the specified remote slave cluster. - * @param peerId a short that identifies the cluster - */ - void enablePeer(String peerId) throws ReplicationException; - - /** - * Stop the replication to the specified remote slave cluster. - * @param peerId a short that identifies the cluster - */ - void disablePeer(String peerId) throws ReplicationException; + peerCache.put(peerId, createPeer(peerId)); + return true; + } - /** - * Get the table and column-family list string of the peer from the underlying storage. - * @param peerId a short that identifies the cluster - */ - public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId) - throws ReplicationException; + public void removePeer(String peerId) { + peerCache.remove(peerId); + } /** - * Set the table and column-family list string of the peer to the underlying storage. + * Get the peer state for the specified connected remote slave cluster. The value might be read + * from cache, so it is recommended to use {@link #peerStorage } to read storage directly if + * reading the state after enabling or disabling it. * @param peerId a short that identifies the cluster - * @param tableCFs the table and column-family list which will be replicated for this peer + * @return true if replication is enabled, false otherwise. */ - public void setPeerTableCFsConfig(String peerId, - Map<TableName, ? 
extends Collection<String>> tableCFs) - throws ReplicationException; + public boolean isPeerEnabled(String peerId) { + ReplicationPeer replicationPeer = this.peerCache.get(peerId); + if (replicationPeer == null) { + throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached"); + } + return replicationPeer.getPeerState() == PeerState.ENABLED; + } /** - * Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will - * continue to track changes to the Peer's state and config. This method returns null if no - * peer has been connected with the given peerId. + * Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will + * continue to track changes to the Peer's state and config. This method returns null if no peer + * has been cached with the given peerId. * @param peerId id for the peer * @return ReplicationPeer object */ - ReplicationPeerImpl getConnectedPeer(String peerId); + public ReplicationPeerImpl getPeer(String peerId) { + return peerCache.get(peerId); + } /** * Returns the set of peerIds of the clusters that have been connected and have an underlying * ReplicationPeer. * @return a Set of Strings for peerIds */ - public Set<String> getConnectedPeerIds(); + public Set<String> getAllPeerIds() { + return peerCache.keySet(); + } - /** - * Get the replication status for the specified connected remote slave cluster. - * The value might be read from cache, so it is recommended to - * use {@link #getStatusOfPeerFromBackingStore(String)} - * if reading the state after enabling or disabling it. - * @param peerId a short that identifies the cluster - * @return true if replication is enabled, false otherwise. 
- */ - boolean getStatusOfPeer(String peerId); + public ReplicationPeerConfig getPeerConfig(String peerId) { + ReplicationPeer replicationPeer = this.peerCache.get(peerId); + if (replicationPeer == null) { + throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached"); + } + return replicationPeer.getPeerConfig(); + } - /** - * Get the replication status for the specified remote slave cluster, which doesn't - * have to be connected. The state is read directly from the backing store. - * @param peerId a short that identifies the cluster - * @return true if replication is enabled, false otherwise. - * @throws ReplicationException thrown if there's an error contacting the store - */ - boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException; + public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException { + ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); - /** - * List the cluster replication configs of all remote slave clusters (whether they are - * enabled/disabled or connected/disconnected). - * @return A map of peer ids to peer cluster keys - */ - Map<String, ReplicationPeerConfig> getAllPeerConfigs(); + Configuration otherConf; + try { + otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); + } catch (IOException e) { + throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e); + } - /** - * List the peer ids of all remote slave clusters (whether they are enabled/disabled or - * connected/disconnected). 
- * @return A list of peer ids - */ - List<String> getAllPeerIds(); + if (!peerConfig.getConfiguration().isEmpty()) { + CompoundConfiguration compound = new CompoundConfiguration(); + compound.add(otherConf); + compound.addStringMap(peerConfig.getConfiguration()); + return compound; + } - /** - * Returns the configured ReplicationPeerConfig for this peerId - * @param peerId a short name that identifies the cluster - * @return ReplicationPeerConfig for the peer - */ - ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException; + return otherConf; + } - /** - * Returns the configuration needed to talk to the remote slave cluster. - * @param peerId a short that identifies the cluster - * @return the configuration for the peer cluster, null if it was unable to get the configuration - */ - Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException; + public PeerState refreshPeerState(String peerId) throws ReplicationException { + ReplicationPeerImpl peer = peerCache.get(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + peer.setPeerState(peerStorage.isPeerEnabled(peerId)); + return peer.getPeerState(); + } + + public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException { + ReplicationPeerImpl peer = peerCache.get(peerId); + if (peer == null) { + throw new ReplicationException("Peer with id=" + peerId + " is not cached."); + } + peer.setPeerConfig(peerStorage.getPeerConfig(peerId)); + return peer.getPeerConfig(); + } /** - * Update the peerConfig for the a given peer cluster - * @param id a short that identifies the cluster - * @param peerConfig new config for the peer cluster + * Helper method to connect to a peer + * @param peerId peer's identifier + * @return object representing the peer * @throws ReplicationException */ - void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws 
ReplicationException; + private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { + ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId); + boolean enabled = peerStorage.isPeerEnabled(peerId); + return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java deleted file mode 100644 index 268ba87..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ /dev/null @@ -1,543 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.replication; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.CompoundConfiguration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; - -/** - * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The - * peers znode contains a list of all peer replication clusters and the current replication state of - * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with - * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the - * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of - * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase. 
- * For example: - * - * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase] - * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase] - * - * Each of these peer znodes has a child znode that indicates whether or not replication is enabled - * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a - * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the - * ReplicationPeer.PeerStateTracker class. For example: - * - * /hbase/replication/peers/1/peer-state [Value: ENABLED] - * - * Each of these peer znodes has a child znode that indicates which data will be replicated - * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a - * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker - * class. For example: - * - * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"] - */ -@InterfaceAudience.Private -public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers { - - // Map of peer clusters keyed by their id - private ConcurrentMap<String, ReplicationPeerImpl> peerClusters; - private final ReplicationQueueStorage queueStorage; - private Abortable abortable; - - private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class); - - public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf, - ReplicationQueueStorage queueStorage, Abortable abortable) { - super(zk, conf, abortable); - this.abortable = abortable; - this.peerClusters = new ConcurrentHashMap<>(); - this.queueStorage = queueStorage; - } - - @Override - public void init() throws ReplicationException { - try { - if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) { - ZKUtil.createWithParents(this.zookeeper, this.peersZNode); - } - } catch (KeeperException e) { - throw new ReplicationException("Could not 
initialize replication peers", e); - } - addExistingPeers(); - } - - @Override - public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled) - throws ReplicationException { - try { - if (peerExists(id)) { - throw new IllegalArgumentException("Cannot add a peer with id=" + id - + " because that id already exists."); - } - - if(id.contains("-")){ - throw new IllegalArgumentException("Found invalid peer name:" + id); - } - - if (peerConfig.getClusterKey() != null) { - try { - ZKConfig.validateClusterKey(peerConfig.getClusterKey()); - } catch (IOException ioe) { - throw new IllegalArgumentException(ioe.getMessage()); - } - } - - checkQueuesDeleted(id); - - ZKUtil.createWithParents(this.zookeeper, this.peersZNode); - - List<ZKUtilOp> listOfOps = new ArrayList<>(2); - ZKUtilOp op1 = - ZKUtilOp.createAndFailSilent(getPeerNode(id), - ReplicationPeerConfigUtil.toByteArray(peerConfig)); - ZKUtilOp op2 = - ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES - : DISABLED_ZNODE_BYTES); - listOfOps.add(op1); - listOfOps.add(op2); - ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false); - } catch (KeeperException e) { - throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>" - + peerConfig + ", state=" + (enabled ? 
"ENABLED" : "DISABLED"), e); - } - } - - @Override - public void unregisterPeer(String id) throws ReplicationException { - try { - if (!peerExists(id)) { - throw new IllegalArgumentException("Cannot remove peer with id=" + id - + " because that id does not exist."); - } - ZKUtil.deleteNodeRecursively(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)); - } catch (KeeperException e) { - throw new ReplicationException("Could not remove peer with id=" + id, e); - } - } - - @Override - public void enablePeer(String id) throws ReplicationException { - changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED); - LOG.info("peer " + id + " is enabled"); - } - - @Override - public void disablePeer(String id) throws ReplicationException { - changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED); - LOG.info("peer " + id + " is disabled"); - } - - @Override - public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException { - try { - if (!peerExists(id)) { - throw new IllegalArgumentException("peer " + id + " doesn't exist"); - } - try { - ReplicationPeerConfig rpc = getReplicationPeerConfig(id); - if (rpc == null) { - throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); - } - return rpc.getTableCFsMap(); - } catch (Exception e) { - throw new ReplicationException(e); - } - } catch (KeeperException e) { - throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e); - } - } - - @Override - public void setPeerTableCFsConfig(String id, - Map<TableName, ? 
extends Collection<String>> tableCFs) - throws ReplicationException { - try { - if (!peerExists(id)) { - throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id - + " does not exist."); - } - ReplicationPeerConfig rpc = getReplicationPeerConfig(id); - if (rpc == null) { - throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id); - } - rpc.setTableCFsMap(tableCFs); - ZKUtil.setData(this.zookeeper, getPeerNode(id), - ReplicationPeerConfigUtil.toByteArray(rpc)); - LOG.info("Peer tableCFs with id= " + id + " is now " + - ReplicationPeerConfigUtil.convertToString(tableCFs)); - } catch (KeeperException e) { - throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e); - } - } - - @Override - public boolean getStatusOfPeer(String id) { - ReplicationPeer replicationPeer = this.peerClusters.get(id); - if (replicationPeer == null) { - throw new IllegalArgumentException("Peer with id= " + id + " is not cached"); - } - return replicationPeer.getPeerState() == PeerState.ENABLED; - } - - @Override - public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException { - try { - if (!peerExists(id)) { - throw new IllegalArgumentException("peer " + id + " doesn't exist"); - } - String peerStateZNode = getPeerStateNode(id); - try { - return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); - } catch (KeeperException e) { - throw new ReplicationException(e); - } catch (DeserializationException e) { - throw new ReplicationException(e); - } - } catch (KeeperException e) { - throw new ReplicationException("Unable to get status of the peer with id=" + id + - " from backing store", e); - } catch (InterruptedException e) { - throw new ReplicationException(e); - } - } - - @Override - public Map<String, ReplicationPeerConfig> getAllPeerConfigs() { - Map<String, ReplicationPeerConfig> peers = new TreeMap<>(); - List<String> ids = null; - try { - ids = 
ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - for (String id : ids) { - ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - if (peerConfig == null) { - LOG.warn("Failed to get replication peer configuration of clusterid=" + id - + " znode content, continuing."); - continue; - } - peers.put(id, peerConfig); - } - } catch (KeeperException e) { - this.abortable.abort("Cannot get the list of peers ", e); - } catch (ReplicationException e) { - this.abortable.abort("Cannot get the list of peers ", e); - } - return peers; - } - - @Override - public ReplicationPeerImpl getConnectedPeer(String peerId) { - return peerClusters.get(peerId); - } - - @Override - public Set<String> getConnectedPeerIds() { - return peerClusters.keySet(); // this is not thread-safe - } - - /** - * Returns a ReplicationPeerConfig from the znode or null for the given peerId. - */ - @Override - public ReplicationPeerConfig getReplicationPeerConfig(String peerId) - throws ReplicationException { - String znode = getPeerNode(peerId); - byte[] data = null; - try { - data = ZKUtil.getData(this.zookeeper, znode); - } catch (InterruptedException e) { - LOG.warn("Could not get configuration for peer because the thread " + - "was interrupted. peerId=" + peerId); - Thread.currentThread().interrupt(); - return null; - } catch (KeeperException e) { - throw new ReplicationException("Error getting configuration for peer with id=" - + peerId, e); - } - if (data == null) { - LOG.error("Could not get configuration for peer because it doesn't exist. 
peerId=" + peerId); - return null; - } - - try { - return ReplicationPeerConfigUtil.parsePeerFrom(data); - } catch (DeserializationException e) { - LOG.warn("Failed to parse cluster key from peerId=" + peerId - + ", specifically the content from the following znode: " + znode); - return null; - } - } - - @Override - public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) - throws ReplicationException { - ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId); - - if (peerConfig == null) { - return null; - } - - Configuration otherConf; - try { - otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); - } catch (IOException e) { - LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e); - return null; - } - - if (!peerConfig.getConfiguration().isEmpty()) { - CompoundConfiguration compound = new CompoundConfiguration(); - compound.add(otherConf); - compound.addStringMap(peerConfig.getConfiguration()); - return new Pair<>(peerConfig, compound); - } - - return new Pair<>(peerConfig, otherConf); - } - - @Override - public void updatePeerConfig(String id, ReplicationPeerConfig newConfig) - throws ReplicationException { - ReplicationPeer peer = getConnectedPeer(id); - if (peer == null){ - throw new ReplicationException("Could not find peer Id " + id + " in connected peers"); - } - ReplicationPeerConfig existingConfig = peer.getPeerConfig(); - if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() && - !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){ - throw new ReplicationException("Changing the cluster key on an existing peer is not allowed." 
- + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '" - + newConfig.getClusterKey() + - "'"); - } - String existingEndpointImpl = existingConfig.getReplicationEndpointImpl(); - if (newConfig.getReplicationEndpointImpl() != null && - !newConfig.getReplicationEndpointImpl().isEmpty() && - !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){ - throw new ReplicationException("Changing the replication endpoint implementation class " + - "on an existing peer is not allowed. Existing class '" - + existingConfig.getReplicationEndpointImpl() - + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'"); - } - // Update existingConfig's peer config and peer data with the new values, but don't touch config - // or data that weren't explicitly changed - ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(existingConfig); - builder.putAllConfiguration(newConfig.getConfiguration()) - .putAllPeerData(newConfig.getPeerData()) - .setReplicateAllUserTables(newConfig.replicateAllUserTables()) - .setNamespaces(newConfig.getNamespaces()).setTableCFsMap(newConfig.getTableCFsMap()) - .setExcludeNamespaces(newConfig.getExcludeNamespaces()) - .setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap()) - .setBandwidth(newConfig.getBandwidth()); - - try { - ZKUtil.setData(this.zookeeper, getPeerNode(id), - ReplicationPeerConfigUtil.toByteArray(builder.build())); - } - catch(KeeperException ke){ - throw new ReplicationException("There was a problem trying to save changes to the " + - "replication peer " + id, ke); - } - } - - /** - * List all registered peer clusters and set a watch on their znodes. 
- */ - @Override - public List<String> getAllPeerIds() { - List<String> ids = null; - try { - ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode); - } catch (KeeperException e) { - this.abortable.abort("Cannot get the list of peers ", e); - } - return ids; - } - - /** - * A private method used during initialization. This method attempts to add all registered - * peer clusters. This method does not set a watch on the peer cluster znodes. - */ - private void addExistingPeers() throws ReplicationException { - List<String> znodes = null; - try { - znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode); - } catch (KeeperException e) { - throw new ReplicationException("Error getting the list of peer clusters.", e); - } - if (znodes != null) { - for (String z : znodes) { - createAndAddPeer(z); - } - } - } - - @Override - public boolean peerConnected(String peerId) throws ReplicationException { - return createAndAddPeer(peerId); - } - - @Override - public void peerDisconnected(String peerId) { - ReplicationPeer rp = this.peerClusters.get(peerId); - if (rp != null) { - peerClusters.remove(peerId, rp); - } - } - - /** - * Attempt to connect to a new remote slave cluster. - * @param peerId a short that identifies the cluster - * @return true if a new connection was made, false if no new connection was made. 
- */ - public boolean createAndAddPeer(String peerId) throws ReplicationException { - if (peerClusters == null) { - return false; - } - if (this.peerClusters.containsKey(peerId)) { - return false; - } - - ReplicationPeerImpl peer = null; - try { - peer = createPeer(peerId); - } catch (Exception e) { - throw new ReplicationException("Error adding peer with id=" + peerId, e); - } - if (peer == null) { - return false; - } - ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer); - if (previous == null) { - LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey()); - } else { - LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() + - ", new cluster=" + peer.getPeerConfig().getClusterKey()); - } - return true; - } - - /** - * Update the state znode of a peer cluster. - * @param id - * @param state - */ - private void changePeerState(String id, ReplicationProtos.ReplicationState.State state) - throws ReplicationException { - try { - if (!peerExists(id)) { - throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id - + " does not exist."); - } - String peerStateZNode = getPeerStateNode(id); - byte[] stateBytes = - (state == ReplicationProtos.ReplicationState.State.ENABLED) ? 
ENABLED_ZNODE_BYTES - : DISABLED_ZNODE_BYTES; - if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) { - ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes); - } else { - ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes); - } - LOG.info("Peer with id= " + id + " is now " + state.name()); - } catch (KeeperException e) { - throw new ReplicationException("Unable to change state of the peer with id=" + id, e); - } - } - - /** - * Helper method to connect to a peer - * @param peerId peer's identifier - * @return object representing the peer - * @throws ReplicationException - */ - private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { - Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId); - if (pair == null) { - return null; - } - Configuration peerConf = pair.getSecond(); - - ReplicationPeerImpl peer = - new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst()); - - // Load peer state and peer config by reading zookeeper directly. 
- peer.refreshPeerState(); - peer.refreshPeerConfig(); - - return peer; - } - - private void checkQueuesDeleted(String peerId) throws ReplicationException { - if (queueStorage == null) { - return; - } - try { - List<ServerName> replicators = queueStorage.getListOfReplicators(); - if (replicators == null || replicators.isEmpty()) { - return; - } - for (ServerName replicator : replicators) { - List<String> queueIds = queueStorage.getAllQueues(replicator); - for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (queueInfo.getPeerId().equals(peerId)) { - throw new IllegalArgumentException("undeleted queue for peerId: " + peerId - + ", replicator: " + replicator + ", queueId: " + queueId); - } - } - } - // Check for hfile-refs queue - if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode) - && queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { - throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId - + ", found in hfile-refs node path " + hfileRefsZNode); - } - } catch (KeeperException e) { - throw new ReplicationException("Could not check queues deleted with id=" + peerId, e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index 49af4c3..bf448e8 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication; import java.util.Arrays; import java.util.List; -import 
java.util.Optional; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; @@ -144,7 +143,7 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli } @Override - public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException { + public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { byte[] data; try { data = ZKUtil.getData(zookeeper, getPeerNode(peerId)); @@ -152,13 +151,14 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); } if (data == null || data.length == 0) { - return Optional.empty(); + throw new ReplicationException( + "Replication peer config data shouldn't be empty, peerId=" + peerId); } try { - return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data)); + return ReplicationPeerConfigUtil.parsePeerFrom(data); } catch (DeserializationException e) { - LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e); - return Optional.empty(); + throw new ReplicationException( + "Failed to parse replication peer config for peer with id=" + peerId, e); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java index b8a2044..d09a56b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -48,8 +48,7 @@ class 
ZKReplicationStorageBase { String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); this.replicationZNode = - ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); - + ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 4afda5d..2589199 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -55,7 +55,6 @@ public abstract class TestReplicationStateBasic { protected static String KEY_TWO; // For testing when we try to replicate to ourself - protected String OUR_ID = "3"; protected String OUR_KEY; protected static int zkTimeoutCount; @@ -152,37 +151,6 @@ public abstract class TestReplicationStateBasic { } @Test - public void testInvalidClusterKeys() throws ReplicationException, KeeperException { - rp.init(); - - try { - rp.registerPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase")); - fail("Should throw an IllegalArgumentException because " + - "zookeeper.znode.parent is missing leading '/'."); - } catch (IllegalArgumentException e) { - // Expected. - } - - try { - rp.registerPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/")); - fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing."); - } catch (IllegalArgumentException e) { - // Expected. 
- } - - try { - rp.registerPeer(ID_ONE, - new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase")); - fail("Should throw an IllegalArgumentException because " + - "hbase.zookeeper.property.clientPort is missing."); - } catch (IllegalArgumentException e) { - // Expected. - } - } - - @Test public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { rp.init(); @@ -192,7 +160,8 @@ public abstract class TestReplicationStateBasic { files1.add(new Pair<>(null, new Path("file_3"))); assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); rqs.addPeerToHFileRefs(ID_ONE); rqs.addHFileRefs(ID_ONE, files1); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); @@ -208,15 +177,17 @@ public abstract class TestReplicationStateBasic { hfiles2.add(removedString); rqs.removeHFileRefs(ID_ONE, hfiles2); assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); - rp.unregisterPeer(ID_ONE); + rp.getPeerStorage().removePeer(ID_ONE); } @Test public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { rp.init(); - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.getPeerStorage().addPeer(ID_ONE, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); rqs.addPeerToHFileRefs(ID_ONE); - rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + rp.getPeerStorage().addPeer(ID_TWO, + ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); rqs.addPeerToHFileRefs(ID_TWO); List<Pair<Path, Path>> files1 = new ArrayList<>(3); @@ -229,13 +200,13 @@ public abstract class TestReplicationStateBasic { assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); assertEquals(3, 
rqs.getReplicableHFiles(ID_TWO).size()); - rp.unregisterPeer(ID_ONE); + rp.getPeerStorage().removePeer(ID_ONE); rqs.removePeerFromHFileRefs(ID_ONE); assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); - rp.unregisterPeer(ID_TWO); + rp.getPeerStorage().removePeer(ID_TWO); rqs.removePeerFromHFileRefs(ID_TWO); assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); @@ -245,74 +216,77 @@ public abstract class TestReplicationStateBasic { public void testReplicationPeers() throws Exception { rp.init(); - // Test methods with non-existent peer ids try { - rp.unregisterPeer("bogus"); + rp.getPeerStorage().setPeerState("bogus", true); fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { + } catch (ReplicationException e) { } try { - rp.enablePeer("bogus"); + rp.getPeerStorage().setPeerState("bogus", false); fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { + } catch (ReplicationException e) { } try { - rp.disablePeer("bogus"); + rp.isPeerEnabled("bogus"); fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); } catch (IllegalArgumentException e) { } + try { - rp.getStatusOfPeer("bogus"); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (IllegalArgumentException e) { + assertFalse(rp.addPeer("bogus")); + fail("Should have thrown an ReplicationException when passed a bogus peerId"); + } catch (ReplicationException e) { + } + + try { + assertNull(rp.getPeerClusterConfiguration("bogus")); + fail("Should have thrown an ReplicationException when passed a bogus peerId"); + } catch (ReplicationException e) { } - assertFalse(rp.peerConnected("bogus")); - rp.peerDisconnected("bogus"); - 
assertNull(rp.getPeerConf("bogus")); assertNumberOfPeers(0); // Add some peers - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); + rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); assertNumberOfPeers(1); - rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO)); + rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); assertNumberOfPeers(2); // Test methods with a peer that is added but not connected try { - rp.getStatusOfPeer(ID_ONE); + rp.isPeerEnabled(ID_ONE); fail("There are no connected peers, should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException e) { } - assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond())); - rp.unregisterPeer(ID_ONE); - rp.peerDisconnected(ID_ONE); + assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE))); + rp.getPeerStorage().removePeer(ID_ONE); + rp.removePeer(ID_ONE); assertNumberOfPeers(1); // Add one peer - rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE)); - rp.peerConnected(ID_ONE); + rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); + rp.addPeer(ID_ONE); assertNumberOfPeers(2); - assertTrue(rp.getStatusOfPeer(ID_ONE)); - rp.disablePeer(ID_ONE); + assertTrue(rp.isPeerEnabled(ID_ONE)); + rp.getPeerStorage().setPeerState(ID_ONE, false); // now we do not rely on zk watcher to trigger the state change so we need to trigger it // manually... 
- ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE); - peer.refreshPeerState(); + ReplicationPeerImpl peer = rp.getPeer(ID_ONE); + rp.refreshPeerState(peer.getId()); assertEquals(PeerState.DISABLED, peer.getPeerState()); assertConnectedPeerStatus(false, ID_ONE); - rp.enablePeer(ID_ONE); + rp.getPeerStorage().setPeerState(ID_ONE, true); // now we do not rely on zk watcher to trigger the state change so we need to trigger it // manually... - peer.refreshPeerState(); + rp.refreshPeerState(peer.getId()); assertEquals(PeerState.ENABLED, peer.getPeerState()); assertConnectedPeerStatus(true, ID_ONE); // Disconnect peer - rp.peerDisconnected(ID_ONE); + rp.removePeer(ID_ONE); assertNumberOfPeers(2); try { - rp.getStatusOfPeer(ID_ONE); + rp.isPeerEnabled(ID_ONE); fail("There are no connected peers, should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException e) { } @@ -320,16 +294,16 @@ public abstract class TestReplicationStateBasic { protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { // we can first check if the value was changed in the store, if it wasn't then fail right away - if (status != rp.getStatusOfPeerFromBackingStore(peerId)) { + if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); } while (true) { - if (status == rp.getStatusOfPeer(peerId)) { + if (status == rp.isPeerEnabled(peerId)) { return; } if (zkTimeoutCount < ZK_MAX_COUNT) { - LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status + - ", sleeping and trying again."); + LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status + + ", sleeping and trying again."); Thread.sleep(ZK_SLEEP_INTERVAL); } else { fail("Timed out waiting for ConnectedPeerStatus to be " + status); @@ -337,10 +311,8 @@ public abstract class TestReplicationStateBasic { } } - protected void assertNumberOfPeers(int total) { - assertEquals(total, 
rp.getAllPeerConfigs().size()); - assertEquals(total, rp.getAllPeerIds().size()); - assertEquals(total, rp.getAllPeerIds().size()); + protected void assertNumberOfPeers(int total) throws ReplicationException { + assertEquals(total, rp.getPeerStorage().listPeerIds().size()); } /* @@ -359,8 +331,9 @@ public abstract class TestReplicationStateBasic { rqs.addWAL(server3, "qId" + i, "filename" + j); } // Add peers for the corresponding queues so they are not orphans - rp.registerPeer("qId" + i, - new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i)); + rp.getPeerStorage().addPeer("qId" + i, + ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), + true); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index ac869d9..6825c36 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -79,7 +79,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { public void setUp() { zkTimeoutCount = 0; rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable()); + rp = ReplicationFactory.getReplicationPeers(zkw, conf); OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java 
---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java index a3be1e6..e8098c8 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.HashMap; @@ -143,14 +144,14 @@ public class TestZKReplicationPeerStorage { assertEquals(peerCount, peerIds.size()); for (String peerId : peerIds) { int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get()); + assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); } for (int i = 0; i < peerCount; i++) { STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); } for (String peerId : peerIds) { int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get()); + assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); } for (int i = 0; i < peerCount; i++) { assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); @@ -166,6 +167,11 @@ public class TestZKReplicationPeerStorage { peerIds = STORAGE.listPeerIds(); assertEquals(peerCount - 1, peerIds.size()); assertFalse(peerIds.contains(toRemove)); - assertFalse(STORAGE.getPeerConfig(toRemove).isPresent()); + + try { + STORAGE.getPeerConfig(toRemove); + fail("Should throw a ReplicationException when get peer config of a peerId"); + } catch (ReplicationException e) { + } } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java index af41399..f2c3ec9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java @@ -30,8 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; @@ -51,20 +50,14 @@ import org.slf4j.LoggerFactory; public class ReplicationZKNodeCleaner { private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class); private final ReplicationQueueStorage queueStorage; - private final ReplicationPeers replicationPeers; + private final ReplicationPeerStorage peerStorage; private final ReplicationQueueDeletor queueDeletor; public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable) throws IOException { - try { - this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - this.replicationPeers = - ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable); - 
this.replicationPeers.init(); - this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable); - } catch (ReplicationException e) { - throw new IOException("failed to construct ReplicationZKNodeCleaner", e); - } + this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf); + this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable); } /** @@ -73,8 +66,8 @@ public class ReplicationZKNodeCleaner { */ public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException { Map<ServerName, List<String>> undeletedQueues = new HashMap<>(); - Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); try { + Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds()); List<ServerName> replicators = this.queueStorage.getListOfReplicators(); if (replicators == null || replicators.isEmpty()) { return undeletedQueues; @@ -84,8 +77,7 @@ public class ReplicationZKNodeCleaner { for (String queueId : queueIds) { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); if (!peerIds.contains(queueInfo.getPeerId())) { - undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add( - queueId); + undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId); if (LOG.isDebugEnabled()) { LOG.debug("Undeleted replication queue for removed peer found: " + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", @@ -106,9 +98,9 @@ public class ReplicationZKNodeCleaner { */ public Set<String> getUnDeletedHFileRefsQueues() throws IOException { Set<String> undeletedHFileRefsQueue = new HashSet<>(); - Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); String hfileRefsZNode = queueDeletor.getHfileRefsZNode(); try { + Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds()); List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue(); 
Set<String> peers = new HashSet<>(listOfPeers); peers.removeAll(peerIds); @@ -116,15 +108,15 @@ public class ReplicationZKNodeCleaner { undeletedHFileRefsQueue.addAll(peers); } } catch (ReplicationException e) { - throw new IOException( - "Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e); + throw new IOException("Failed to get list of all peers from hfile-refs znode " + + hfileRefsZNode, e); } return undeletedHFileRefsQueue; } private class ReplicationQueueDeletor extends ReplicationStateZKBase { - public ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) { + ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) { super(zk, conf, abortable); } @@ -132,19 +124,20 @@ public class ReplicationZKNodeCleaner { * @param replicator The regionserver which has undeleted queue * @param queueId The undeleted queue id */ - public void removeQueue(final ServerName replicator, final String queueId) throws IOException { - String queueZnodePath = ZNodePaths - .joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId); + void removeQueue(final ServerName replicator, final String queueId) throws IOException { + String queueZnodePath = + ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), + queueId); try { ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) { + if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) { ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); - LOG.info("Successfully removed replication queue, replicator: " + replicator + - ", queueId: " + queueId); + LOG.info("Successfully removed replication queue, replicator: " + replicator + + ", queueId: " + queueId); } - } catch (KeeperException e) { - throw new IOException( - "Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId); + } catch 
(ReplicationException | KeeperException e) { + throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: " + + queueId); } } @@ -152,17 +145,17 @@ public class ReplicationZKNodeCleaner { * @param hfileRefsQueueId The undeleted hfile-refs queue id * @throws IOException */ - public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException { + void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException { String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId); try { - if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) { + if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) { ZKUtil.deleteNodeRecursively(this.zookeeper, node); LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path " + hfileRefsZNode); } - } catch (KeeperException e) { + } catch (ReplicationException | KeeperException e) { throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId - + " from path " + hfileRefsZNode); + + " from path " + hfileRefsZNode, e); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index f4ccce8..b6732d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -314,12 +314,12 @@ public class ReplicationPeerManager { public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) throws ReplicationException { ReplicationPeerStorage peerStorage = - 
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); + ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>(); for (String peerId : peerStorage.listPeerIds()) { - Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId); + ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId); boolean enabled = peerStorage.isPeerEnabled(peerId); - peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get())); + peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig)); } return new ReplicationPeerManager(peerStorage, ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 73e600e..27bda2d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -310,7 +310,7 @@ public class DumpReplicationQueues extends Configured implements Tool { queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); replicationPeers = - ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection); + ReplicationFactory.getReplicationPeers(zkw, getConf()); replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(), new WarnOnlyAbortable(), new WarnOnlyStoppable()); Set<String> liveRegionServers = new 
HashSet<>(replicationTracker.getListOfRegionServers()); http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index 598357c..1efe180 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -19,9 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +31,8 @@ import org.slf4j.LoggerFactory; public class PeerProcedureHandlerImpl implements PeerProcedureHandler { private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); - private ReplicationSourceManager replicationSourceManager; + private final ReplicationSourceManager replicationSourceManager; + private final ReentrantLock peersLock = new ReentrantLock(); public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { this.replicationSourceManager = replicationSourceManager; @@ -38,45 +40,40 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { @Override public void addPeer(String peerId) throws ReplicationException, IOException { - 
replicationSourceManager.addPeer(peerId); + peersLock.lock(); + try { + replicationSourceManager.addPeer(peerId); + } finally { + peersLock.unlock(); + } } @Override public void removePeer(String peerId) throws ReplicationException, IOException { - replicationSourceManager.removePeer(peerId); + peersLock.lock(); + try { + if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) { + replicationSourceManager.removePeer(peerId); + } + } finally { + peersLock.unlock(); + } } @Override public void disablePeer(String peerId) throws ReplicationException, IOException { - ReplicationPeerImpl peer = - replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); - if (peer != null) { - peer.refreshPeerState(); - LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState()); - } else { - throw new ReplicationException("No connected peer found, peerId=" + peerId); - } + PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + LOG.info("disable replication peer, id: " + peerId + ", new state: " + newState); } @Override public void enablePeer(String peerId) throws ReplicationException, IOException { - ReplicationPeerImpl peer = - replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); - if (peer != null) { - peer.refreshPeerState(); - LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState()); - } else { - throw new ReplicationException("No connected peer found, peerId=" + peerId); - } + PeerState newState = replicationSourceManager.getReplicationPeers().refreshPeerState(peerId); + LOG.info("enable replication peer, id: " + peerId + ", new state: " + newState); } @Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { - ReplicationPeerImpl peer = - replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); - if (peer == null) { - throw new ReplicationException("No connected peer 
found, peerId=" + peerId); - } - peer.refreshPeerConfig(); + replicationSourceManager.getReplicationPeers().refreshPeerConfig(peerId); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 72f0fe7..f985f90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -129,7 +129,7 @@ public class Replication implements this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf); this.replicationPeers = - ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server); + ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf); this.replicationPeers.init(); this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, http://git-wip-us.apache.org/repos/asf/hbase/blob/1104d16a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 271eea7..1f4729b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -223,7 +223,7 @@ public class 
ReplicationSource extends Thread implements ReplicationSourceInterf // A peerId will not have "-" in its name, see HBASE-11394 peerId = peerClusterZnode.split("-")[0]; } - Map<TableName, List<String>> tableCFMap = replicationPeers.getConnectedPeer(peerId).getTableCFs(); + Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs(); if (tableCFMap != null) { List<String> tableCfs = tableCFMap.get(tableName); if (tableCFMap.containsKey(tableName) @@ -371,7 +371,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf } private long getCurrentBandwidth() { - ReplicationPeer replicationPeer = this.replicationPeers.getConnectedPeer(peerId); + ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId); long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0; // user can set peer bandwidth to 0 to use default bandwidth return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth; @@ -416,7 +416,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf */ @Override public boolean isPeerEnabled() { - return this.replicationPeers.getStatusOfPeer(this.peerId); + return this.replicationPeers.isPeerEnabled(this.peerId); } @Override