HBASE-19573 Rewrite ReplicationPeer with the new replication storage interface
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f1e447cb Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f1e447cb Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f1e447cb Branch: refs/heads/HBASE-19397-branch-2 Commit: f1e447cbe316c8b68a52672738b09f46e560e1ae Parents: 2c439d3 Author: Guanghao Zhang <zg...@apache.org> Authored: Tue Dec 26 11:39:34 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Jan 18 13:27:54 2018 +0800 ---------------------------------------------------------------------- .../replication/VerifyReplication.java | 5 - .../hbase/replication/ReplicationPeer.java | 42 ++-- .../hbase/replication/ReplicationPeerImpl.java | 169 ++++++++++++++ .../replication/ReplicationPeerZKImpl.java | 233 ------------------- .../hbase/replication/ReplicationPeers.java | 4 +- .../replication/ReplicationPeersZKImpl.java | 23 +- .../replication/TestReplicationStateBasic.java | 7 +- .../regionserver/PeerProcedureHandlerImpl.java | 29 +-- 8 files changed, 216 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 9065f4e..09d4b4b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.util.Bytes; @@ -333,7 +332,6 @@ public class VerifyReplication extends Configured implements Tool { private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( final Configuration conf, String peerId) throws IOException { ZKWatcher localZKW = null; - ReplicationPeerZKImpl peer = null; try { localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() { @@ -354,9 +352,6 @@ public class VerifyReplication extends Configured implements Tool { throw new IOException( "An error occurred while trying to connect to the remove peer cluster", e); } finally { - if (peer != null) { - peer.close(); - } if (localZKW != null) { localZKW.close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index b66d76d..4846018 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; - /** * ReplicationPeer manages enabled / disabled state for the peer. */ @@ -49,65 +48,52 @@ public interface ReplicationPeer { String getId(); /** - * Get the peer config object - * @return the ReplicationPeerConfig for this peer - */ - public ReplicationPeerConfig getPeerConfig(); - - /** - * Get the peer config object. if loadFromBackingStore is true, it will load from backing store - * directly and update its load peer config. otherwise, just return the local cached peer config. - * @return the ReplicationPeerConfig for this peer - */ - public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore) - throws ReplicationException; - - /** * Returns the state of the peer by reading local cache. * @return the enabled state */ PeerState getPeerState(); /** - * Returns the state of peer, if loadFromBackingStore is true, it will load from backing store - * directly and update its local peer state. otherwise, just return the local cached peer state. - * @return the enabled state + * Get the peer config object + * @return the ReplicationPeerConfig for this peer */ - PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException; + ReplicationPeerConfig getPeerConfig(); /** * Get the configuration object required to communicate with this peer * @return configuration object */ - public Configuration getConfiguration(); + Configuration getConfiguration(); /** * Get replicable (table, cf-list) map of this peer * @return the replicable (table, cf-list) map */ - public Map<TableName, List<String>> getTableCFs(); + Map<TableName, List<String>> getTableCFs(); /** * Get replicable namespace set of this peer * @return the replicable namespaces set */ - public Set<String> getNamespaces(); + Set<String> getNamespaces(); /** * Get the per node bandwidth upper limit for this peer * @return the bandwidth upper limit */ - public long getPeerBandwidth(); + long getPeerBandwidth(); /** * Register a peer config listener to catch the peer config change event. * @param listener listener to catch the peer config change event. */ - public void registerPeerConfigListener(ReplicationPeerConfigListener listener); + void registerPeerConfigListener(ReplicationPeerConfigListener listener); /** - * Notify all the registered ReplicationPeerConfigListener to update their peer config. - * @param newPeerConfig the new peer config. + * @deprecated Use {@link #registerPeerConfigListener(ReplicationPeerConfigListener)} instead. */ - public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig); -} + @Deprecated + default void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { + registerPeerConfigListener(listener); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java new file mode 100644 index 0000000..2c7ea9b --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerImpl.java @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +@InterfaceAudience.Private +public class ReplicationPeerImpl implements ReplicationPeer { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerImpl.class); + + private final ReplicationPeerStorage peerStorage; + + private final Configuration conf; + + private final String id; + + private volatile ReplicationPeerConfig peerConfig; + + private volatile PeerState peerState; + + private final List<ReplicationPeerConfigListener> peerConfigListeners; + + /** + * Constructor that takes all the objects required to communicate with the specified peer, except + * for the region server addresses. + * @param conf configuration object to this peer + * @param id string representation of this peer's identifier + * @param peerConfig configuration for the replication peer + */ + public ReplicationPeerImpl(ZKWatcher zkWatcher, Configuration conf, String id, + ReplicationPeerConfig peerConfig) { + this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkWatcher, conf); + this.conf = conf; + this.peerConfig = peerConfig; + this.id = id; + this.peerConfigListeners = new ArrayList<>(); + } + + public void refreshPeerState() throws ReplicationException { + this.peerState = peerStorage.isPeerEnabled(id) ? PeerState.ENABLED : PeerState.DISABLED; + } + + public void refreshPeerConfig() throws ReplicationException { + this.peerConfig = peerStorage.getPeerConfig(id).orElse(peerConfig); + peerConfigListeners.forEach(listener -> listener.peerConfigUpdated(peerConfig)); + } + + /** + * Get the identifier of this peer + * @return string representation of the id (short) + */ + @Override + public String getId() { + return id; + } + + @Override + public PeerState getPeerState() { + return peerState; + } + + /** + * Get the peer config object + * @return the ReplicationPeerConfig for this peer + */ + @Override + public ReplicationPeerConfig getPeerConfig() { + return peerConfig; + } + + /** + * Get the configuration object required to communicate with this peer + * @return configuration object + */ + @Override + public Configuration getConfiguration() { + return conf; + } + + /** + * Get replicable (table, cf-list) map of this peer + * @return the replicable (table, cf-list) map + */ + @Override + public Map<TableName, List<String>> getTableCFs() { + return this.peerConfig.getTableCFsMap(); + } + + /** + * Get replicable namespace set of this peer + * @return the replicable namespaces set + */ + @Override + public Set<String> getNamespaces() { + return this.peerConfig.getNamespaces(); + } + + @Override + public long getPeerBandwidth() { + return this.peerConfig.getBandwidth(); + } + + @Override + public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { + this.peerConfigListeners.add(listener); + } + + /** + * Parse the raw data from ZK to get a peer's state + * @param bytes raw ZK data + * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state. + * @throws DeserializationException + */ + public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { + ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes); + return ReplicationProtos.ReplicationState.State.ENABLED == state; + } + + /** + * @param bytes Content of a state znode. + * @return State parsed from the passed bytes. + * @throws DeserializationException + */ + private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) + throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(bytes); + int pbLen = ProtobufUtil.lengthOfPBMagic(); + ReplicationProtos.ReplicationState.Builder builder = + ReplicationProtos.ReplicationState.newBuilder(); + ReplicationProtos.ReplicationState state; + try { + ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); + state = builder.build(); + return state.getState(); + } catch (IOException e) { + throw new DeserializationException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java deleted file mode 100644 index 49b9460..0000000 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.log.HBaseMarkers; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; - -@InterfaceAudience.Private -public class ReplicationPeerZKImpl extends ReplicationStateZKBase - implements ReplicationPeer, Abortable, Closeable { - private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class); - - private volatile ReplicationPeerConfig peerConfig; - private final String id; - private volatile PeerState peerState; - private volatile Map<TableName, List<String>> tableCFs = new HashMap<>(); - private final Configuration conf; - - private final List<ReplicationPeerConfigListener> peerConfigListeners; - - /** - * Constructor that takes all the objects required to communicate with the specified peer, except - * for the region server addresses. - * @param conf configuration object to this peer - * @param id string representation of this peer's identifier - * @param peerConfig configuration for the replication peer - */ - public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id, - ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException { - super(zkWatcher, conf, abortable); - this.conf = conf; - this.peerConfig = peerConfig; - this.id = id; - this.peerConfigListeners = new ArrayList<>(); - } - - private PeerState readPeerState() throws ReplicationException { - try { - byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id)); - this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED; - } catch (DeserializationException | KeeperException | InterruptedException e) { - throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ", - e); - } - return this.peerState; - } - - private ReplicationPeerConfig readPeerConfig() throws ReplicationException { - try { - byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id)); - if (data != null) { - this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data); - } - } catch (DeserializationException | KeeperException | InterruptedException e) { - throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ", - e); - } - return this.peerConfig; - } - - @Override - public PeerState getPeerState() { - return peerState; - } - - @Override - public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException { - if (loadFromBackingStore) { - return readPeerState(); - } else { - return peerState; - } - } - - /** - * Get the identifier of this peer - * @return string representation of the id (short) - */ - @Override - public String getId() { - return id; - } - - /** - * Get the peer config object - * @return the ReplicationPeerConfig for this peer - */ - @Override - public ReplicationPeerConfig getPeerConfig() { - return peerConfig; - } - - @Override - public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore) - throws ReplicationException { - if (loadFromBackingStore) { - return readPeerConfig(); - } else { - return peerConfig; - } - } - - /** - * Get the configuration object required to communicate with this peer - * @return configuration object - */ - @Override - public Configuration getConfiguration() { - return conf; - } - - /** - * Get replicable (table, cf-list) map of this peer - * @return the replicable (table, cf-list) map - */ - @Override - public Map<TableName, List<String>> getTableCFs() { - this.tableCFs = peerConfig.getTableCFsMap(); - return this.tableCFs; - } - - /** - * Get replicable namespace set of this peer - * @return the replicable namespaces set - */ - @Override - public Set<String> getNamespaces() { - return this.peerConfig.getNamespaces(); - } - - @Override - public long getPeerBandwidth() { - return this.peerConfig.getBandwidth(); - } - - @Override - public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { - this.peerConfigListeners.add(listener); - } - - @Override - public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) { - for (ReplicationPeerConfigListener listener : this.peerConfigListeners) { - listener.peerConfigUpdated(newPeerConfig); - } - } - - @Override - public void abort(String why, Throwable e) { - LOG.error(HBaseMarkers.FATAL, "The ReplicationPeer corresponding to peer " + - peerConfig + " was aborted for the following reason(s):" + why, e); - } - - @Override - public boolean isAborted() { - // Currently the replication peer is never "Aborted", we just log when the - // abort method is called. - return false; - } - - @Override - public void close() throws IOException { - // TODO: stop zkw? - } - - /** - * Parse the raw data from ZK to get a peer's state - * @param bytes raw ZK data - * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state. - * @throws DeserializationException - */ - public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException { - ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes); - return ReplicationProtos.ReplicationState.State.ENABLED == state; - } - - /** - * @param bytes Content of a state znode. - * @return State parsed from the passed bytes. - * @throws DeserializationException - */ - private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) - throws DeserializationException { - ProtobufUtil.expectPBMagicPrefix(bytes); - int pbLen = ProtobufUtil.lengthOfPBMagic(); - ReplicationProtos.ReplicationState.Builder builder = - ReplicationProtos.ReplicationState.newBuilder(); - ReplicationProtos.ReplicationState state; - try { - ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); - state = builder.build(); - return state.getState(); - } catch (IOException e) { - throw new DeserializationException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java index 10936bf..afc19bd 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java @@ -116,13 +116,13 @@ public interface ReplicationPeers { throws ReplicationException; /** - * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will + * Returns the ReplicationPeerImpl for the specified connected peer. This ReplicationPeer will * continue to track changes to the Peer's state and config. This method returns null if no * peer has been connected with the given peerId. * @param peerId id for the peer * @return ReplicationPeer object */ - ReplicationPeer getConnectedPeer(String peerId); + ReplicationPeerImpl getConnectedPeer(String peerId); /** * Returns the set of peerIds of the clusters that have been connected and have an underlying http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 7de4619..7f6498d 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -80,7 +80,7 @@ import org.slf4j.LoggerFactory; public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers { // Map of peer clusters keyed by their id - private Map<String, ReplicationPeerZKImpl> peerClusters; + private ConcurrentMap<String, ReplicationPeerImpl> peerClusters; private final ReplicationQueueStorage queueStorage; private Abortable abortable; @@ -232,7 +232,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } String peerStateZNode = getPeerStateNode(id); try { - return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); + return ReplicationPeerImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode)); } catch (KeeperException e) { throw new ReplicationException(e); } catch (DeserializationException e) { @@ -270,7 +270,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } @Override - public ReplicationPeer getConnectedPeer(String peerId) { + public ReplicationPeerImpl getConnectedPeer(String peerId) { return peerClusters.get(peerId); } @@ -423,7 +423,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re public void peerDisconnected(String peerId) { ReplicationPeer rp = this.peerClusters.get(peerId); if (rp != null) { - ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp); + peerClusters.remove(peerId, rp); } } @@ -440,7 +440,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return false; } - ReplicationPeerZKImpl peer = null; + ReplicationPeerImpl peer = null; try { peer = createPeer(peerId); } catch (Exception e) { @@ -449,8 +449,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re if (peer == null) { return false; } - ReplicationPeerZKImpl previous = - ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer); + ReplicationPeerImpl previous = peerClusters.putIfAbsent(peerId, peer); if (previous == null) { LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey()); } else { @@ -493,19 +492,19 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re * @return object representing the peer * @throws ReplicationException */ - private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException { + private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException { Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId); if (pair == null) { return null; } Configuration peerConf = pair.getSecond(); - ReplicationPeerZKImpl peer = - new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable); + ReplicationPeerImpl peer = + new ReplicationPeerImpl(zookeeper, peerConf, peerId, pair.getFirst()); // Load peer state and peer config by reading zookeeper directly. - peer.getPeerState(true); - peer.getPeerConfig(true); + peer.refreshPeerState(); + peer.refreshPeerConfig(); return peer; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 6fe869c..8905d43 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -312,12 +312,15 @@ public abstract class TestReplicationStateBasic { rp.disablePeer(ID_ONE); // now we do not rely on zk watcher to trigger the state change so we need to trigger it // manually... - assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true)); + ReplicationPeerImpl peer = rp.getConnectedPeer(ID_ONE); + peer.refreshPeerState(); + assertEquals(PeerState.DISABLED, peer.getPeerState()); assertConnectedPeerStatus(false, ID_ONE); rp.enablePeer(ID_ONE); // now we do not rely on zk watcher to trigger the state change so we need to trigger it // manually... - assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true)); + peer.refreshPeerState(); + assertEquals(PeerState.ENABLED, peer.getPeerState()); assertConnectedPeerStatus(true, ID_ONE); // Disconnect peer http://git-wip-us.apache.org/repos/asf/hbase/blob/f1e447cb/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java index 9b493d9..598357c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationPeer; -import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.log4j.Logger; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class PeerProcedureHandlerImpl implements PeerProcedureHandler { - private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class); private ReplicationSourceManager replicationSourceManager; @@ -49,10 +48,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { @Override public void disablePeer(String peerId) throws ReplicationException, IOException { - ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); + ReplicationPeerImpl peer = + replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); if (peer != null) { - PeerState peerState = peer.getPeerState(true); - LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState); + peer.refreshPeerState(); + LOG.info("disable replication peer, id: " + peerId + ", new state: " + peer.getPeerState()); } else { throw new ReplicationException("No connected peer found, peerId=" + peerId); } @@ -60,10 +60,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { @Override public void enablePeer(String peerId) throws ReplicationException, IOException { - ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); + ReplicationPeerImpl peer = + replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); if (peer != null) { - PeerState peerState = peer.getPeerState(true); - LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState); + peer.refreshPeerState(); + LOG.info("enable replication peer, id: " + peerId + ", new state: " + peer.getPeerState()); } else { throw new ReplicationException("No connected peer found, peerId=" + peerId); } @@ -71,11 +72,11 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler { @Override public void updatePeerConfig(String peerId) throws ReplicationException, IOException { - ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); + ReplicationPeerImpl peer = + replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); if (peer == null) { throw new ReplicationException("No connected peer found, peerId=" + peerId); } - ReplicationPeerConfig rpc = peer.getPeerConfig(true); - peer.triggerPeerConfigChange(rpc); + peer.refreshPeerConfig(); } }