http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
deleted file mode 100644
index b6f8784..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Manages and performs all replication admin operations.
- * <p>
- * Used to add/remove a replication peer.
- */
-@InterfaceAudience.Private
-public class ReplicationManager {
-  private final ReplicationQueuesClient replicationQueuesClient;
-  private final ReplicationPeers replicationPeers;
-
-  public ReplicationManager(Configuration conf, ZKWatcher zkw, Abortable abortable)
-      throws IOException {
-    try {
-      this.replicationQueuesClient = ReplicationFactory
-          .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
-      this.replicationQueuesClient.init();
-      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
-        this.replicationQueuesClient, abortable);
-      this.replicationPeers.init();
-    } catch (Exception e) {
-      throw new IOException("Failed to construct ReplicationManager", e);
-    }
-  }
-
-  public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
-      throws ReplicationException {
-    checkPeerConfig(peerConfig);
-    replicationPeers.registerPeer(peerId, peerConfig, enabled);
-    replicationPeers.peerConnected(peerId);
-  }
-
-  public void removeReplicationPeer(String peerId) throws ReplicationException {
-    replicationPeers.peerDisconnected(peerId);
-    replicationPeers.unregisterPeer(peerId);
-  }
-
-  public void enableReplicationPeer(String peerId) throws ReplicationException {
-    this.replicationPeers.enablePeer(peerId);
-  }
-
-  public void disableReplicationPeer(String peerId) throws ReplicationException {
-    this.replicationPeers.disablePeer(peerId);
-  }
-
-  public ReplicationPeerConfig getPeerConfig(String peerId)
-      throws ReplicationException, ReplicationPeerNotFoundException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(peerId);
-    if (peerConfig == null) {
-      throw new ReplicationPeerNotFoundException(peerId);
-    }
-    return peerConfig;
-  }
-
-  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
-      throws ReplicationException, IOException {
-    checkPeerConfig(peerConfig);
-    this.replicationPeers.updatePeerConfig(peerId, peerConfig);
-  }
-
-  public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
-      throws ReplicationException {
-    List<ReplicationPeerDescription> peers = new ArrayList<>();
-    List<String> peerIds = replicationPeers.getAllPeerIds();
-    for (String peerId : peerIds) {
-      if (pattern == null || (pattern != null && pattern.matcher(peerId).matches())) {
-        peers.add(new ReplicationPeerDescription(peerId,
-          replicationPeers.getStatusOfPeerFromBackingStore(peerId),
-          replicationPeers.getReplicationPeerConfig(peerId)));
-      }
-    }
-    return peers;
-  }
-
-  /**
-   * If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
-   * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to
-   * peer cluster.
-   *
-   * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
-   * Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
-   */
-  private void checkPeerConfig(ReplicationPeerConfig peerConfig) {
-    if (peerConfig.replicateAllUserTables()) {
-      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
-          (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
-        throw new IllegalArgumentException("Need clean namespaces or table-cfs config firstly " +
-            "when you want replicate all cluster");
-      }
-      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
-        peerConfig.getExcludeTableCFsMap());
-    } else {
-      if ((peerConfig.getExcludeNamespaces() != null
-          && !peerConfig.getExcludeNamespaces().isEmpty())
-          || (peerConfig.getExcludeTableCFsMap() != null
-              && !peerConfig.getExcludeTableCFsMap().isEmpty())) {
-        throw new IllegalArgumentException(
-            "Need clean exclude-namespaces or exclude-table-cfs config firstly"
-                + " when replicate_all flag is false");
-      }
-      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
-        peerConfig.getTableCFsMap());
-    }
-    checkConfiguredWALEntryFilters(peerConfig);
-  }
-
-  /**
-   * Set a namespace in the peer config means that all tables in this namespace will be replicated
-   * to the peer cluster.
-   * <ol>
-   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
-   * the peer config.</li>
-   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
-   * config.</li>
-   * </ol>
-   * <p>
-   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
-   * replicated to the peer cluster.
-   * <ol>
-   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
-   * this namespace to the peer config.</li>
-   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
-   * exclude namespace.</li>
-   * </ol>
-   */
-  private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
-      Map<TableName, ? extends Collection<String>> tableCfs) {
-    if (namespaces == null || namespaces.isEmpty()) {
-      return;
-    }
-    if (tableCfs == null || tableCfs.isEmpty()) {
-      return;
-    }
-    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
-      TableName table = entry.getKey();
-      if (namespaces.contains(table.getNamespaceAsString())) {
-        throw new IllegalArgumentException("Table-cfs " + table + " is conflict with namespaces "
-            + table.getNamespaceAsString() + " in peer config");
-      }
-    }
-  }
-
-  private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) {
-    String filterCSV = peerConfig.getConfiguration()
-        .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
-    if (filterCSV != null && !filterCSV.isEmpty()) {
-      String[] filters = filterCSV.split(",");
-      for (String filter : filters) {
-        try {
-          Class.forName(filter).newInstance();
-        } catch (Exception e) {
-          throw new IllegalArgumentException("Configured WALEntryFilter " + filter +
-              " could not be created. Failing add/update " + "peer operation.", e);
-        }
-      }
-    }
-  }
-}
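Both the removed ReplicationManager above and the new ReplicationPeerManager below enforce the same checkPeerConfig rules, and they are easiest to see with concrete configs. The sketch below is illustrative only: the cluster key, namespace, and table names are invented, and it uses only the ReplicationPeerConfig mutators that appear elsewhere in this patch. The first config passes validation, the second violates the replicate_all rule, and the third trips the namespace/table-cf conflict check.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

public class PeerConfigExamples {
  public static void main(String[] args) {
    // Accepted: replicate_all is true, so only the exclude-* collections are set.
    ReplicationPeerConfig ok = new ReplicationPeerConfig();
    ok.setClusterKey("zk1.example.com:2181:/hbase"); // invented cluster key
    ok.setReplicateAllUserTables(true);
    Set<String> excludeNs = new HashSet<>();
    excludeNs.add("staging");
    ok.setExcludeNamespaces(excludeNs);

    // Rejected: replicate_all is true but a (non-exclude) table-cfs map is set.
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(TableName.valueOf("ns1:orders"), null); // null means all column families
    ReplicationPeerConfig bad = new ReplicationPeerConfig();
    bad.setClusterKey("zk1.example.com:2181:/hbase");
    bad.setReplicateAllUserTables(true);
    bad.setTableCFsMap(tableCfs);

    // Rejected: namespace "ns1" already replicates every table in it, so also
    // listing a table of "ns1" in table-cfs conflicts with the namespace entry.
    ReplicationPeerConfig conflict = new ReplicationPeerConfig();
    conflict.setClusterKey("zk1.example.com:2181:/hbase");
    conflict.setReplicateAllUserTables(false);
    Set<String> ns = new HashSet<>();
    ns.add("ns1");
    conflict.setNamespaces(ns);
    conflict.setTableCFsMap(tableCfs);
  }
}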
http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
new file mode 100644
index 0000000..5abd874
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Manages and performs all replication admin operations.
+ * <p>
+ * Used to add/remove a replication peer.
+ */
+@InterfaceAudience.Private
+public final class ReplicationPeerManager {
+
+  private final ReplicationPeerStorage peerStorage;
+
+  private final ReplicationQueueStorage queueStorage;
+
+  private final ConcurrentMap<String, ReplicationPeerDescription> peers;
+
+  private ReplicationPeerManager(ReplicationPeerStorage peerStorage,
+      ReplicationQueueStorage queueStorage,
+      ConcurrentMap<String, ReplicationPeerDescription> peers) {
+    this.peerStorage = peerStorage;
+    this.queueStorage = queueStorage;
+    this.peers = peers;
+  }
+
+  private void checkQueuesDeleted(String peerId)
+      throws ReplicationException, DoNotRetryIOException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (queueInfo.getPeerId().equals(peerId)) {
+          throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
+              ", replicator: " + replicator + ", queueId: " + queueId);
+        }
+      }
+    }
+    if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+      throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
+    }
+  }
+
+  public void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException, ReplicationException {
+    if (peerId.contains("-")) {
+      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
+    }
+    checkPeerConfig(peerConfig);
+    if (peers.containsKey(peerId)) {
+      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
+    }
+    // make sure that there is no queues with the same peer id. This may happen when we create a
+    // peer with the same id with a old deleted peer. If the replication queues for the old peer
+    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
+    // file may also be replicated.
+    checkQueuesDeleted(peerId);
+  }
+
+  private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException {
+    ReplicationPeerDescription desc = peers.get(peerId);
+    if (desc == null) {
+      throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist");
+    }
+    return desc;
+  }
+
+  public void preRemovePeer(String peerId) throws DoNotRetryIOException {
+    checkPeerExists(peerId);
+  }
+
+  public void preEnablePeer(String peerId) throws DoNotRetryIOException {
+    ReplicationPeerDescription desc = checkPeerExists(peerId);
+    if (desc.isEnabled()) {
+      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled");
+    }
+  }
+
+  public void preDisablePeer(String peerId) throws DoNotRetryIOException {
+    ReplicationPeerDescription desc = checkPeerExists(peerId);
+    if (!desc.isEnabled()) {
+      throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled");
+    }
+  }
+
+  public void preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    checkPeerConfig(peerConfig);
+    ReplicationPeerDescription desc = checkPeerExists(peerId);
+    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
+    if (!StringUtils.isBlank(peerConfig.getClusterKey()) &&
+        !peerConfig.getClusterKey().equals(oldPeerConfig.getClusterKey())) {
+      throw new DoNotRetryIOException(
+          "Changing the cluster key on an existing peer is not allowed. Existing key '" +
+              oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" +
+              peerConfig.getClusterKey() + "'");
+    }
+
+    if (!StringUtils.isBlank(peerConfig.getReplicationEndpointImpl()) &&
+        !peerConfig.getReplicationEndpointImpl().equals(oldPeerConfig.getReplicationEndpointImpl())) {
+      throw new DoNotRetryIOException("Changing the replication endpoint implementation class " +
+          "on an existing peer is not allowed. Existing class '" +
+          oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId +
+          " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'");
+    }
+  }
+
+  private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) {
+    ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig();
+    copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
+    copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
+    copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap());
+    copiedPeerConfig.setNamespaces(peerConfig.getNamespaces());
+    copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap());
+    copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces());
+    copiedPeerConfig.setBandwidth(peerConfig.getBandwidth());
+    copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables());
+    copiedPeerConfig.setClusterKey(peerConfig.getClusterKey());
+    copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
+    return copiedPeerConfig;
+  }
+
+  public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
+      throws ReplicationException {
+    if (peers.containsKey(peerId)) {
+      // this should be a retry, just return
+      return;
+    }
+    ReplicationPeerConfig copiedPeerConfig = copy(peerConfig);
+    peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
+    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
+  }
+
+  public void removePeer(String peerId) throws ReplicationException {
+    if (!peers.containsKey(peerId)) {
+      // this should be a retry, just return
+      return;
+    }
+    peerStorage.removePeer(peerId);
+    peers.remove(peerId);
+  }
+
+  private void setPeerState(String peerId, boolean enabled) throws ReplicationException {
+    ReplicationPeerDescription desc = peers.get(peerId);
+    if (desc.isEnabled() == enabled) {
+      // this should be a retry, just return
+      return;
+    }
+    peerStorage.setPeerState(peerId, enabled);
+    peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig()));
+  }
+
+  public void enablePeer(String peerId) throws ReplicationException {
+    setPeerState(peerId, true);
+  }
+
+  public void disablePeer(String peerId) throws ReplicationException {
+    setPeerState(peerId, false);
+  }
+
+  public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
+      throws ReplicationException {
+    // the checking rules are too complicated here so we give up checking whether this is a retry.
+    ReplicationPeerDescription desc = peers.get(peerId);
+    ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
+    ReplicationPeerConfig newPeerConfig = copy(peerConfig);
+    // we need to use the new conf to overwrite the old one.
+    newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration());
+    newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
+    newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData());
+    newPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
+
+    peerStorage.updatePeerConfig(peerId, newPeerConfig);
+    peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
+  }
+
+  public List<ReplicationPeerDescription> listPeers(Pattern pattern) {
+    if (pattern == null) {
+      return new ArrayList<>(peers.values());
+    }
+    return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches())
+        .collect(Collectors.toList());
+  }
+
+  public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) {
+    ReplicationPeerDescription desc = peers.get(peerId);
+    return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
+  }
+
+  /**
+   * If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
+   * Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
+   * cluster.
+   * <p>
+   * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
+   * Then allow to config namespaces or table-cfs which will be replicated to peer cluster.
+   */
+  private static void checkPeerConfig(ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    if (peerConfig.replicateAllUserTables()) {
+      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
+          (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
+            "when you want replicate all cluster");
+      }
+      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
+        peerConfig.getExcludeTableCFsMap());
+    } else {
+      if ((peerConfig.getExcludeNamespaces() != null &&
+          !peerConfig.getExcludeNamespaces().isEmpty()) ||
+          (peerConfig.getExcludeTableCFsMap() != null &&
+              !peerConfig.getExcludeTableCFsMap().isEmpty())) {
+        throw new DoNotRetryIOException(
+            "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
+                " when replicate_all flag is false");
+      }
+      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+        peerConfig.getTableCFsMap());
+    }
+    checkConfiguredWALEntryFilters(peerConfig);
+  }
+
+  /**
+   * Set a namespace in the peer config means that all tables in this namespace will be replicated
+   * to the peer cluster.
+   * <ol>
+   * <li>If peer config already has a namespace, then not allow set any table of this namespace to
+   * the peer config.</li>
+   * <li>If peer config already has a table, then not allow set this table's namespace to the peer
+   * config.</li>
+   * </ol>
+   * <p>
+   * Set a exclude namespace in the peer config means that all tables in this namespace can't be
+   * replicated to the peer cluster.
+   * <ol>
+   * <li>If peer config already has a exclude namespace, then not allow set any exclude table of
+   * this namespace to the peer config.</li>
+   * <li>If peer config already has a exclude table, then not allow set this table's namespace as a
+   * exclude namespace.</li>
+   * </ol>
+   */
+  private static void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
+      Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException {
+    if (namespaces == null || namespaces.isEmpty()) {
+      return;
+    }
+    if (tableCfs == null || tableCfs.isEmpty()) {
+      return;
+    }
+    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+      TableName table = entry.getKey();
+      if (namespaces.contains(table.getNamespaceAsString())) {
+        throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " +
+            table.getNamespaceAsString() + " in peer config");
+      }
+    }
+  }
+
+  private static void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    String filterCSV = peerConfig.getConfiguration()
+        .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
+    if (filterCSV != null && !filterCSV.isEmpty()) {
+      String[] filters = filterCSV.split(",");
+      for (String filter : filters) {
+        try {
+          Class.forName(filter).newInstance();
+        } catch (Exception e) {
+          throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
+              " could not be created. Failing add/update " + "peer operation.", e);
+        }
+      }
+    }
+  }
+
+  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
+      throws ReplicationException {
+    ReplicationPeerStorage peerStorage =
+        ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
+    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
+    for (String peerId : peerStorage.listPeerIds()) {
+      Optional<ReplicationPeerConfig> peerConfig = peerStorage.getPeerConfig(peerId);
+      boolean enabled = peerStorage.isPeerEnabled(peerId);
+      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig.get()));
+    }
+    return new ReplicationPeerManager(peerStorage,
+        ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
+  }
+}
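The new manager splits every admin operation into a preXxx() validation step that fails fast with DoNotRetryIOException and a mutation step that tolerates re-execution (the "this should be a retry, just return" branches), since the calling master procedure may be replayed after a crash. A minimal standalone sketch of that pattern follows; the class and types below are invented stand-ins for illustration, not the HBase API.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Illustration only: the pre-check / idempotent-apply split used by
 * ReplicationPeerManager, reduced to plain strings and booleans.
 */
public class IdempotentPeerRegistry {

  // peer id -> enabled flag; stands in for the map of peer descriptions
  private final ConcurrentMap<String, Boolean> peers = new ConcurrentHashMap<>();

  /** Runs once up front and rejects bad requests before any state changes. */
  public void preAddPeer(String peerId) {
    if (peerId.contains("-")) {
      throw new IllegalArgumentException("Found invalid peer name: " + peerId);
    }
    if (peers.containsKey(peerId)) {
      throw new IllegalStateException("Replication peer " + peerId + " already exists");
    }
  }

  /** May be re-executed by a replayed procedure, so duplicates are no-ops. */
  public void addPeer(String peerId, boolean enabled) {
    if (peers.containsKey(peerId)) {
      // this should be a retry, just return
      return;
    }
    peers.put(peerId, enabled);
  }

  /** Removing an already-removed peer is likewise treated as a retry. */
  public void removePeer(String peerId) {
    // ConcurrentMap.remove is a no-op when the key is absent
    peers.remove(peerId);
  }
}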
http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index d8154dc..a43532d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -59,12 +60,12 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
     if (cpHost != null) {
       cpHost.preUpdateReplicationPeerConfig(peerId, peerConfig);
     }
+    env.getReplicationPeerManager().preUpdatePeerConfig(peerId, peerConfig);
   }
 
   @Override
-  protected void updatePeerStorage(MasterProcedureEnv env)
-      throws IllegalArgumentException, Exception {
-    env.getReplicationManager().updatePeerConfig(peerId, peerConfig);
+  protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
+    env.getReplicationPeerManager().updatePeerConfig(peerId, peerConfig);
   }
 
   @Override
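Note the merge order in ReplicationPeerManager.updatePeerConfig(), which this procedure now calls: the old configuration and peer data are applied first and the incoming maps second, so colliding keys take the new value while old keys absent from the request survive. A tiny self-contained demonstration of that putAll() ordering (plain HashMaps and example keys, not the HBase types):

import java.util.HashMap;
import java.util.Map;

public class MergeOrderDemo {
  public static void main(String[] args) {
    Map<String, String> oldConf = new HashMap<>();
    oldConf.put("replication.source.maxretriesmultiplier", "10");
    oldConf.put("custom.key", "keep-me");

    Map<String, String> newConf = new HashMap<>();
    newConf.put("replication.source.maxretriesmultiplier", "50");

    // Same order as updatePeerConfig: old map first, new map second,
    // so the new value wins on collisions and untouched old keys remain.
    Map<String, String> merged = new HashMap<>();
    merged.putAll(oldConf);
    merged.putAll(newConf);

    // Prints {replication.source.maxretriesmultiplier=50, custom.key=keep-me}
    System.out.println(merged);
  }
}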
http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index ba78e6d..29a577b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.hbase.client.replication;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -25,26 +32,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.InterClusterReplicationEndpointForTest;
 import org.apache.hadoop.hbase.replication.TestReplicationEndpoint.ReplicationEndpointForTest;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -53,15 +58,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Unit testing of ReplicationAdmin
@@ -73,8 +69,6 @@ public class TestReplicationAdmin {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationAdmin.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationAdmin.class);
-
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
@@ -109,16 +103,17 @@
   }
 
   @After
-  public void cleanupPeer() {
-    try {
-      hbaseAdmin.removeReplicationPeer(ID_ONE);
-    } catch (Exception e) {
-      LOG.debug("Replication peer " + ID_ONE + " may already be removed");
+  public void tearDown() throws Exception {
+    for (ReplicationPeerDescription desc : hbaseAdmin.listReplicationPeers()) {
+      hbaseAdmin.removeReplicationPeer(desc.getPeerId());
     }
-    try {
-      hbaseAdmin.removeReplicationPeer(ID_SECOND);
-    } catch (Exception e) {
-      LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
+    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
+    for (ServerName serverName : queueStorage.getListOfReplicators()) {
+      for (String queue : queueStorage.getAllQueues(serverName)) {
+        queueStorage.removeQueue(serverName, queue);
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
     }
   }
 
@@ -208,32 +203,29 @@
     ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
     rpc2.setClusterKey(KEY_SECOND);
     Configuration conf = TEST_UTIL.getConfiguration();
-    ZKWatcher zkw = new ZKWatcher(conf, "Test HBaseAdmin", null);
-    ReplicationQueues repQueues =
-        ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null, zkw));
-    repQueues.init("server1");
+    ReplicationQueueStorage queueStorage =
+        ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), conf);
+    ServerName serverName = ServerName.valueOf("server1", 8000, 1234);
     // add queue for ID_ONE
-    repQueues.addLog(ID_ONE, "file1");
+    queueStorage.addWAL(serverName, ID_ONE, "file1");
     try {
       admin.addPeer(ID_ONE, rpc1, null);
       fail();
     } catch (Exception e) {
       // OK!
     }
-    repQueues.removeQueue(ID_ONE);
-    assertEquals(0, repQueues.getAllQueues().size());
+    queueStorage.removeQueue(serverName, ID_ONE);
+    assertEquals(0, queueStorage.getAllQueues(serverName).size());
 
     // add recovered queue for ID_ONE
-    repQueues.addLog(ID_ONE + "-server2", "file1");
+    queueStorage.addWAL(serverName, ID_ONE + "-server2", "file1");
    try {
       admin.addPeer(ID_ONE, rpc2, null);
       fail();
     } catch (Exception e) {
       // OK!
     }
-    repQueues.removeAllQueues();
-    zkw.close();
   }
 
 /**
@@ -429,7 +421,7 @@
       tableCFs.clear();
       tableCFs.put(tableName2, null);
       admin.removePeerTableCFs(ID_ONE, tableCFs);
-      assertTrue(false);
+      fail();
     } catch (ReplicationException e) {
     }
     tableCFs.clear();

http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
index e88710e..4e66676 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.master;
 
 import static org.mockito.Mockito.mock;
 
+import com.google.protobuf.Service;
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ChoreService;
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.replication.ReplicationManager;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.LockedResource;
@@ -55,9 +55,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 
-import com.google.protobuf.Service;
-
 public class MockNoopMasterServices implements MasterServices {
+
   private final Configuration conf;
   private final MetricsMaster metricsMaster;
@@ -461,7 +460,7 @@ public class MockNoopMasterServices implements MasterServices {
   }
 
   @Override
-  public ProcedureEvent getInitializedEvent() {
+  public ProcedureEvent<?> getInitializedEvent() {
     return null;
   }
 
@@ -476,7 +475,7 @@ public class MockNoopMasterServices implements MasterServices {
   }
 
   @Override
-  public ReplicationManager getReplicationManager() {
+  public ReplicationPeerManager getReplicationPeerManager() {
     return null;
   }
-}
+}
\ No newline at end of file
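The rewritten tearDown() in TestReplicationAdmin above drains every replication queue through the new ReplicationQueueStorage API instead of removing two hard-coded peers. Factored into a reusable helper it would look roughly like this; the sketch reuses only calls visible in this patch, the utility class name is invented, and the throws clause assumes the storage methods declare ReplicationException (the test itself only declares throws Exception).

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

public final class QueueStorageTestUtil {

  private QueueStorageTestUtil() {
  }

  /** Removes every queue for every replicator, then the empty replicator nodes. */
  public static void clearAllQueues(ReplicationQueueStorage queueStorage)
      throws ReplicationException {
    for (ServerName serverName : queueStorage.getListOfReplicators()) {
      for (String queue : queueStorage.getAllQueues(serverName)) {
        queueStorage.removeQueue(serverName, queue);
      }
      queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
    }
  }
}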
http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 2e1c979..5769d0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -275,7 +276,7 @@ public class TestMasterNoCluster {
       @Override
       void initializeZKBasedSystemTrackers() throws IOException, InterruptedException,
-          KeeperException, CoordinatedStateException {
+          KeeperException, CoordinatedStateException, ReplicationException {
         super.initializeZKBasedSystemTrackers();
         // Record a newer server in server manager at first
         getServerManager().recordNewServerWithLock(newServer,

http://git-wip-us.apache.org/repos/asf/hbase/blob/12d321d4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
index 5bdcd45..bfed922 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -54,9 +53,6 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
    */
  @Test(timeout = 600000)
   public void testDisableInactivePeer() throws Exception {
-
-    // enabling and shutdown the peer
-    admin.enablePeer("2");
     utility2.shutdownMiniHBaseCluster();
 
     byte[] rowkey = Bytes.toBytes("disable inactive peer");