HBASE-19543 Abstract a replication storage interface to extract the zk specific code
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f1b3580 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f1b3580 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f1b3580 Branch: refs/heads/HBASE-19397-branch-2 Commit: 2f1b3580aa76c37b6b473663dc9fcb70e025587a Parents: 7398495 Author: zhangduo <zhang...@apache.org> Authored: Fri Dec 22 14:37:28 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Wed Mar 7 18:15:25 2018 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/util/CollectionUtils.java | 3 + hbase-replication/pom.xml | 12 + .../replication/ReplicationPeerStorage.java | 74 ++++ .../replication/ReplicationQueueStorage.java | 164 +++++++ .../replication/ReplicationStateZKBase.java | 1 - .../replication/ReplicationStorageFactory.java | 49 +++ .../replication/ZKReplicationPeerStorage.java | 164 +++++++ .../replication/ZKReplicationQueueStorage.java | 425 +++++++++++++++++++ .../replication/ZKReplicationStorageBase.java | 75 ++++ .../TestZKReplicationPeerStorage.java | 171 ++++++++ .../TestZKReplicationQueueStorage.java | 171 ++++++++ .../org/apache/hadoop/hbase/master/HMaster.java | 34 +- .../hadoop/hbase/master/MasterServices.java | 6 +- .../master/procedure/MasterProcedureEnv.java | 24 +- .../master/replication/AddPeerProcedure.java | 6 +- .../replication/DisablePeerProcedure.java | 7 +- .../master/replication/EnablePeerProcedure.java | 6 +- .../master/replication/ModifyPeerProcedure.java | 41 +- .../master/replication/RemovePeerProcedure.java | 6 +- .../master/replication/ReplicationManager.java | 199 --------- .../replication/ReplicationPeerManager.java | 331 +++++++++++++++ .../replication/UpdatePeerConfigProcedure.java | 7 +- .../replication/TestReplicationAdmin.java | 64 ++- .../hbase/master/MockNoopMasterServices.java | 13 +- .../hbase/master/TestMasterNoCluster.java | 3 +- 
.../TestReplicationDisableInactivePeer.java | 6 +- 26 files changed, 1749 insertions(+), 313 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java index 875b124..8bbb6f1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CollectionUtils.java @@ -107,6 +107,9 @@ public class CollectionUtils { return list.get(list.size() - 1); } + public static <T> List<T> nullToEmpty(List<T> list) { + return list != null ? list : Collections.emptyList(); + } /** * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the * value already exists. 
Notice that the implementation does not guarantee that the supplier will http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-replication/pom.xml b/hbase-replication/pom.xml index 136e832..282e9ca 100644 --- a/hbase-replication/pom.xml +++ b/hbase-replication/pom.xml @@ -97,6 +97,18 @@ <groupId>org.apache.hbase</groupId> <artifactId>hbase-zookeeper</artifactId> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-zookeeper</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> <!-- General dependencies --> <dependency> <groupId>org.apache.commons</groupId> http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java new file mode 100644 index 0000000..e00cd0d --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Optional; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Perform read/write to the replication peer storage. + */ +@InterfaceAudience.Private +public interface ReplicationPeerStorage { + + /** + * Add a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException; + + /** + * Remove a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void removePeer(String peerId) throws ReplicationException; + + /** + * Set the state of the peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void setPeerState(String peerId, boolean enabled) throws ReplicationException; + + /** + * Update the config of a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException; + + /** + * Return the peer ids of all replication peers. + * @throws ReplicationException if there are errors accessing the storage service. + */ + List<String> listPeerIds() throws ReplicationException; + + /** + * Test whether a replication peer is enabled.
+ * @throws ReplicationException if there are errors accessing the storage service. + */ + boolean isPeerEnabled(String peerId) throws ReplicationException; + + /** + * Get the peer config of a replication peer. + * @throws ReplicationException if there are errors accessing the storage service. + */ + Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java new file mode 100644 index 0000000..7210d9a --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Perform read/write to the replication queue storage. + */ +@InterfaceAudience.Private +public interface ReplicationQueueStorage { + + /** + * Remove a replication queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + */ + void removeQueue(ServerName serverName, String queueId) throws ReplicationException; + + /** + * Add a new WAL file to the given queue for a given regionserver. If the queue does not exist it + * is created. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + * @param fileName name of the WAL + */ + void addWAL(ServerName serverName, String queueId, String fileName) throws ReplicationException; + + /** + * Remove a WAL file from the given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue. + * @param fileName name of the WAL + */ + void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException; + + /** + * Set the current position for a specific WAL in a given queue for a given regionserver. + * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue + * @param fileName name of the WAL + * @param position the current position in the file + */ + void setWALPosition(ServerName serverName, String queueId, String fileName, long position) + throws ReplicationException; + + /** + * Get the current position for a specific WAL in a given queue for a given regionserver.
+ * @param serverName the name of the regionserver + * @param queueId a String that identifies the queue + * @param fileName name of the WAL + * @return the current position in the file + */ + long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException; + + /** + * Get a list of all queues for the specified region server. + * @param serverName the server name of the region server that owns the set of queues + * @return a list of queueIds + */ + List<String> getAllQueues(ServerName serverName) throws ReplicationException; + + /** + * Change ownership for the queue identified by queueId that belongs to a dead region server. + * @param sourceServerName the name of the dead region server + * @param destServerName the name of the target region server + * @param queueId the id of the queue + * @return the new queue id and a SortedSet of WALs in its queue + */ + Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException; + + /** + * Remove the record of the region server if the queue is empty. + */ + void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException; + + /** + * Get a list of all region servers that have outstanding replication queues. These servers could + * be alive, dead or from a previous run of the cluster. + * @return a list of server names + */ + List<ServerName> getListOfReplicators() throws ReplicationException; + + /** + * Load all wals in all replication queues. This method guarantees to return a snapshot which + * contains all WALs in the zookeeper at the start of this call even if there is a concurrent queue + * failover. However, some newly created WALs during the call may not be included. + */ + Set<String> getAllWALs() throws ReplicationException; + + /** + * Add a peer to hfile reference queue if peer does not exist.
+ * @param peerId peer cluster id to be added + * @throws ReplicationException if fails to add a peer id to hfile reference queue + */ + void addPeerToHFileRefs(String peerId) throws ReplicationException; + + /** + * Remove a peer from hfile reference queue. + * @param peerId peer cluster id to be removed + */ + void removePeerFromHFileRefs(String peerId) throws ReplicationException; + + /** + * Add new hfile references to the queue. + * @param peerId peer cluster id to which the hfiles need to be replicated + * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which + * will be added in the queue } + * @throws ReplicationException if fails to add a hfile reference + */ + void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException; + + /** + * Remove hfile references from the queue. + * @param peerId peer cluster id from which this hfile references needs to be removed + * @param files list of hfile references to be removed + */ + void removeHFileRefs(String peerId, List<String> files) throws ReplicationException; + + /** + * Get the change version number of replication hfile references node. This can be used as + * optimistic locking to get a consistent snapshot of the replication queues of hfile references. + * @return change version number of hfile references node + */ + int getHFileRefsNodeChangeVersion() throws ReplicationException; + + /** + * Get list of all peers from hfile reference queue. + * @return a list of peer ids + */ + List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException; + + /** + * Get a list of all hfile references in the given peer. 
+ * @param peerId a String that identifies the peer + * @return a list of hfile references + */ + List<String> getReplicableHFiles(String peerId) throws ReplicationException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 4e9479f..a48683e 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -63,7 +63,6 @@ public abstract class ReplicationStateZKBase { protected final Configuration conf; protected final Abortable abortable; - // Public for testing public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); public static final byte[] DISABLED_ZNODE_BYTES = http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java new file mode 100644 index 0000000..60d0749 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used to create replication storage(peer, queue) classes. + * <p> + * For now we only have zk based implementation. + */ +@InterfaceAudience.Private +public class ReplicationStorageFactory { + + private ReplicationStorageFactory() { + } + + /** + * Create a new {@link ReplicationPeerStorage}. + */ + public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) { + return new ZKReplicationPeerStorage(zk, conf); + } + + /** + * Create a new {@link ReplicationQueueStorage}. 
+ */ + public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk, + Configuration conf) { + return new ZKReplicationQueueStorage(zk, conf); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java new file mode 100644 index 0000000..49af4c3 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * ZK based replication peer storage. + */ +@InterfaceAudience.Private +class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { + + private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationPeerStorage.class); + + public static final byte[] ENABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); + public static final byte[] DISABLED_ZNODE_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + + /** + * The name of the znode that contains the replication status of a remote slave (i.e. peer) + * cluster. + */ + private final String peerStateNodeName; + + /** + * The name of the znode that contains a list of all remote slave (i.e. peer) clusters. 
+ */ + private final String peersZNode; + + public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); + String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); + this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); + } + + private String getPeerStateNode(String peerId) { + return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); + } + + private String getPeerNode(String peerId) { + return ZNodePaths.joinZNode(peersZNode, peerId); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, peersZNode); + ZKUtil.multiOrSequential(zookeeper, + Arrays.asList( + ZKUtilOp.createAndFailSilent(getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)), + ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), + enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), + false); + } catch (KeeperException e) { + throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" + + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + } + } + + @Override + public void removePeer(String peerId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getPeerNode(peerId)); + } catch (KeeperException e) { + throw new ReplicationException("Could not remove peer with id=" + peerId, e); + } + } + + @Override + public void setPeerState(String peerId, boolean enabled) throws ReplicationException { + byte[] stateBytes = enabled ? 
ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; + try { + ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes); + } catch (KeeperException e) { + throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e); + } + } + + @Override + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + try { + ZKUtil.setData(this.zookeeper, getPeerNode(peerId), + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + } catch (KeeperException e) { + throw new ReplicationException( + "There was a problem trying to save changes to the " + "replication peer " + peerId, e); + } + } + + @Override + public List<String> listPeerIds() throws ReplicationException { + try { + return CollectionUtils.nullToEmpty(ZKUtil.listChildrenAndWatchThem(zookeeper, peersZNode)); + } catch (KeeperException e) { + throw new ReplicationException("Cannot get the list of peers", e); + } + } + + @Override + public boolean isPeerEnabled(String peerId) throws ReplicationException { + try { + return Arrays.equals(ENABLED_ZNODE_BYTES, + ZKUtil.getData(zookeeper, getPeerStateNode(peerId))); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); + } + } + + @Override + public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) throws ReplicationException { + byte[] data; + try { + data = ZKUtil.getData(zookeeper, getPeerNode(peerId)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Error getting configuration for peer with id=" + peerId, e); + } + if (data == null || data.length == 0) { + return Optional.empty(); + } + try { + return Optional.of(ReplicationPeerConfigUtil.parsePeerFrom(data)); + } catch (DeserializationException e) { + LOG.warn("Failed to parse replication peer config for peer with id=" + peerId, e); + return Optional.empty(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java new file mode 100644 index 0000000..7015d7f --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.BadVersionException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.KeeperException.NotEmptyException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; + +/** + * ZK based replication queue storage. 
+ */ +@InterfaceAudience.Private +class ZKReplicationQueueStorage extends ZKReplicationStorageBase + implements ReplicationQueueStorage { + + private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); + + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = + "zookeeper.znode.replication.hfile.refs"; + public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; + + /** + * The name of the znode that contains all replication queues + */ + private final String queuesZNode; + + /** + * The name of the znode that contains queues of hfile references to be replicated + */ + private final String hfileRefsZNode; + + public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) { + super(zookeeper, conf); + + String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); + String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, + ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); + this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); + this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); + } + + private String getRsNode(ServerName serverName) { + return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); + } + + private String getQueueNode(ServerName serverName, String queueId) { + return ZNodePaths.joinZNode(getRsNode(serverName), queueId); + } + + private String getFileNode(String queueNode, String fileName) { + return ZNodePaths.joinZNode(queueNode, fileName); + } + + private String getFileNode(ServerName serverName, String queueId, String fileName) { + return getFileNode(getQueueNode(serverName, queueId), fileName); + } + + @Override + public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { + try { + ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); + } catch (KeeperException e) { + throw new ReplicationException( + 
"Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e); + } + } + + @Override + public void addWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try { + ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + } + + @Override + public void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + String fileNode = getFileNode(serverName, queueId, fileName); + try { + ZKUtil.deleteNode(zookeeper, fileNode); + } catch (NoNodeException e) { + LOG.warn(fileNode + " has already been deleted when removing log"); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + } + + @Override + public void setWALPosition(ServerName serverName, String queueId, String fileName, long position) + throws ReplicationException { + try { + ZKUtil.setData(zookeeper, getFileNode(serverName, queueId, fileName), + ZKUtil.positionToByteArray(position)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to set log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); + } + } + + @Override + public long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + byte[] bytes; + try { + bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName)); + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Failed to get log position (serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ")", e); + } + try { + return 
ZKUtil.parseWALPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse log position (serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName + ")"); + } + // if we can not parse the position, start at the beginning of the wal file again + return 0; + } + + @Override + public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException { + LOG.info( + "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + try { + ZKUtil.createWithParents(zookeeper, getRsNode(destServerName)); + } catch (KeeperException e) { + throw new ReplicationException( + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed when creating the node for " + destServerName, + e); + } + try { + String oldQueueNode = getQueueNode(sourceServerName, queueId); + List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode); + String newQueueId = queueId + "-" + sourceServerName; + if (CollectionUtils.isEmpty(wals)) { + ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode); + LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty"); + return new Pair<>(newQueueId, Collections.emptySortedSet()); + } + String newQueueNode = getQueueNode(destServerName, newQueueId); + List<ZKUtilOp> listOfOps = new ArrayList<>(); + SortedSet<String> logQueue = new TreeSet<>(); + // create the new cluster znode + listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY)); + // get the offset of the logs and set it to new znodes + for (String wal : wals) { + String oldWalNode = getFileNode(oldQueueNode, wal); + byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode); + if (LOG.isDebugEnabled()) { + LOG.debug("Creating " + wal + " with data " + Bytes.toStringBinary(logOffset)); + } + String newWalNode = getFileNode(newQueueNode, wal); + 
listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset)); + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode)); + logQueue.add(wal); + } + // finally delete the old queue znode itself + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode)); + + if (LOG.isTraceEnabled()) { + LOG.trace("The multi list size is: " + listOfOps.size()); + } + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + + LOG.info( + "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + return new Pair<>(newQueueId, logQueue); + } catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) { + // Multi call failed; it looks like some other regionserver took away the logs. + // These exceptions mean that zk tells us the request cannot be executed, so it is safe to just + // return null. Other types of exception are thrown out to notify the upper layer. + LOG.info( + "Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName + + " failed with " + e.toString() + ", maybe someone else has already took away the logs"); + return null; + } catch (KeeperException | InterruptedException e) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed", e); + } + } + /** + * Deletes the znode of the given region server with ZKUtil.deleteNodeFailSilent. When the znode + * still has children (NotEmptyException) the call silently does nothing. + * + * @throws ReplicationException wrapping any other KeeperException + */ + @Override + public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { + try { + ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName)); + } catch (NotEmptyException e) { + // keep silent to avoid logging too much.
+ } catch (KeeperException e) { + throw new ReplicationException("Failed to remove replicator for " + serverName, e); + } + } + /** + * Lists the children of queuesZNode and parses every child name into a ServerName. The listing + * is passed through nullToEmpty before it is streamed (presumably so that a missing znode + * yields an empty list; confirm against nullToEmpty). + */ + private List<ServerName> getListOfReplicators0() throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode)).stream() + .map(ServerName::parseServerName).collect(toList()); + } + /** + * Same as getListOfReplicators0, with the KeeperException wrapped into a ReplicationException. + */ + @Override + public List<ServerName> getListOfReplicators() throws ReplicationException { + try { + return getListOfReplicators0(); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of replicators", e); + } + } + /** Lists the children (WAL names) of the znode of the given queue of the given server. */ + private List<String> getLogsInQueue0(ServerName serverName, String queueId) + throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId))); + } + /** Lists the children (queue ids) of the znode of the given region server. */ + private List<String> getAllQueues0(ServerName serverName) throws KeeperException { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName))); + } + /** + * Same as getAllQueues0, with the KeeperException wrapped into a ReplicationException. + */ + @Override + public List<String> getAllQueues(ServerName serverName) throws ReplicationException { + try { + return getAllQueues0(serverName); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e); + } + } + /** Reads the Stat of queuesZNode and returns its cversion. */ + private int getQueuesZNodeCversion() throws KeeperException { + Stat stat = new Stat(); + ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); + return stat.getCversion(); + } + /** + * Collects the WAL names of every queue of every replicator into one set. The cversion of + * queuesZNode is read before and after the scan; when the two values differ the scan is + * repeated. The loop has no upper bound on the number of retries. + * + * @return the collected WAL names, or an empty set when there is no replicator at all + * @throws ReplicationException wrapping any KeeperException raised during the scan + */ + @Override + public Set<String> getAllWALs() throws ReplicationException { + try { + for (int retry = 0;; retry++) { + int v0 = getQueuesZNodeCversion(); + List<ServerName> rss = getListOfReplicators0(); + if (rss.isEmpty()) { + LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); + return Collections.emptySet(); + } + Set<String> wals = Sets.newHashSet(); + for (ServerName rs : rss) { + for (String queueId : getAllQueues0(rs)) { + wals.addAll(getLogsInQueue0(rs, queueId)); + } + } + int v1 =
getQueuesZNodeCversion(); + if (v0 == v1) { + return wals; + } + LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", + v0, v1, retry)); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to get all wals", e); + } + } + /** Returns the path of the znode of the given peer below hfileRefsZNode. */ + private String getHFileRefsPeerNode(String peerId) { + return ZNodePaths.joinZNode(hfileRefsZNode, peerId); + } + /** Returns the path of the znode of the given file below the given peer znode. */ + private String getHFileNode(String peerNode, String fileName) { + return ZNodePaths.joinZNode(peerNode, fileName); + } + /** + * Creates the hfile-refs znode of the given peer (with parents) unless ZKUtil.checkExists + * reports something other than -1 for it, in which case nothing is done. + * + * @throws ReplicationException wrapping the KeeperException raised by the check or the create + */ + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + try { + if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { + LOG.info("Adding peer " + peerId + " to hfile reference queue."); + ZKUtil.createWithParents(zookeeper, peerNode); + } + } catch (KeeperException e) { + throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.", + e); + } + } + /** + * Recursively deletes the hfile-refs znode of the given peer. When ZKUtil.checkExists returns -1 + * for the znode, only a debug message is logged. + * NOTE(review): the log messages here print the znode path (peerNode), while addPeerToHFileRefs + * prints peerId. + * + * @throws ReplicationException wrapping the KeeperException raised by the check or the delete + */ + @Override + public void removePeerFromHFileRefs(String peerId) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + try { + if (ZKUtil.checkExists(zookeeper, peerNode) == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Peer " + peerNode + " not found in hfile reference queue."); + } + } else { + LOG.info("Removing peer " + peerNode + " from hfile reference queue."); + ZKUtil.deleteNodeRecursively(zookeeper, peerNode); + } + } catch (KeeperException e) { + throw new ReplicationException( + "Failed to remove peer " + peerId + " from hfile reference queue.", e); + } + } + /** + * Adds one hfile reference per pair below the hfile-refs znode of the given peer. The znode name + * is the file name of the second path of the pair and the znode data is empty. All creations + * are handed to ZKUtil.multiOrSequential in one list. + * + * @throws ReplicationException wrapping the KeeperException raised by the multi call + */ + @Override + public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) + throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode); + } + List<ZKUtilOp> listOfOps = + pairs.stream().map(p -> p.getSecond().getName()).map(n
-> getHFileNode(peerNode, n)) + .map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList()); + if (debugEnabled) { + LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e); + } + } + /** + * Removes the given hfile references of the given peer. One fail-silent delete per file name is + * built for the znode below the hfile-refs znode of the peer, and all deletes are handed to + * ZKUtil.multiOrSequential in one list. + * + * @throws ReplicationException wrapping the KeeperException raised by the multi call + */ + @Override + public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException { + String peerNode = getHFileRefsPeerNode(peerId); + boolean debugEnabled = LOG.isDebugEnabled(); + if (debugEnabled) { + LOG.debug("Removing hfile references " + files + " from queue " + peerNode); + } + + List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n)) + .map(ZKUtilOp::deleteNodeFailSilent).collect(toList()); + if (debugEnabled) { + LOG.debug("The multi list size for removing hfile references in zk for node " + peerNode + + " is " + listOfOps.size()); + } + try { + ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true); + } catch (KeeperException e) { + throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e); + } + } + /** + * Returns the cversion found in the Stat of hfileRefsZNode. + * + * @throws ReplicationException wrapping the KeeperException raised while reading the Stat + */ + @Override + public int getHFileRefsNodeChangeVersion() throws ReplicationException { + Stat stat = new Stat(); + try { + ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get stat of replication hfile references node.", e); + } + return stat.getCversion(); + } + /** + * Lists the names of the children of hfileRefsZNode, i.e. the ids of the peers that have an + * hfile-refs znode. + * + * @throws ReplicationException wrapping the KeeperException raised by the listing + */ + @Override + public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { + try { + return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode)); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of all peers in hfile references node.", + e); + } + } + /** + * Lists the names of the children of the hfile-refs znode of the given peer. + * + * @throws ReplicationException wrapping the KeeperException raised by the listing + */ + @Override + public List<String>
getReplicableHFiles(String peerId) throws ReplicationException { + try { + return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId))); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get list of hfile references for peer " + peerId, + e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java new file mode 100644 index 0000000..b8a2044 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.zookeeper.ZNodePaths; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * This is a base class for maintaining replication related data, for example, peer, queue, etc., in + * zookeeper. It keeps the watcher, the configuration and the path of the replication base znode. + */ +@InterfaceAudience.Private +class ZKReplicationStorageBase { + + /** The name of the base znode that contains all replication state. */ + protected final String replicationZNode; + + protected final ZKWatcher zookeeper; + protected final Configuration conf; + /** + * Stores the watcher and the configuration and resolves replicationZNode by joining the base + * znode of the watcher with the name configured under zookeeper.znode.replication (default: + * replication). + */ + protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) { + this.zookeeper = zookeeper; + this.conf = conf; + String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); + + this.replicationZNode = + ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); + + } + + /** + * Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for use as + * content of a peer-state znode under a peer cluster id as in + * /hbase/replication/peers/PEER_ID/peer-state. An IOException raised while writing the message + * is rethrown wrapped in a RuntimeException. + */ + protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) { + ReplicationProtos.ReplicationState msg = + ReplicationProtos.ReplicationState.newBuilder().setState(state).build(); + // There is no toByteArray on this pb Message? + // The coded stream below is created with a 16 byte buffer, which seems fair enough here.
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16); + msg.writeTo(cos); + /* flush cos before baos.toByteArray() is read */ cos.flush(); + baos.flush(); + return ProtobufUtil.prependPBMagic(baos.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java new file mode 100644 index 0000000..a3be1e6 --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.replication; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.stream.Stream; + +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +/** + * Exercises ZKReplicationPeerStorage against a mini ZooKeeper cluster: adding, listing, updating, + * enabling/disabling and removing peers. Peer configs are generated from a seeded Random so that + * the expected config can be regenerated from the peer id. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationPeerStorage { + + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationPeerStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + /** Returns a set built from at most four random hex strings drawn from rand. */ + private Set<String> randNamespaces(Random rand) { + return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) + .collect(toSet()); + } + /** + * Returns a map with at most four random table names, each mapped to a list of at most four + * random hex strings; all values are drawn from rand. + */ + private Map<TableName, List<String>> randTableCFs(Random rand) { + int size = rand.nextInt(5); + Map<TableName, List<String>> map = new HashMap<>(); + for (int i = 0; i < size; i++) { + TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); + List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) +
.limit(rand.nextInt(5)).collect(toList()); + map.put(tn, cfs); + } + return map; + } + /** + * Builds a peer config whose values all come from a Random created with the given seed, so the + * same seed always yields the same config. The exclude table CFs map is not set here. + */ + private ReplicationPeerConfig getConfig(int seed) { + Random rand = new Random(seed); + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(Long.toHexString(rand.nextLong())); + config.setReplicationEndpointImpl(Long.toHexString(rand.nextLong())); + config.setNamespaces(randNamespaces(rand)); + config.setExcludeNamespaces(randNamespaces(rand)); + config.setTableCFsMap(randTableCFs(rand)); + config.setReplicateAllUserTables(rand.nextBoolean()); + config.setBandwidth(rand.nextInt(1000)); + return config; + } + /** Asserts that both sets hold the same elements; null and empty are treated as equal. */ + private void assertSetEquals(Set<String> expected, Set<String> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach(s -> assertTrue(actual.contains(s))); + } + /** + * Asserts that both maps hold the same tables with the same column families in the same order. + * A null map equals an empty map, and a null CF list equals an empty CF list as long as the + * table key itself is present. + */ + private void assertMapEquals(Map<TableName, List<String>> expected, + Map<TableName, List<String>> actual) { + if (expected == null || expected.size() == 0) { + assertTrue(actual == null || actual.size() == 0); + return; + } + assertEquals(expected.size(), actual.size()); + expected.forEach((expectedTn, expectedCFs) -> { + List<String> actualCFs = actual.get(expectedTn); + if (expectedCFs == null || expectedCFs.size() == 0) { + assertTrue(actual.containsKey(expectedTn)); + assertTrue(actualCFs == null || actualCFs.size() == 0); + } else { + assertNotNull(actualCFs); + assertEquals(expectedCFs.size(), actualCFs.size()); + for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); + expectedIt.hasNext();) { + assertEquals(expectedIt.next(), actualIt.next()); + } + } + }); + } + /** Compares two peer configs field by field using the lenient set and map comparisons above. */ + private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { + assertEquals(expected.getClusterKey(), actual.getClusterKey()); + assertEquals(expected.getReplicationEndpointImpl(),
actual.getReplicationEndpointImpl()); + assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); + assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); + assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); + assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); + assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); + assertEquals(expected.getBandwidth(), actual.getBandwidth()); + } + /** + * Adds ten peers (even ids enabled, odd ids disabled), then checks in turn: the listed ids and + * stored configs, the configs after an update to the config of seed + 1, the initial enabled + * states, the states after flipping every peer, and finally that a removed peer is gone from + * the listing and has no config any more. + */ + @Test + public void test() throws ReplicationException { + int peerCount = 10; + for (int i = 0; i < peerCount; i++) { + STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); + } + List<String> peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount, peerIds.size()); + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId).get()); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); + } + for (String peerId : peerIds) { + int seed = Integer.parseInt(peerId); + assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId).get()); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + for (int i = 0; i < peerCount; i++) { + STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); + } + for (int i = 0; i < peerCount; i++) { + assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); + } + String toRemove = Integer.toString(peerCount / 2); + STORAGE.removePeer(toRemove); + peerIds = STORAGE.listPeerIds(); + assertEquals(peerCount - 1, peerIds.size()); + assertFalse(peerIds.contains(toRemove)); + assertFalse(STORAGE.getPeerConfig(toRemove).isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java new file mode 100644 index 0000000..d5bba0d --- /dev/null +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +/** + * Exercises ZKReplicationQueueStorage against a mini ZooKeeper cluster: replicator bookkeeping, + * adding and removing WALs, WAL positions, claiming a queue and listing all WALs. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestZKReplicationQueueStorage { + private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); + + private static ZKReplicationQueueStorage STORAGE; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniZKCluster(); + STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDown() throws IOException { + UTIL.shutdownMiniZKCluster(); + } + /** + * Cleans the shared storage after every test: removes every queue of every replicator, then the + * replicator itself, and finally every peer from the hfile-refs queue. + */ + @After + public void tearDownAfterTest() throws ReplicationException { + for (ServerName serverName : STORAGE.getListOfReplicators()) { + for (String queue : STORAGE.getAllQueues(serverName)) { + STORAGE.removeQueue(serverName, queue); + } + STORAGE.removeReplicatorIfQueueIsEmpty(serverName); + } + for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { + STORAGE.removePeerFromHFileRefs(peerId); + } + } + /** Returns the server name 127.0.0.1 with port 8000 + i and start code 10000 + i. */ + private ServerName getServerName(int i) { + return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); + } + /** + * Adds one WAL for each of ten servers, removes the queue of the first five, asks the storage to + * drop every replicator whose queue is empty and expects the last five servers to remain. + */ + @Test + public void testReplicator() throws ReplicationException { + assertTrue(STORAGE.getListOfReplicators().isEmpty()); + String queueId
= "1"; + for (int i = 0; i < 10; i++) { + STORAGE.addWAL(getServerName(i), queueId, "file" + i); + } + List<ServerName> replicators = STORAGE.getListOfReplicators(); + assertEquals(10, replicators.size()); + for (int i = 0; i < 10; i++) { + assertThat(replicators, hasItems(getServerName(i))); + } + for (int i = 0; i < 5; i++) { + STORAGE.removeQueue(getServerName(i), queueId); + } + for (int i = 0; i < 10; i++) { + STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); + } + replicators = STORAGE.getListOfReplicators(); + assertEquals(5, replicators.size()); + for (int i = 5; i < 10; i++) { + assertThat(replicators, hasItems(getServerName(i))); + } + } + /** + * Returns base followed by a dash and i zero padded to four digits. + * NOTE(review): base is concatenated into the format string, so a percent sign inside base + * would be read as a format specifier; the callers in this class only pass file1 and file2. + */ + private String getFileName(String base, int i) { + return String.format(base + "-%04d", i); + } + /** + * Adds ten WALs to each of two queues of one server, checks that the initial position of every + * WAL is 0, sets and re-reads positions, removes half of the WALs of each queue, claims queue 1 + * for a second server and finally checks the queue listings and the result of getAllWALs. + */ + @Test + public void testAddRemoveLog() throws ReplicationException { + ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); + assertTrue(STORAGE.getAllQueues(serverName1).isEmpty()); + String queue1 = "1"; + String queue2 = "2"; + for (int i = 0; i < 10; i++) { + STORAGE.addWAL(serverName1, queue1, getFileName("file1", i)); + STORAGE.addWAL(serverName1, queue2, getFileName("file2", i)); + } + List<String> queueIds = STORAGE.getAllQueues(serverName1); + assertEquals(2, queueIds.size()); + assertThat(queueIds, hasItems("1", "2")); + + for (int i = 0; i < 10; i++) { + assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); + assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); + STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100); + STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10); + } + + for (int i = 0; i < 10; i++) { + assertEquals((i + 1) * 100, + STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); + assertEquals((i + 1) * 100 + 10, + STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); + } + + for (int i = 0; i < 10; i++)
{ + if (i % 2 == 0) { + STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i)); + } else { + STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i)); + } + } + /* even indexed WALs are now gone from queue 1 and odd indexed ones from queue 2 */ + queueIds = STORAGE.getAllQueues(serverName1); + assertEquals(2, queueIds.size()); + assertThat(queueIds, hasItems("1", "2")); + /* move queue 1 to a second server; the five odd indexed WALs and their positions must follow */ + ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); + Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2); + + assertEquals("1-" + serverName1.getServerName(), peer1.getFirst()); + assertEquals(5, peer1.getSecond().size()); + int i = 1; + for (String wal : peer1.getSecond()) { + assertEquals(getFileName("file1", i), wal); + assertEquals((i + 1) * 100, + STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i))); + i += 2; + } + /* the first server keeps only queue 2, the second server owns only the claimed queue */ + queueIds = STORAGE.getAllQueues(serverName1); + assertEquals(1, queueIds.size()); + assertThat(queueIds, hasItems("2")); + + queueIds = STORAGE.getAllQueues(serverName2); + assertEquals(1, queueIds.size()); + assertThat(queueIds, hasItems(peer1.getFirst())); + + Set<String> allWals = STORAGE.getAllWALs(); + assertEquals(10, allWals.size()); + for (i = 0; i < 10; i++) { + assertThat(allWals, hasItems(i % 2 == 0 ?
getFileName("file2", i) : getFileName("file1", i))); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 4032a71..034b71c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -133,7 +134,7 @@ import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure; import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.mob.MobConstants; @@ -327,7 +328,7 @@ public class HMaster extends HRegionServer implements MasterServices { private AssignmentManager assignmentManager; // manager of replication - private ReplicationManager replicationManager; + private ReplicationPeerManager replicationPeerManager; // buffer for "fatal error" notices from region servers // in the cluster. 
This is only used for assisting @@ -718,8 +719,8 @@ public class HMaster extends HRegionServer implements MasterServices { /** * Initialize all ZK based system trackers. */ - void initializeZKBasedSystemTrackers() throws IOException, - InterruptedException, KeeperException { + void initializeZKBasedSystemTrackers() + throws IOException, InterruptedException, KeeperException, ReplicationException { this.balancer = LoadBalancerFactory.getLoadBalancer(conf); this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf); this.normalizer.setMasterServices(this); @@ -737,7 +738,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.assignmentManager = new AssignmentManager(this); this.assignmentManager.start(); - this.replicationManager = new ReplicationManager(conf, zooKeeper, this); + this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); this.regionServerTracker.start(); @@ -783,8 +784,7 @@ public class HMaster extends HRegionServer implements MasterServices { * </ol> */ private void finishActiveMasterInitialization(MonitoredTask status) - throws IOException, InterruptedException, KeeperException { - + throws IOException, InterruptedException, KeeperException, ReplicationException { Thread zombieDetector = new Thread(new InitializationMonitor(this), "ActiveMasterInitializationMonitor-" + System.currentTimeMillis()); zombieDetector.setDaemon(true); @@ -3414,18 +3414,19 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException, - IOException { + public ReplicationPeerConfig getReplicationPeerConfig(String peerId) + throws ReplicationException, IOException { if (cpHost != null) { cpHost.preGetReplicationPeerConfig(peerId); } - final ReplicationPeerConfig peerConfig = 
this.replicationManager.getPeerConfig(peerId); - LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config=" - + peerConfig); + LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId); + Optional<ReplicationPeerConfig> peerConfig = + this.replicationPeerManager.getPeerConfig(peerId); + if (cpHost != null) { cpHost.postGetReplicationPeerConfig(peerId); } - return peerConfig; + return peerConfig.orElse(null); } @Override @@ -3444,7 +3445,8 @@ public class HMaster extends HRegionServer implements MasterServices { } LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex); Pattern pattern = regex == null ? null : Pattern.compile(regex); - List<ReplicationPeerDescription> peers = this.replicationManager.listReplicationPeers(pattern); + List<ReplicationPeerDescription> peers = + this.replicationPeerManager.listPeers(pattern); if (cpHost != null) { cpHost.postListReplicationPeers(regex); } @@ -3593,8 +3595,8 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override - public ReplicationManager getReplicationManager() { - return replicationManager; + public ReplicationPeerManager getReplicationPeerManager() { + return replicationPeerManager; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index b0bf9ca..0e552d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.locking.LockManager; import 
org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -459,9 +459,9 @@ public interface MasterServices extends Server { IOException; /** - * Returns the {@link ReplicationManager}. + * Returns the {@link ReplicationPeerManager}. */ - ReplicationManager getReplicationManager(); + ReplicationPeerManager getReplicationPeerManager(); /** * Update the peerConfig for the specified peer http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 19e6f9b..7fb187f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -24,24 +24,24 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import 
org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.replication.ReplicationManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @InterfaceAudience.Private @InterfaceStability.Evolving @@ -138,8 +138,8 @@ public class MasterProcedureEnv implements ConfigurationObserver { return remoteDispatcher; } - public ReplicationManager getReplicationManager() { - return master.getReplicationManager(); + public ReplicationPeerManager getReplicationPeerManager() { + return master.getReplicationPeerManager(); } public boolean isRunning() { @@ -151,22 +151,22 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.isInitialized(); } - public boolean waitInitialized(Procedure proc) { + public boolean waitInitialized(Procedure<?> proc) { return master.getInitializedEvent().suspendIfNotReady(proc); } - public boolean waitServerCrashProcessingEnabled(Procedure proc) { + public boolean waitServerCrashProcessingEnabled(Procedure<?> proc) { if (master instanceof HMaster) { return ((HMaster)master).getServerCrashProcessingEnabledEvent().suspendIfNotReady(proc); } return false; } - public boolean waitFailoverCleanup(Procedure proc) { + public boolean 
waitFailoverCleanup(Procedure<?> proc) { return master.getAssignmentManager().getFailoverCleanupEvent().suspendIfNotReady(proc); } - public void setEventReady(ProcedureEvent event, boolean isReady) { + public void setEventReady(ProcedureEvent<?> event, boolean isReady) { if (isReady) { event.wake(procSched); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index 066c3e7..a4f9b32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -58,16 +58,18 @@ public class AddPeerProcedure extends ModifyPeerProcedure { } @Override - protected void prePeerModification(MasterProcedureEnv env) throws IOException { + protected void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException { MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); } + env.getReplicationPeerManager().preAddPeer(peerId, peerConfig); } @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { - env.getReplicationManager().addReplicationPeer(peerId, peerConfig, enabled); + env.getReplicationPeerManager().addPeer(peerId, peerConfig, enabled); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java index 9a28de6..10e35a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/DisablePeerProcedure.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +52,12 @@ public class DisablePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preDisableReplicationPeer(peerId); } + env.getReplicationPeerManager().preDisablePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception { - env.getReplicationManager().disableReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().disablePeer(peerId); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java index 4855901..f2a9f01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/EnablePeerProcedure.java @@ -21,6 
+21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,12 @@ public class EnablePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preEnableReplicationPeer(peerId); } + env.getReplicationPeerManager().preEnablePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { - env.getReplicationManager().enableReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().enablePeer(peerId); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index c4552ed..279fbc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -27,6 +26,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import 
org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,17 +67,16 @@ public abstract class ModifyPeerProcedure } /** - * Called before we start the actual processing. If an exception is thrown then we will give up - * and mark the procedure as failed directly. + * Called before we start the actual processing. The implementation should call the pre CP hook, + * and also the pre-check for the peer modification. + * <p> + * If an IOException is thrown then we will give up and mark the procedure as failed directly. If + * all checks pass then the procedure cannot be rolled back any more. */ - protected abstract void prePeerModification(MasterProcedureEnv env) throws IOException; + protected abstract void prePeerModification(MasterProcedureEnv env) + throws IOException, ReplicationException; - /** - * We will give up and mark the procedure as failure if {@link IllegalArgumentException} is - * thrown, for other type of Exception we will retry. - */ - protected abstract void updatePeerStorage(MasterProcedureEnv env) - throws IllegalArgumentException, Exception; + protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException; /** * Called before we finish the procedure. 
The implementation can do some logging work, and also @@ -100,23 +99,24 @@ public abstract class ModifyPeerProcedure try { prePeerModification(env); } catch (IOException e) { - LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + - ", mark the procedure as failure and give up", e); - setFailure("prePeerModification", e); + LOG.warn( + getClass().getName() + " failed to call CP hook or the pre check is failed for peer " + + peerId + ", mark the procedure as failure and give up", + e); + setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e); releaseLatch(); return Flow.NO_MORE_STATE; + } catch (ReplicationException e) { + LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId + + ", retry", e); + throw new ProcedureYieldException(); } setNextState(PeerModificationState.UPDATE_PEER_STORAGE); return Flow.HAS_MORE_STATE; case UPDATE_PEER_STORAGE: try { updatePeerStorage(env); - } catch (IllegalArgumentException e) { - setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", - new DoNotRetryIOException(e)); - releaseLatch(); - return Flow.NO_MORE_STATE; - } catch (Exception e) { + } catch (ReplicationException e) { LOG.warn( getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e); throw new ProcedureYieldException(); @@ -158,8 +158,7 @@ public abstract class ModifyPeerProcedure @Override protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) throws IOException, InterruptedException { - if (state == PeerModificationState.PRE_PEER_MODIFICATION || - state == PeerModificationState.UPDATE_PEER_STORAGE) { + if (state == PeerModificationState.PRE_PEER_MODIFICATION) { // actually the peer related operations has no rollback, but if we haven't done any // modifications on the peer storage, we can just return. 
return; http://git-wip-us.apache.org/repos/asf/hbase/blob/2f1b3580/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index d40df02..6e9c384 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,11 +52,12 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { if (cpHost != null) { cpHost.preRemoveReplicationPeer(peerId); } + env.getReplicationPeerManager().preRemovePeer(peerId); } @Override - protected void updatePeerStorage(MasterProcedureEnv env) throws Exception { - env.getReplicationManager().removeReplicationPeer(peerId); + protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { + env.getReplicationPeerManager().removePeer(peerId); } @Override