This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-operator-tools.git
The following commit(s) were added to refs/heads/master by this push:
new 58fe768 HBASE-29671 [hbase-operator-tools] Support different replication peer/queue storage (#156)
58fe768 is described below
commit 58fe76868ca34fcdc634e20285118dfdb2d0340b
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Nov 18 15:15:26 2025 +0800
HBASE-29671 [hbase-operator-tools] Support different replication peer/queue storage (#156)
Signed-off-by: Peter Somogyi <[email protected]>
Signed-off-by: Nihal Jain <[email protected]>
---
.../java/org/apache/hbase/ReplicationFsck.java | 3 +-
.../hbase/ReplicationStorageFactoryHelper.java | 89 +++++++
.../java/org/apache/hbase/hbck1/HBaseFsck.java | 10 +-
.../org/apache/hbase/hbck1/ReplicationChecker.java | 262 ++++++++++++++++-----
4 files changed, 296 insertions(+), 68 deletions(-)
diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java
index 282d7ab..68b3ba2 100644
--- a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java
@@ -41,11 +41,10 @@ public class ReplicationFsck implements Closeable {
int fsck(List<String> tables, boolean fix) throws IOException {
try (HBaseFsck hbaseFsck = new HBaseFsck(this.configuration)) {
+ hbaseFsck.connect();
hbaseFsck.setFixReplication(fix);
hbaseFsck.checkAndFixReplication();
if (tables != null && !tables.isEmpty()) {
- // Below needs connection to be up; uses admin.
- hbaseFsck.connect();
hbaseFsck.setCleanReplicationBarrier(fix);
for (String table : tables) {
hbaseFsck.setCleanReplicationBarrierTable(table);
diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java
new file mode 100644
index 0000000..e54a2d0
--- /dev/null
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for supporting different versions of HBase for creating
+ * {@link ReplicationQueueStorage} and {@link ReplicationPeerStorage}.
+ */
+public final class ReplicationStorageFactoryHelper {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationStorageFactoryHelper.class);
+
+ private ReplicationStorageFactoryHelper() {
+ }
+
+ public static ReplicationPeerStorage getReplicationPeerStorage(Configuration
conf, ZKWatcher zkw,
+ FileSystem fs) {
+ // Case HBase >= 2.6.0: Invoke the method that requires three parameters
+ try {
+ Method method =
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
+ FileSystem.class, ZKWatcher.class, Configuration.class);
+ return (ReplicationPeerStorage) method.invoke(null, fs, zkw, conf);
+ } catch (NoSuchMethodException e) {
+ LOG.debug("No getReplicationPeerStorage method with FileSystem as a
parameter, "
+ + "should be HBase 2.6-", e);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ // getReplicationPeerStorage method does not throw any exceptions, so
should not arrive here
+ throw new RuntimeException(e);
+ }
+ // Case HBase < 2.6.0: Fall back to the method that requires only two
parameters
+ try {
+ Method method =
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
+ ZKWatcher.class, Configuration.class);
+ return (ReplicationPeerStorage) method.invoke(null, zkw, conf);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static ReplicationQueueStorage
getReplicationQueueStorage(Configuration conf,
+ ZKWatcher zkw, Connection conn) {
+ try {
+ Method method =
ReplicationStorageFactory.class.getMethod("getReplicationQueueStorage",
+ Connection.class, Configuration.class);
+ return (ReplicationQueueStorage) method.invoke(null, conn, conf);
+ } catch (NoSuchMethodException e) {
+ LOG.debug("No getReplicationQueueStorage method with Connection as a
parameter, "
+ + "should be HBase 2.x", e);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ // getReplicationQueueStorage method does not throw any exceptions, so
should not arrive here
+ throw new RuntimeException(e);
+ }
+ try {
+ Method method =
ReplicationStorageFactory.class.getMethod("getReplicationQueueStorage",
+ ZKWatcher.class, Configuration.class);
+ return (ReplicationQueueStorage) method.invoke(null, zkw, conf);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
+ // getReplicationQueueStorage method does not throw any exceptions, so
should not arrive here
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java
index 2448dd4..601e913 100644
--- a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java
@@ -126,7 +126,6 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -155,6 +154,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hbase.HBCKFsUtils;
import org.apache.hbase.HBCKMetaTableAccessor;
+import org.apache.hbase.ReplicationStorageFactoryHelper;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
@@ -1050,9 +1050,9 @@ public class HBaseFsck extends Configured implements Closeable {
HFile.Reader hf = null;
try {
hf = HFile.createReader(fs, hfile.getPath(), CacheConfig.DISABLED,
true, getConf());
- Optional<Cell> startKv = hf.getFirstKey();
+ Optional<? extends Cell> startKv = hf.getFirstKey();
start = CellUtil.cloneRow(startKv.get());
- Optional<Cell> endKv = hf.getLastKey();
+ Optional<? extends Cell> endKv = hf.getLastKey();
end = CellUtil.cloneRow(endKv.get());
} catch (IOException ioe) {
LOG.warn("Problem reading orphan file " + hfile + ", skipping");
@@ -3841,7 +3841,7 @@ public class HBaseFsck extends Configured implements Closeable {
}
public void checkAndFixReplication() throws ReplicationException {
- ReplicationChecker checker = new ReplicationChecker(getConf(), zkw,
errors);
+ ReplicationChecker checker = new ReplicationChecker(getConf(), zkw,
rootFs, connection, errors);
checker.checkUnDeletedQueues();
if (checker.hasUnDeletedQueues() && this.fixReplication) {
@@ -5475,7 +5475,7 @@ public class HBaseFsck extends Configured implements Closeable {
return;
}
ReplicationQueueStorage queueStorage =
- ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
+ ReplicationStorageFactoryHelper.getReplicationQueueStorage(getConf(),
zkw, connection);
List<ReplicationPeerDescription> peerDescriptions =
admin.listReplicationPeers();
if (peerDescriptions != null && peerDescriptions.size() > 0) {
List<String> peers = peerDescriptions.stream()
diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java
index 14359f9..5f1a2a9 100644
--- a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java
@@ -17,10 +17,10 @@
*/
package org.apache.hbase.hbck1;
-import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,16 +29,19 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.ReplicationStorageFactoryHelper;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
/**
* Check and fix undeleted replication queues for removed peerId. Copied over
wholesale from hbase.
* Unaltered except for package and imports.
@@ -49,68 +52,199 @@ public class ReplicationChecker {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationChecker.class);
private final HBaseFsck.ErrorReporter errorReporter;
- // replicator with its queueIds for removed peers
- private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
+
+ private final UnDeletedQueueChecker unDeletedQueueChecker;
+
// replicator with its undeleted queueIds for removed peers in hfile-refs
queue
- private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();
+ private Set<String> undeletedHFileRefsPeerIds = Collections.emptySet();
private final ReplicationPeerStorage peerStorage;
private final ReplicationQueueStorage queueStorage;
- public ReplicationChecker(Configuration conf, ZKWatcher zkw,
+ private UnDeletedQueueChecker initUnDeletedQueueChecker() {
+ try {
+ ReplicationQueueStorage.class.getMethod("listAllPeerIds");
+ return new UnDeletedQueueChecker3();
+ } catch (NoSuchMethodException e) {
+ LOG.debug("No listAllPeerIds method, should be hbase 2", e);
+ return new UnDeletedQueueChecker2();
+ }
+ }
+
+ public ReplicationChecker(Configuration conf, ZKWatcher zkw, FileSystem fs,
Connection conn,
HBaseFsck.ErrorReporter errorReporter) {
- this.peerStorage = getReplicationPeerStorage(conf, zkw);
- this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+ this.peerStorage =
ReplicationStorageFactoryHelper.getReplicationPeerStorage(conf, zkw, fs);
+ this.queueStorage =
ReplicationStorageFactoryHelper.getReplicationQueueStorage(conf, zkw, conn);
this.errorReporter = errorReporter;
+ this.unDeletedQueueChecker = initUnDeletedQueueChecker();
}
- private ReplicationPeerStorage getReplicationPeerStorage(Configuration conf,
ZKWatcher zkw)
- throws AssertionError {
- ReplicationPeerStorage peerStorage;
- try {
- // Case HBase >= 2.6.0: Invoke the method that requires three parameters
- Method method =
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
- FileSystem.class, ZKWatcher.class, Configuration.class);
- FileSystem fileSystem = FileSystem.get(conf);
- peerStorage = (ReplicationPeerStorage) method.invoke(null, fileSystem,
zkw, conf);
- } catch (IOException | NoSuchMethodException | IllegalAccessException
- | InvocationTargetException e1) {
- // Case HBase < 2.6.0: Fall back to the method that requires only two
parameters
+ public boolean hasUnDeletedQueues() {
+ return errorReporter.getErrorList()
+
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+ }
+
+ private interface UnDeletedQueueChecker {
+
+ void check() throws ReplicationException;
+
+ void fix() throws ReplicationException;
+ }
+
+ private final class UnDeletedQueueChecker2 implements UnDeletedQueueChecker {
+
+ private final Method getListOfReplicators;
+
+ private final Method getAllQueues;
+
+ private final Method removeQueue;
+
+ private final Method removeReplicatorIfQueueIsEmpty;
+
+ // replicator with its queueIds for removed peers
+ private Map<ServerName, List<String>> undeletedQueueIds =
Collections.emptyMap();
+
+ UnDeletedQueueChecker2() {
try {
- Method method =
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
- ZKWatcher.class, Configuration.class);
- peerStorage = (ReplicationPeerStorage) method.invoke(null, zkw, conf);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e2) {
- throw new AssertionError("should not happen", e2);
+ getListOfReplicators =
ReplicationQueueStorage.class.getMethod("getListOfReplicators");
+ getAllQueues = ReplicationQueueStorage.class.getMethod("getAllQueues",
ServerName.class);
+ removeQueue =
+ ReplicationQueueStorage.class.getMethod("removeQueue",
ServerName.class, String.class);
+ removeReplicatorIfQueueIsEmpty = ReplicationQueueStorage.class
+ .getMethod("removeReplicatorIfQueueIsEmpty", ServerName.class);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("method unavailable", e);
}
}
- return peerStorage;
- }
- public boolean hasUnDeletedQueues() {
- return errorReporter.getErrorList()
-
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+ private Map<ServerName, List<String>> getUnDeletedQueues() throws
ReplicationException {
+ Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
+ Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+ try {
+ for (ServerName replicator : (List<ServerName>)
getListOfReplicators.invoke(queueStorage)) {
+ for (String queueId : (List<String>)
getAllQueues.invoke(queueStorage, replicator)) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ if (!peerIds.contains(queueInfo.getPeerId())) {
+ undeletedQueues.computeIfAbsent(replicator, key -> new
ArrayList<>()).add(queueId);
+ LOG.debug(
+ "Undeleted replication queue for removed peer found: "
+ + "[removedPeerId={}, replicator={}, queueId={}]",
+ queueInfo.getPeerId(), replicator, queueId);
+ }
+ }
+ }
+ } catch (InvocationTargetException e) {
+ Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ return undeletedQueues;
+ }
+
+ @Override
+ public void check() throws ReplicationException {
+ undeletedQueueIds = getUnDeletedQueues();
+ undeletedQueueIds.forEach((replicator, queueIds) -> {
+ queueIds.forEach(queueId -> {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ String msg = "Undeleted replication queue for removed peer found: "
+ + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
queueInfo.getPeerId(),
+ replicator, queueId);
+
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+ msg);
+ });
+ });
+ }
+
+ @Override
+ public void fix() throws ReplicationException {
+ try {
+ for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds :
undeletedQueueIds
+ .entrySet()) {
+ ServerName replicator = replicatorAndQueueIds.getKey();
+ for (String queueId : replicatorAndQueueIds.getValue()) {
+ removeQueue.invoke(queueStorage, replicator, queueId);
+ }
+ removeReplicatorIfQueueIsEmpty.invoke(queueStorage, replicator);
+ }
+ } catch (InvocationTargetException e) {
+ Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
- private Map<ServerName, List<String>> getUnDeletedQueues() throws
ReplicationException {
- Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
- Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
- for (ServerName replicator : queueStorage.getListOfReplicators()) {
- for (String queueId : queueStorage.getAllQueues(replicator)) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (!peerIds.contains(queueInfo.getPeerId())) {
- undeletedQueues.computeIfAbsent(replicator, key -> new
ArrayList<>()).add(queueId);
- LOG.debug(
- "Undeleted replication queue for removed peer found: "
- + "[removedPeerId={}, replicator={}, queueId={}]",
- queueInfo.getPeerId(), replicator, queueId);
+ private final class UnDeletedQueueChecker3 implements UnDeletedQueueChecker {
+
+ private final Method listAllPeerIds;
+
+ private final Method removeAllQueues;
+
+ private List<String> unDeletedPeerIds = Collections.emptyList();
+
+ UnDeletedQueueChecker3() {
+ try {
+ listAllPeerIds =
ReplicationQueueStorage.class.getMethod("listAllPeerIds");
+ removeAllQueues =
ReplicationQueueStorage.class.getMethod("removeAllQueues", String.class);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("method unavailable", e);
+ }
+ }
+
+ private List<String> getUnDeletedPeerIds() throws ReplicationException {
+ List<String> unDeletedPeerIds = new ArrayList<>();
+ try {
+ Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+ for (String peerId : (List<String>)
listAllPeerIds.invoke(queueStorage)) {
+ if (!peerIds.contains(peerId)) {
+ unDeletedPeerIds.add(peerId);
+ }
}
+ } catch (InvocationTargetException e) {
+ Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
}
+ return unDeletedPeerIds;
}
- return undeletedQueues;
+
+ @Override
+ public void check() throws ReplicationException {
+ unDeletedPeerIds = getUnDeletedPeerIds();
+ unDeletedPeerIds.forEach(peerId -> {
+ String msg = "Undeleted replication queue for removed peer found: "
+ + String.format("[removedPeerId=%s]", peerId);
+
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+ msg);
+ });
+ }
+
+ @Override
+ public void fix() throws ReplicationException {
+ try {
+ for (String peerId : unDeletedPeerIds) {
+ removeAllQueues.invoke(queueStorage, peerId);
+ }
+ } catch (InvocationTargetException e) {
+ Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException
{
+
Set<String> undeletedHFileRefsPeerIds =
new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
@@ -123,18 +257,30 @@ public class ReplicationChecker {
return undeletedHFileRefsPeerIds;
}
+ private boolean hasData() throws ReplicationException {
+ Method hasDataMethod;
+ try {
+ hasDataMethod = ReplicationQueueStorage.class.getMethod("hasData");
+ } catch (NoSuchMethodException e) {
+ LOG.debug("No hasData method, should be hbase 2", e);
+ return true;
+ }
+ try {
+ return (boolean) hasDataMethod.invoke(queueStorage);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+ Throwables.throwIfUnchecked(e.getCause());
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
public void checkUnDeletedQueues() throws ReplicationException {
- undeletedQueueIds = getUnDeletedQueues();
- undeletedQueueIds.forEach((replicator, queueIds) -> {
- queueIds.forEach(queueId -> {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- String msg = "Undeleted replication queue for removed peer found: "
- + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
queueInfo.getPeerId(),
- replicator, queueId);
-
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
- msg);
- });
- });
+ if (!hasData()) {
+ return;
+ }
+ unDeletedQueueChecker.check();
undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
undeletedHFileRefsPeerIds.stream()
.map(peerId -> "Undeleted replication hfile-refs queue for removed peer
" + peerId + " found")
@@ -143,13 +289,7 @@ public class ReplicationChecker {
}
public void fixUnDeletedQueues() throws ReplicationException {
- for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds :
undeletedQueueIds.entrySet()) {
- ServerName replicator = replicatorAndQueueIds.getKey();
- for (String queueId : replicatorAndQueueIds.getValue()) {
- queueStorage.removeQueue(replicator, queueId);
- }
- queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
- }
+ unDeletedQueueChecker.fix();
for (String peerId : undeletedHFileRefsPeerIds) {
queueStorage.removePeerFromHFileRefs(peerId);
}