HBASE-19525 RS side changes for moving peer modification from zk watcher to procedure
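
For context, this patch replaces the ZooKeeper-watcher-driven peer updates on the region server with an explicit procedure call: the master ships a RefreshPeerParameter (peer id plus modification type) to each RS, and the new RefreshPeerCallable routes it to a PeerProcedureHandler, whose default implementation (PeerProcedureHandlerImpl) drives the ReplicationSourceManager. The sketch below is a simplified, self-contained model of that dispatch path — the enum, interface, and switch mirror what the diff adds, but the types here are stand-ins, not the actual HBase classes.

    // Simplified stand-in for the RS-side dispatch added in this patch; the real
    // classes live in org.apache.hadoop.hbase.replication.regionserver.
    public class RefreshPeerDispatchSketch {

      // Mirrors MasterProcedureProtos.PeerModificationType used by RefreshPeerCallable.
      enum PeerModificationType {
        ADD_PEER, REMOVE_PEER, ENABLE_PEER, DISABLE_PEER, UPDATE_PEER_CONFIG
      }

      // Mirrors the new PeerProcedureHandler interface: one method per peer operation.
      interface PeerProcedureHandler {
        void addPeer(String peerId) throws Exception;
        void removePeer(String peerId) throws Exception;
        void enablePeer(String peerId) throws Exception;
        void disablePeer(String peerId) throws Exception;
        void updatePeerConfig(String peerId) throws Exception;
      }

      // Mirrors the switch in RefreshPeerCallable.call(): the RS no longer watches ZK
      // for peer changes, it simply executes the operation the master procedure sent.
      static void refreshPeer(PeerProcedureHandler handler, String peerId,
          PeerModificationType type) throws Exception {
        switch (type) {
          case ADD_PEER:
            handler.addPeer(peerId);
            break;
          case REMOVE_PEER:
            handler.removePeer(peerId);
            break;
          case ENABLE_PEER:
            handler.enablePeer(peerId);
            break;
          case DISABLE_PEER:
            handler.disablePeer(peerId);
            break;
          case UPDATE_PEER_CONFIG:
            handler.updatePeerConfig(peerId);
            break;
          default:
            throw new IllegalArgumentException("Unknown peer modification type: " + type);
        }
      }
    }

In the patch itself, PeerProcedureHandlerImpl backs this interface with the ReplicationSourceManager: add/remove start and stop replication sources (and update the HFile-refs znodes), while enable/disable and config updates re-read the peer state or config directly from ZooKeeper and notify the registered ReplicationPeerConfigListeners, which is why the old PeerStateTracker/PeerConfigTracker watchers and the peerRemoved/peerListChanged listener callbacks are deleted below.
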
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3885f322 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3885f322 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3885f322 Branch: refs/heads/HBASE-19397 Commit: 3885f322733a02bdc6b93cf792cf18d4702bff13 Parents: 2a13851 Author: huzheng <open...@gmail.com> Authored: Wed Dec 20 10:47:18 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Dec 27 09:40:34 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/protobuf/ProtobufUtil.java | 11 +- .../hbase/shaded/protobuf/ProtobufUtil.java | 13 +- .../hbase/replication/ReplicationListener.java | 14 -- .../hbase/replication/ReplicationPeer.java | 28 ++- .../replication/ReplicationPeerZKImpl.java | 180 ++++----------- .../replication/ReplicationPeersZKImpl.java | 19 +- .../replication/ReplicationTrackerZKImpl.java | 73 +----- .../regionserver/ReplicationSourceService.java | 9 +- .../handler/RSProcedureHandler.java | 3 + .../replication/BaseReplicationEndpoint.java | 2 +- .../regionserver/PeerProcedureHandler.java | 38 ++++ .../regionserver/PeerProcedureHandlerImpl.java | 81 +++++++ .../regionserver/RefreshPeerCallable.java | 39 +++- .../replication/regionserver/Replication.java | 10 + .../regionserver/ReplicationSource.java | 9 +- .../regionserver/ReplicationSourceManager.java | 37 ++- .../replication/TestReplicationAdmin.java | 2 +- .../TestReplicationAdminUsingProcedure.java | 226 +++++++++++++++++++ .../replication/DummyModifyPeerProcedure.java | 48 ---- .../TestDummyModifyPeerProcedure.java | 80 ------- .../TestReplicationTrackerZKImpl.java | 51 ----- .../TestReplicationSourceManager.java | 32 ++- 22 files changed, 533 insertions(+), 472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 267dc7a..d5285dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.protobuf; +import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC; + import com.google.protobuf.ByteString; import com.google.protobuf.CodedInputStream; import com.google.protobuf.InvalidProtocolBufferException; @@ -199,7 +201,7 @@ public final class ProtobufUtil { * byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length. 
*/ public static byte [] prependPBMagic(final byte [] bytes) { - return Bytes.add(ProtobufMagic.PB_MAGIC, bytes); + return Bytes.add(PB_MAGIC, bytes); } /** @@ -224,10 +226,11 @@ public final class ProtobufUtil { * @param bytes bytes to check * @throws DeserializationException if we are missing the pb magic prefix */ - public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException { + public static void expectPBMagicPrefix(final byte[] bytes) throws DeserializationException { if (!isPBMagicPrefix(bytes)) { - throw new DeserializationException("Missing pb magic " + - Bytes.toString(ProtobufMagic.PB_MAGIC) + " prefix"); + String bytesPrefix = bytes == null ? "null" : Bytes.toStringBinary(bytes, 0, PB_MAGIC.length); + throw new DeserializationException( + "Missing pb magic " + Bytes.toString(PB_MAGIC) + " prefix, bytes: " + bytesPrefix); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 17b1141..8954d04 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.shaded.protobuf; +import static org.apache.hadoop.hbase.protobuf.ProtobufMagic.PB_MAGIC; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -282,7 +284,7 @@ public final class ProtobufUtil { * byte array that is <code>bytes.length</code> plus {@link ProtobufMagic#PB_MAGIC}.length. */ public static byte [] prependPBMagic(final byte [] bytes) { - return Bytes.add(ProtobufMagic.PB_MAGIC, bytes); + return Bytes.add(PB_MAGIC, bytes); } /** @@ -307,10 +309,11 @@ public final class ProtobufUtil { * @param bytes bytes to check * @throws DeserializationException if we are missing the pb magic prefix */ - public static void expectPBMagicPrefix(final byte [] bytes) throws DeserializationException { + public static void expectPBMagicPrefix(final byte[] bytes) throws DeserializationException { if (!isPBMagicPrefix(bytes)) { - throw new DeserializationException("Missing pb magic " + - Bytes.toString(ProtobufMagic.PB_MAGIC) + " prefix"); + String bytesPrefix = bytes == null ? "null" : Bytes.toStringBinary(bytes, 0, PB_MAGIC.length); + throw new DeserializationException( + "Missing pb magic " + Bytes.toString(PB_MAGIC) + " prefix" + ", bytes: " + bytesPrefix); } } @@ -1962,7 +1965,7 @@ public final class ProtobufUtil { public static byte [] toDelimitedByteArray(final Message m) throws IOException { // Allocate arbitrary big size so we avoid resizing. 
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); - baos.write(ProtobufMagic.PB_MAGIC); + baos.write(PB_MAGIC); m.writeDelimitedTo(baos); return baos.toByteArray(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java index 3edfcf9..f040bf9 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.replication; -import java.util.List; - import org.apache.yetus.audience.InterfaceAudience; /** @@ -36,16 +34,4 @@ public interface ReplicationListener { * @param regionServer the removed region server */ public void regionServerRemoved(String regionServer); - - /** - * A peer cluster has been removed (i.e. unregistered) from replication. - * @param peerId The peer id of the cluster that has been removed - */ - public void peerRemoved(String peerId); - - /** - * The list of registered peer clusters has changed. - * @param peerIds A list of all currently registered peer clusters - */ - public void peerListChanged(List<String> peerIds); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java index 97e2ddb..b66d76d 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java @@ -55,12 +55,27 @@ public interface ReplicationPeer { public ReplicationPeerConfig getPeerConfig(); /** - * Returns the state of the peer + * Get the peer config object. if loadFromBackingStore is true, it will load from backing store + * directly and update its load peer config. otherwise, just return the local cached peer config. + * @return the ReplicationPeerConfig for this peer + */ + public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore) + throws ReplicationException; + + /** + * Returns the state of the peer by reading local cache. * @return the enabled state */ PeerState getPeerState(); /** + * Returns the state of peer, if loadFromBackingStore is true, it will load from backing store + * directly and update its local peer state. otherwise, just return the local cached peer state. + * @return the enabled state + */ + PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException; + + /** * Get the configuration object required to communicate with this peer * @return configuration object */ @@ -84,6 +99,15 @@ public interface ReplicationPeer { */ public long getPeerBandwidth(); - void trackPeerConfigChanges(ReplicationPeerConfigListener listener); + /** + * Register a peer config listener to catch the peer config change event. + * @param listener listener to catch the peer config change event. 
+ */ + public void registerPeerConfigListener(ReplicationPeerConfigListener listener); + /** + * Notify all the registered ReplicationPeerConfigListener to update their peer config. + * @param newPeerConfig the new peer config. + */ + public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index 454d09c..5d051a0 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,12 +34,10 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; -import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.NodeExistsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,14 +46,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements ReplicationPeer, Abortable, Closeable { private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerZKImpl.class); - private ReplicationPeerConfig peerConfig; + private volatile ReplicationPeerConfig peerConfig; private final String id; private volatile PeerState peerState; private volatile Map<TableName, List<String>> tableCFs = new HashMap<>(); private final Configuration conf; - private PeerStateTracker peerStateTracker; - private PeerConfigTracker peerConfigTracker; + private final List<ReplicationPeerConfigListener> peerConfigListeners; /** * Constructor that takes all the objects required to communicate with the specified peer, except @@ -63,62 +61,35 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase * @param id string representation of this peer's identifier * @param peerConfig configuration for the replication peer */ - public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, - String id, ReplicationPeerConfig peerConfig, - Abortable abortable) - throws ReplicationException { + public ReplicationPeerZKImpl(ZKWatcher zkWatcher, Configuration conf, String id, + ReplicationPeerConfig peerConfig, Abortable abortable) throws ReplicationException { super(zkWatcher, conf, abortable); this.conf = conf; this.peerConfig = peerConfig; this.id = id; + this.peerConfigListeners = new ArrayList<>(); } - /** - * start a state tracker to check whether this peer is enabled or not - * - * @param peerStateNode path to zk node which stores peer state - * @throws KeeperException - */ - public void startStateTracker(String peerStateNode) - throws 
KeeperException { - ensurePeerEnabled(peerStateNode); - this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this); - this.peerStateTracker.start(); + private PeerState readPeerState() throws ReplicationException { try { - this.readPeerStateZnode(); - } catch (DeserializationException e) { - throw ZKUtil.convert(e); + byte[] data = ZKUtil.getData(zookeeper, this.getPeerStateNode(id)); + this.peerState = isStateEnabled(data) ? PeerState.ENABLED : PeerState.DISABLED; + } catch (DeserializationException | KeeperException | InterruptedException e) { + throw new ReplicationException("Get and deserialize peer state data from zookeeper failed: ", + e); } + return this.peerState; } - private void readPeerStateZnode() throws DeserializationException { - this.peerState = - isStateEnabled(this.peerStateTracker.getData(false)) - ? PeerState.ENABLED - : PeerState.DISABLED; - } - - /** - * start a table-cfs tracker to listen the (table, cf-list) map change - * @param peerConfigNode path to zk node which stores table-cfs - * @throws KeeperException - */ - public void startPeerConfigTracker(String peerConfigNode) - throws KeeperException { - this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper, - this); - this.peerConfigTracker.start(); - this.readPeerConfig(); - } - - private ReplicationPeerConfig readPeerConfig() { + private ReplicationPeerConfig readPeerConfig() throws ReplicationException { try { - byte[] data = peerConfigTracker.getData(false); + byte[] data = ZKUtil.getData(zookeeper, this.getPeerNode(id)); if (data != null) { this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data); } - } catch (DeserializationException e) { - LOG.error("", e); + } catch (DeserializationException | KeeperException | InterruptedException e) { + throw new ReplicationException("Get and deserialize peer config date from zookeeper failed: ", + e); } return this.peerConfig; } @@ -128,6 +99,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase return peerState; } + @Override + public PeerState getPeerState(boolean loadFromBackingStore) throws ReplicationException { + if (loadFromBackingStore) { + return readPeerState(); + } else { + return peerState; + } + } + /** * Get the identifier of this peer * @return string representation of the id (short) @@ -146,6 +126,16 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase return peerConfig; } + @Override + public ReplicationPeerConfig getPeerConfig(boolean loadFromBackingStore) + throws ReplicationException { + if (loadFromBackingStore) { + return readPeerConfig(); + } else { + return peerConfig; + } + } + /** * Get the configuration object required to communicate with this peer * @return configuration object @@ -180,9 +170,14 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase } @Override - public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { - if (this.peerConfigTracker != null){ - this.peerConfigTracker.setListener(listener); + public void registerPeerConfigListener(ReplicationPeerConfigListener listener) { + this.peerConfigListeners.add(listener); + } + + @Override + public void triggerPeerConfigChange(ReplicationPeerConfig newPeerConfig) { + for (ReplicationPeerConfigListener listener : this.peerConfigListeners) { + listener.peerConfigUpdated(newPeerConfig); } } @@ -223,97 +218,16 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes) throws 
DeserializationException { ProtobufUtil.expectPBMagicPrefix(bytes); - int pblen = ProtobufUtil.lengthOfPBMagic(); + int pbLen = ProtobufUtil.lengthOfPBMagic(); ReplicationProtos.ReplicationState.Builder builder = ReplicationProtos.ReplicationState.newBuilder(); ReplicationProtos.ReplicationState state; try { - ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen); + ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen); state = builder.build(); return state.getState(); } catch (IOException e) { throw new DeserializationException(e); } } - - /** - * Utility method to ensure an ENABLED znode is in place; if not present, we create it. - * @param path Path to znode to check - * @return True if we created the znode. - * @throws NodeExistsException - * @throws KeeperException - */ - private boolean ensurePeerEnabled(final String path) - throws NodeExistsException, KeeperException { - if (ZKUtil.checkExists(zookeeper, path) == -1) { - // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the - // peer-state znode. This happens while adding a peer. - // The peer state data is set as "ENABLED" by default. - ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, - ReplicationStateZKBase.ENABLED_ZNODE_BYTES); - return true; - } - return false; - } - - /** - * Tracker for state of this peer - */ - public class PeerStateTracker extends ZKNodeTracker { - - public PeerStateTracker(String peerStateZNode, ZKWatcher watcher, - Abortable abortable) { - super(watcher, peerStateZNode, abortable); - } - - @Override - public synchronized void nodeDataChanged(String path) { - if (path.equals(node)) { - super.nodeDataChanged(path); - try { - readPeerStateZnode(); - } catch (DeserializationException e) { - LOG.warn("Failed deserializing the content of " + path, e); - } - } - } - } - - /** - * Tracker for PeerConfigNode of this peer - */ - public class PeerConfigTracker extends ZKNodeTracker { - - ReplicationPeerConfigListener listener; - - public PeerConfigTracker(String peerConfigNode, ZKWatcher watcher, - Abortable abortable) { - super(watcher, peerConfigNode, abortable); - } - - public synchronized void setListener(ReplicationPeerConfigListener listener){ - this.listener = listener; - } - - @Override - public synchronized void nodeCreated(String path) { - if (path.equals(node)) { - super.nodeCreated(path); - ReplicationPeerConfig config = readPeerConfig(); - if (listener != null){ - listener.peerConfigUpdated(config); - } - } - } - - @Override - public synchronized void nodeDataChanged(String path) { - //superclass calls nodeCreated - if (path.equals(node)) { - super.nodeDataChanged(path); - } - - } - - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 0b2f08a..8e2c5f4 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -500,21 +500,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } Configuration peerConf = pair.getSecond(); - ReplicationPeerZKImpl peer = new 
ReplicationPeerZKImpl(zookeeper, - peerConf, peerId, pair.getFirst(), abortable); - try { - peer.startStateTracker(this.getPeerStateNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException("Error starting the peer state tracker for peerId=" + - peerId, e); - } + ReplicationPeerZKImpl peer = + new ReplicationPeerZKImpl(zookeeper, peerConf, peerId, pair.getFirst(), abortable); - try { - peer.startPeerConfigTracker(this.getPeerNode(peerId)); - } catch (KeeperException e) { - throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" + - peerId, e); - } + // Load peer state and peer config by reading zookeeper directly. + peer.getPeerState(true); + peer.getPeerConfig(true); return peer; } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java index 9a1d9aa..2c522f6 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java @@ -48,16 +48,12 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements private final List<ReplicationListener> listeners = new CopyOnWriteArrayList<>(); // List of all the other region servers in this cluster private final ArrayList<String> otherRegionServers = new ArrayList<>(); - private final ReplicationPeers replicationPeers; - public ReplicationTrackerZKImpl(ZKWatcher zookeeper, - final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable, - Stoppable stopper) { + public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers, + Configuration conf, Abortable abortable, Stoppable stopper) { super(zookeeper, conf, abortable); - this.replicationPeers = replicationPeers; this.stopper = stopper; this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); - this.zookeeper.registerListener(new PeersWatcher(this.zookeeper)); } @Override @@ -146,71 +142,6 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements } /** - * Watcher used to follow the creation and deletion of peer clusters. - */ - public class PeersWatcher extends ZKListener { - - /** - * Construct a ZooKeeper event listener. - */ - public PeersWatcher(ZKWatcher watcher) { - super(watcher); - } - - /** - * Called when a node has been deleted - * @param path full path of the deleted node - */ - @Override - public void nodeDeleted(String path) { - List<String> peers = refreshPeersList(path); - if (peers == null) { - return; - } - if (isPeerPath(path)) { - String id = getZNodeName(path); - LOG.info(path + " znode expired, triggering peerRemoved event"); - for (ReplicationListener rl : listeners) { - rl.peerRemoved(id); - } - } - } - - /** - * Called when an existing node has a child node added or removed. 
- * @param path full path of the node whose children have changed - */ - @Override - public void nodeChildrenChanged(String path) { - List<String> peers = refreshPeersList(path); - if (peers == null) { - return; - } - LOG.info(path + " znode expired, triggering peerListChanged event"); - for (ReplicationListener rl : listeners) { - rl.peerListChanged(peers); - } - } - } - - /** - * Verify if this event is meant for us, and if so then get the latest peers' list from ZK. Also - * reset the watches. - * @param path path to check against - * @return A list of peers' identifiers if the event concerns this watcher, else null. - */ - private List<String> refreshPeersList(String path) { - if (!path.startsWith(getPeersZNode())) { - return null; - } - return this.replicationPeers.getAllPeerIds(); - } - - private String getPeersZNode() { - return this.peersZNode; - } - - /** * Extracts the znode name of a peer cluster from a ZK path * @param fullPath Path to extract the id from * @return the id or an empty string if path is invalid http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java index 835ffbf..a82fa3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -29,8 +30,14 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @InterfaceAudience.Private public interface ReplicationSourceService extends ReplicationService { /** - * Returns a WALObserver for the service. This is needed to + * Returns a WALObserver for the service. This is needed to * observe log rolls and log archival events. */ WALActionsListener getWALActionsListener(); + + + /** + * Returns a Handler to handle peer procedures. 
+ */ + PeerProcedureHandler getPeerProcedureHandler(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java index 94bcfec..240b0a7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.log4j.Logger; import org.apache.yetus.audience.InterfaceAudience; /** @@ -28,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class RSProcedureHandler extends EventHandler { + private static final Logger LOG = Logger.getLogger(RSProcedureHandler.class); private final long procId; private final RSProcedureCallable callable; @@ -44,6 +46,7 @@ public class RSProcedureHandler extends EventHandler { try { callable.call(); } catch (Exception e) { + LOG.error("Catch exception when call RSProcedureCallable: ", e); error = e; } ((HRegionServer) server).reportProcedureDone(procId, error); http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java index c390d09..2057057 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java @@ -49,7 +49,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService if (this.ctx != null){ ReplicationPeer peer = this.ctx.getReplicationPeer(); if (peer != null){ - peer.trackPeerConfigChanges(this); + peer.registerPeerConfigListener(this); } else { LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + " because there's no such peer"); http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java new file mode 100644 index 0000000..b392985 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandler.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface PeerProcedureHandler { + + public void addPeer(String peerId) throws ReplicationException, IOException; + + public void removePeer(String peerId) throws ReplicationException, IOException; + + public void disablePeer(String peerId) throws ReplicationException, IOException; + + public void enablePeer(String peerId) throws ReplicationException, IOException; + + public void updatePeerConfig(String peerId) throws ReplicationException, IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java new file mode 100644 index 0000000..9b493d9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.log4j.Logger; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class PeerProcedureHandlerImpl implements PeerProcedureHandler { + private static final Logger LOG = Logger.getLogger(PeerProcedureHandlerImpl.class); + + private ReplicationSourceManager replicationSourceManager; + + public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager) { + this.replicationSourceManager = replicationSourceManager; + } + + @Override + public void addPeer(String peerId) throws ReplicationException, IOException { + replicationSourceManager.addPeer(peerId); + } + + @Override + public void removePeer(String peerId) throws ReplicationException, IOException { + replicationSourceManager.removePeer(peerId); + } + + @Override + public void disablePeer(String peerId) throws ReplicationException, IOException { + ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); + if (peer != null) { + PeerState peerState = peer.getPeerState(true); + LOG.info("disablePeer state, peer id: " + peerId + ", state: " + peerState); + } else { + throw new ReplicationException("No connected peer found, peerId=" + peerId); + } + } + + @Override + public void enablePeer(String peerId) throws ReplicationException, IOException { + ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); + if (peer != null) { + PeerState peerState = peer.getPeerState(true); + LOG.info("enablePeer state, peer id: " + peerId + ", state: " + peerState); + } else { + throw new ReplicationException("No connected peer found, peerId=" + peerId); + } + } + + @Override + public void updatePeerConfig(String peerId) throws ReplicationException, IOException { + ReplicationPeer peer = replicationSourceManager.getReplicationPeers().getConnectedPeer(peerId); + if (peer == null) { + throw new ReplicationException("No connected peer found, peerId=" + peerId); + } + ReplicationPeerConfig rpc = peer.getPeerConfig(true); + peer.triggerPeerConfigChange(rpc); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index a47a483..c3f33aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -17,27 +17,29 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; +import org.apache.log4j.Logger; 
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; /** - * The callable executed at RS side to refresh the peer config/state. - * <p> - * TODO: only a dummy implementation for verifying the framework, will add implementation later. + * The callable executed at RS side to refresh the peer config/state. <br/> */ @InterfaceAudience.Private public class RefreshPeerCallable implements RSProcedureCallable { + private static final Logger LOG = Logger.getLogger(RefreshPeerCallable.class); private HRegionServer rs; private String peerId; + private PeerModificationType type; + private Exception initError; @Override @@ -45,9 +47,27 @@ public class RefreshPeerCallable implements RSProcedureCallable { if (initError != null) { throw initError; } - Path dir = new Path("/" + peerId); - if (rs.getFileSystem().exists(dir)) { - rs.getFileSystem().create(new Path(dir, rs.getServerName().toString())).close(); + + LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type); + PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); + switch (type) { + case ADD_PEER: + handler.addPeer(this.peerId); + break; + case REMOVE_PEER: + handler.removePeer(this.peerId); + break; + case ENABLE_PEER: + handler.enablePeer(this.peerId); + break; + case DISABLE_PEER: + handler.disablePeer(this.peerId); + break; + case UPDATE_PEER_CONFIG: + handler.updatePeerConfig(this.peerId); + break; + default: + throw new IllegalArgumentException("Unknown peer modification type: " + type); } return null; } @@ -56,10 +76,11 @@ public class RefreshPeerCallable implements RSProcedureCallable { public void init(byte[] parameter, HRegionServer rs) { this.rs = rs; try { - this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId(); + RefreshPeerParameter param = RefreshPeerParameter.parseFrom(parameter); + this.peerId = param.getPeerId(); + this.type = param.getType(); } catch (InvalidProtocolBufferException e) { initError = e; - return; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index d8212e9..571ee75 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -87,6 +87,8 @@ public class Replication implements // ReplicationLoad to access replication metrics private ReplicationLoad replicationLoad; + private PeerProcedureHandler peerProcedureHandler; + /** * Instantiate the replication management (if rep is enabled). 
* @param server Hosting server @@ -151,6 +153,8 @@ public class Replication implements this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod); this.replicationLoad = new ReplicationLoad(); + + this.peerProcedureHandler = new PeerProcedureHandlerImpl(replicationManager); } /** @@ -168,6 +172,12 @@ public class Replication implements public WALActionsListener getWALActionsListener() { return this; } + + @Override + public PeerProcedureHandler getPeerProcedureHandler() { + return peerProcedureHandler; + } + /** * Stops replication service. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index f4f35ae..19ea240 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -446,12 +445,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf public void terminate(String reason, Exception cause, boolean join) { if (cause == null) { - LOG.info("Closing source " - + this.peerClusterZnode + " because: " + reason); - + LOG.info("Closing source " + this.peerClusterZnode + " because: " + reason); } else { - LOG.error("Closing source " + this.peerClusterZnode - + " because an error occurred: " + reason, cause); + LOG.error("Closing source " + this.peerClusterZnode + " because an error occurred: " + reason, + cause); } this.sourceRunning = false; Collection<ReplicationSourceShipper> workers = workerThreads.values(); http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 07c53e1..a263fc3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -564,6 +564,18 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsById.remove(src.getPeerClusterZnode()); } + public void addPeer(String id) throws ReplicationException, IOException { + LOG.info("Trying to add peer, peerId: " + id); + boolean added = this.replicationPeers.peerConnected(id); + if (added) { + LOG.info("Peer " + id + " connected success, trying to start the replication source thread."); + addSource(id); + if 
(replicationForBulkLoadDataEnabled) { + this.replicationQueues.addPeerToHFileRefs(id); + } + } + } + /** * Thie method first deletes all the recovered sources for the specified * id, then deletes the normal source (deleting all related data in ZK). @@ -611,6 +623,8 @@ public class ReplicationSourceManager implements ReplicationListener { } deleteSource(id, true); } + // Remove HFile Refs znode from zookeeper + this.replicationQueues.removePeerFromHFileRefs(id); } @Override @@ -618,29 +632,6 @@ public class ReplicationSourceManager implements ReplicationListener { transferQueues(regionserver); } - @Override - public void peerRemoved(String peerId) { - removePeer(peerId); - this.replicationQueues.removePeerFromHFileRefs(peerId); - } - - @Override - public void peerListChanged(List<String> peerIds) { - for (String id : peerIds) { - try { - boolean added = this.replicationPeers.peerConnected(id); - if (added) { - addSource(id); - if (replicationForBulkLoadDataEnabled) { - this.replicationQueues.addPeerToHFileRefs(id); - } - } - } catch (Exception e) { - LOG.error("Error while adding a new peer", e); - } - } - } - /** * Class responsible to setup new ReplicationSources to take care of the * queues from dead region servers. http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index d2a16aa..fb29e9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -764,4 +764,4 @@ public class TestReplicationAdmin { assertEquals(2097152, admin.getPeerConfig(ID_ONE).getBandwidth()); admin.removePeer(ID_ONE); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java new file mode 100644 index 0000000..b09a8a7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminUsingProcedure.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.replication; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestReplicationAdminUsingProcedure extends TestReplicationBase { + + private static final String PEER_ID = "2"; + private static final Logger LOG = Logger.getLogger(TestReplicationAdminUsingProcedure.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1.setInt("hbase.multihconnection.threads.max", 10); + + // Start the master & slave mini cluster. + TestReplicationBase.setUpBeforeClass(); + + // Remove the replication peer + hbaseAdmin.removeReplicationPeer(PEER_ID); + } + + private void loadData(int startRowKey, int endRowKey) throws IOException { + for (int i = startRowKey; i < endRowKey; i++) { + byte[] rowKey = Bytes.add(row, Bytes.toBytes(i)); + Put put = new Put(rowKey); + put.addColumn(famName, null, Bytes.toBytes(i)); + htable1.put(put); + } + } + + private void waitForReplication(int expectedRows, int retries) + throws IOException, InterruptedException { + Scan scan; + for (int i = 0; i < retries; i++) { + scan = new Scan(); + if (i == retries - 1) { + throw new IOException("Waited too much time for normal batch replication"); + } + try (ResultScanner scanner = htable2.getScanner(scan)) { + int count = 0; + for (Result res : scanner) { + count++; + } + if (count != expectedRows) { + LOG.info("Only got " + count + " rows, expected rows: " + expectedRows); + Thread.sleep(SLEEP_TIME); + } else { + return; + } + } + } + } + + @Before + public void setUp() throws IOException { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + hbaseAdmin.addReplicationPeer(PEER_ID, rpc); + + utility1.waitUntilAllRegionsAssigned(tableName); + utility2.waitUntilAllRegionsAssigned(tableName); + } + + @After + public void tearDown() throws IOException { + hbaseAdmin.removeReplicationPeer(PEER_ID); + truncateBoth(); + } + + private void truncateBoth() throws IOException { + utility1.deleteTableData(tableName); + utility2.deleteTableData(tableName); + } + + @Test + public void testAddPeer() throws Exception { + // Load data + loadData(0, NB_ROWS_IN_BATCH); + + // Wait the replication finished + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + } + + @Test + public void testRemovePeer() throws Exception { + // prev-check + waitForReplication(0, NB_RETRIES); + + // Load data + loadData(0, NB_ROWS_IN_BATCH); + + // Wait the replication finished + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + + 
// Remove the peer id + hbaseAdmin.removeReplicationPeer(PEER_ID); + + // Load data again + loadData(NB_ROWS_IN_BATCH, 2 * NB_ROWS_IN_BATCH); + + // Wait the replication again + boolean foundException = false; + try { + waitForReplication(NB_ROWS_IN_BATCH * 2, NB_RETRIES); + } catch (IOException e) { + foundException = true; + } + Assert.assertTrue(foundException); + + // Truncate the table in source cluster + truncateBoth(); + + // Add peer again + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + hbaseAdmin.addReplicationPeer(PEER_ID, rpc); + + // Load data again + loadData(0, NB_ROWS_IN_BATCH); + + // Wait the replication finished + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + } + + @Test + public void testDisableAndEnablePeer() throws Exception { + // disable peer + hbaseAdmin.disableReplicationPeer(PEER_ID); + + // Load data + loadData(0, NB_ROWS_IN_BATCH); + + // Will failed to wait the replication. + boolean foundException = false; + try { + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + } catch (IOException e) { + foundException = true; + } + Assert.assertTrue(foundException); + + // Enable the peer + hbaseAdmin.enableReplicationPeer(PEER_ID); + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + + // Load more data + loadData(NB_ROWS_IN_BATCH, NB_ROWS_IN_BATCH * 2); + + // Wait replication again. + waitForReplication(NB_ROWS_IN_BATCH * 2, NB_RETRIES); + } + + @Test + public void testUpdatePeerConfig() throws Exception { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + rpc.setExcludeTableCFsMap( + ImmutableMap.of(tableName, ImmutableList.of(Bytes.toString(famName)))); + + // Update the peer config to exclude the test table name. + hbaseAdmin.updateReplicationPeerConfig(PEER_ID, rpc); + + // Load data + loadData(0, NB_ROWS_IN_BATCH); + + // Will failed to wait the replication + boolean foundException = false; + try { + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + } catch (IOException e) { + foundException = true; + } + Assert.assertTrue(foundException); + + // Truncate the table in source cluster + truncateBoth(); + + // Update the peer config to include the test table name. + ReplicationPeerConfig rpc2 = new ReplicationPeerConfig(); + rpc2.setClusterKey(utility2.getClusterKey()); + hbaseAdmin.updateReplicationPeerConfig(PEER_ID, rpc2); + + // Load data again + loadData(0, NB_ROWS_IN_BATCH); + + // Wait the replication finished + waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java deleted file mode 100644 index ed7c6fa..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.replication; - -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; - -public class DummyModifyPeerProcedure extends ModifyPeerProcedure { - - public DummyModifyPeerProcedure() { - } - - public DummyModifyPeerProcedure(String peerId) { - super(peerId); - } - - @Override - public PeerOperationType getPeerOperationType() { - return PeerOperationType.ADD; - } - - @Override - protected void prePeerModification(MasterProcedureEnv env) { - } - - @Override - protected void updatePeerStorage(MasterProcedureEnv env) { - } - - @Override - protected void postPeerModification(MasterProcedureEnv env) { - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java deleted file mode 100644 index ec06306..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.master.replication; - -import static org.junit.Assert.assertTrue; - -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MasterTests.class, LargeTests.class }) -public class TestDummyModifyPeerProcedure { - - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static String PEER_ID; - - private static Path DIR; - - @BeforeClass - public static void setUp() throws Exception { - UTIL.startMiniCluster(3); - PEER_ID = "testPeer"; - DIR = new Path("/" + PEER_ID); - UTIL.getTestFileSystem().mkdirs(DIR); - } - - @AfterClass - public static void tearDown() throws Exception { - UTIL.shutdownMiniCluster(); - } - - @Test - public void test() throws Exception { - ProcedureExecutor<?> executor = - UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); - long procId = executor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID)); - UTIL.waitFor(30000, new Waiter.Predicate<Exception>() { - - @Override - public boolean evaluate() throws Exception { - return executor.isFinished(procId); - } - }); - Set<String> serverNames = UTIL.getHBaseCluster().getRegionServerThreads().stream() - .map(t -> t.getRegionServer().getServerName().toString()) - .collect(Collectors.toCollection(HashSet::new)); - for (FileStatus s : UTIL.getTestFileSystem().listStatus(DIR)) { - assertTrue(serverNames.remove(s.getPath().getName())); - } - assertTrue(serverNames.isEmpty()); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java index a04d524..f118ca3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -156,41 +155,6 @@ public class TestReplicationTrackerZKImpl { } @Test(timeout = 30000) - public void testPeerRemovedEvent() throws Exception { - rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); - rt.registerListener(new DummyReplicationListener()); - rp.unregisterPeer("5"); - // wait for event - while (peerRemovedCount.get() < 1) { - Thread.sleep(5); - } - assertEquals("5", peerRemovedData); - } - - @Test(timeout = 30000) - public void testPeerListChangedEvent() throws Exception { - // add a peer - rp.registerPeer("5", new ReplicationPeerConfig().setClusterKey(utility.getClusterKey())); - 
-    zkw.getRecoverableZooKeeper().getZooKeeper().getChildren("/hbase/replication/peers/5", true);
-    rt.registerListener(new DummyReplicationListener());
-    rp.disablePeer("5");
-    int tmp = plChangedCount.get();
-    LOG.info("Peer count=" + tmp);
-    ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5/peer-state");
-    // wait for event
-    while (plChangedCount.get() <= tmp) {
-      Thread.sleep(100);
-      LOG.info("Peer count=" + tmp);
-    }
-    assertEquals(1, plChangedData.size());
-    assertTrue(plChangedData.contains("5"));
-
-    // clean up
-    //ZKUtil.deleteNode(zkw, "/hbase/replication/peers/5");
-    rp.unregisterPeer("5");
-  }
-
-  @Test(timeout = 30000)
   public void testPeerNameControl() throws Exception {
     int exists = 0;
     int hyphen = 0;
@@ -222,21 +186,6 @@ public class TestReplicationTrackerZKImpl {
       rsRemovedCount.getAndIncrement();
       LOG.debug("Received regionServerRemoved event: " + regionServer);
     }
-
-    @Override
-    public void peerRemoved(String peerId) {
-      peerRemovedData = peerId;
-      peerRemovedCount.getAndIncrement();
-      LOG.debug("Received peerDisconnected event: " + peerId);
-    }
-
-    @Override
-    public void peerListChanged(List<String> peerIds) {
-      plChangedData.clear();
-      plChangedData.addAll(peerIds);
-      int count = plChangedCount.getAndIncrement();
-      LOG.debug("Received peerListChanged event " + count);
-    }
   }

   private class DummyServer implements Server {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3885f322/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 307ea7f..9f234a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -176,6 +176,12 @@ public abstract class TestReplicationSourceManager {
     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
     managerOfCluster = getManagerFromCluster();
+    if (managerOfCluster != null) {
+      // After replication procedure, we need to add peer by hand (other than by receiving
+      // notification from zk)
+      managerOfCluster.addPeer(slaveId);
+    }
+
     manager = replication.getReplicationManager();
     manager.addSource(slaveId);
     if (managerOfCluster != null) {
@@ -535,18 +541,16 @@ public abstract class TestReplicationSourceManager {
     final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
     final long sizeOfLatestPath = getSizeOfLatestPath();
     addPeerAndWait(peerId, peerConfig, true);
-    assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial,
-        globalSource.getSizeOfLogQueue());
+    assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
     ReplicationSourceInterface source = manager.getSource(peerId);
     // Sanity check
     assertNotNull(source);
     final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
     // Enqueue log and check if metrics updated
     source.enqueueLog(new Path("abc"));
-    assertEquals(1 + sizeOfSingleLogQueue,
-        source.getSourceMetrics().getSizeOfLogQueue());
-    assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
-        + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
+    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
+        globalSource.getSizeOfLogQueue());

     // Removing the peer should reset the global metrics
     removePeerAndWait(peerId);
@@ -556,9 +560,8 @@ public abstract class TestReplicationSourceManager {
       addPeerAndWait(peerId, peerConfig, true);
       source = manager.getSource(peerId);
       assertNotNull(source);
-      assertEquals(sizeOfLatestPath, source.getSourceMetrics().getSizeOfLogQueue());
-      assertEquals(source.getSourceMetrics().getSizeOfLogQueue()
-          + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
+      assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
+          globalSource.getSizeOfLogQueue());
     } finally {
       removePeerAndWait(peerId);
     }
@@ -575,8 +578,14 @@ public abstract class TestReplicationSourceManager {
       final boolean waitForSource) throws Exception {
     final ReplicationPeers rp = manager.getReplicationPeers();
     rp.registerPeer(peerId, peerConfig);
+    try {
+      manager.addPeer(peerId);
+    } catch (Exception e) {
+      // ignore the failed exception, because we'll test both success & failed case.
+    }
     waitPeer(peerId, manager, waitForSource);
     if (managerOfCluster != null) {
+      managerOfCluster.addPeer(peerId);
       waitPeer(peerId, managerOfCluster, waitForSource);
     }
   }
@@ -609,6 +618,11 @@ public abstract class TestReplicationSourceManager {
     final ReplicationPeers rp = manager.getReplicationPeers();
     if (rp.getAllPeerIds().contains(peerId)) {
       rp.unregisterPeer(peerId);
+      try {
+        manager.removePeer(peerId);
+      } catch (Exception e) {
+        // ignore the failed exception and continue.
+      }
     }
     Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
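
For reference, a minimal sketch of the pattern the updated TestReplicationSourceManager hunks above rely on: the test now registers the peer config and then drives the region-server-side ReplicationSourceManager explicitly (manager.addPeer / manager.removePeer) instead of waiting for a ZK watcher event. This sketch is not part of the commit; the helper name addPeerExplicitly is hypothetical, and the fields manager and conf are assumed to be the same test fixtures used in the class above. Only APIs that appear in the diff (ReplicationPeers.registerPeer, ReplicationSourceManager.addPeer/getSource, Waiter.waitFor) are used.

  // Minimal sketch, assuming the TestReplicationSourceManager fixtures; not part of the commit.
  private void addPeerExplicitly(final String peerId, ReplicationPeerConfig peerConfig)
      throws Exception {
    ReplicationPeers rp = manager.getReplicationPeers();
    // Persist the peer config in the peer storage, as before.
    rp.registerPeer(peerId, peerConfig);
    // With peer modification moved to procedures, notify the source manager directly
    // rather than relying on a ZK watcher callback.
    manager.addPeer(peerId);
    // Wait until the corresponding replication source has been created.
    Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return manager.getSource(peerId) != null;
      }
    });
  }

A matching removal path would mirror this by calling rp.unregisterPeer(peerId) followed by manager.removePeer(peerId), as the updated removePeerAndWait hunk shows.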