This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-operator-tools.git


The following commit(s) were added to refs/heads/master by this push:
     new 58fe768  HBASE-29671 [hbase-operator-tools] Support different 
replication peer/queue storage (#156)
58fe768 is described below

commit 58fe76868ca34fcdc634e20285118dfdb2d0340b
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Nov 18 15:15:26 2025 +0800

    HBASE-29671 [hbase-operator-tools] Support different replication peer/queue 
storage (#156)
    
    Signed-off-by: Peter Somogyi <[email protected]>
    Signed-off-by: Nihal Jain <[email protected]>
---
 .../java/org/apache/hbase/ReplicationFsck.java     |   3 +-
 .../hbase/ReplicationStorageFactoryHelper.java     |  89 +++++++
 .../java/org/apache/hbase/hbck1/HBaseFsck.java     |  10 +-
 .../org/apache/hbase/hbck1/ReplicationChecker.java | 262 ++++++++++++++++-----
 4 files changed, 296 insertions(+), 68 deletions(-)

diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java 
b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java
index 282d7ab..68b3ba2 100644
--- a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationFsck.java
@@ -41,11 +41,10 @@ public class ReplicationFsck implements Closeable {
 
   int fsck(List<String> tables, boolean fix) throws IOException {
     try (HBaseFsck hbaseFsck = new HBaseFsck(this.configuration)) {
+      hbaseFsck.connect();
       hbaseFsck.setFixReplication(fix);
       hbaseFsck.checkAndFixReplication();
       if (tables != null && !tables.isEmpty()) {
-        // Below needs connection to be up; uses admin.
-        hbaseFsck.connect();
         hbaseFsck.setCleanReplicationBarrier(fix);
         for (String table : tables) {
           hbaseFsck.setCleanReplicationBarrierTable(table);
diff --git 
a/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java
 
b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java
new file mode 100644
index 0000000..e54a2d0
--- /dev/null
+++ 
b/hbase-hbck2/src/main/java/org/apache/hbase/ReplicationStorageFactoryHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class for supporting different versions of HBase for creating
+ * {@link ReplicationQueueStorage} and {@link ReplicationPeerStorage}.
+ */
+public final class ReplicationStorageFactoryHelper {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationStorageFactoryHelper.class);
+
+  private ReplicationStorageFactoryHelper() {
+  }
+
+  public static ReplicationPeerStorage getReplicationPeerStorage(Configuration 
conf, ZKWatcher zkw,
+    FileSystem fs) {
+    // Case HBase >= 2.6.0: Invoke the method that requires three parameters
+    try {
+      Method method = 
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
+        FileSystem.class, ZKWatcher.class, Configuration.class);
+      return (ReplicationPeerStorage) method.invoke(null, fs, zkw, conf);
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No getReplicationPeerStorage method with FileSystem as a 
parameter, "
+        + "should be HBase 2.6-", e);
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      // getReplicationPeerStorage method does not throw any exceptions, so 
should not arrive here
+      throw new RuntimeException(e);
+    }
+    // Case HBase < 2.6.0: Fall back to the method that requires only two 
parameters
+    try {
+      Method method = 
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
+        ZKWatcher.class, Configuration.class);
+      return (ReplicationPeerStorage) method.invoke(null, zkw, conf);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static ReplicationQueueStorage 
getReplicationQueueStorage(Configuration conf,
+    ZKWatcher zkw, Connection conn) {
+    try {
+      Method method = 
ReplicationStorageFactory.class.getMethod("getReplicationQueueStorage",
+        Connection.class, Configuration.class);
+      return (ReplicationQueueStorage) method.invoke(null, conn, conf);
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No getReplicationQueueStorage method with Connection as a 
parameter, "
+        + "should be HBase 2.x", e);
+    } catch (IllegalAccessException | InvocationTargetException e) {
+      // getReplicationQueueStorage method does not throw any exceptions, so 
should not arrive here
+      throw new RuntimeException(e);
+    }
+    try {
+      Method method = 
ReplicationStorageFactory.class.getMethod("getReplicationQueueStorage",
+        ZKWatcher.class, Configuration.class);
+      return (ReplicationQueueStorage) method.invoke(null, zkw, conf);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+      // getReplicationQueueStorage method does not throw any exceptions, so 
should not arrive here
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java 
b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java
index 2448dd4..601e913 100644
--- a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java
@@ -126,7 +126,6 @@ import 
org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -155,6 +154,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hbase.HBCKFsUtils;
 import org.apache.hbase.HBCKMetaTableAccessor;
+import org.apache.hbase.ReplicationStorageFactoryHelper;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.zookeeper.KeeperException;
@@ -1050,9 +1050,9 @@ public class HBaseFsck extends Configured implements 
Closeable {
         HFile.Reader hf = null;
         try {
           hf = HFile.createReader(fs, hfile.getPath(), CacheConfig.DISABLED, 
true, getConf());
-          Optional<Cell> startKv = hf.getFirstKey();
+          Optional<? extends Cell> startKv = hf.getFirstKey();
           start = CellUtil.cloneRow(startKv.get());
-          Optional<Cell> endKv = hf.getLastKey();
+          Optional<? extends Cell> endKv = hf.getLastKey();
           end = CellUtil.cloneRow(endKv.get());
         } catch (IOException ioe) {
           LOG.warn("Problem reading orphan file " + hfile + ", skipping");
@@ -3841,7 +3841,7 @@ public class HBaseFsck extends Configured implements 
Closeable {
   }
 
   public void checkAndFixReplication() throws ReplicationException {
-    ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, 
errors);
+    ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, 
rootFs, connection, errors);
     checker.checkUnDeletedQueues();
 
     if (checker.hasUnDeletedQueues() && this.fixReplication) {
@@ -5475,7 +5475,7 @@ public class HBaseFsck extends Configured implements 
Closeable {
       return;
     }
     ReplicationQueueStorage queueStorage =
-      ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
+      ReplicationStorageFactoryHelper.getReplicationQueueStorage(getConf(), 
zkw, connection);
     List<ReplicationPeerDescription> peerDescriptions = 
admin.listReplicationPeers();
     if (peerDescriptions != null && peerDescriptions.size() > 0) {
       List<String> peers = peerDescriptions.stream()
diff --git 
a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java 
b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java
index 14359f9..5f1a2a9 100644
--- a/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java
+++ b/hbase-hbck2/src/main/java/org/apache/hbase/hbck1/ReplicationChecker.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hbase.hbck1;
 
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,16 +29,19 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.ReplicationStorageFactoryHelper;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
 /**
  * Check and fix undeleted replication queues for removed peerId. Copied over 
wholesale from hbase.
  * Unaltered except for package and imports.
@@ -49,68 +52,199 @@ public class ReplicationChecker {
   private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationChecker.class);
 
   private final HBaseFsck.ErrorReporter errorReporter;
-  // replicator with its queueIds for removed peers
-  private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
+
+  private final UnDeletedQueueChecker unDeletedQueueChecker;
+
   // replicator with its undeleted queueIds for removed peers in hfile-refs 
queue
-  private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();
+  private Set<String> undeletedHFileRefsPeerIds = Collections.emptySet();
 
   private final ReplicationPeerStorage peerStorage;
   private final ReplicationQueueStorage queueStorage;
 
-  public ReplicationChecker(Configuration conf, ZKWatcher zkw,
+  private UnDeletedQueueChecker initUnDeletedQueueChecker() {
+    try {
+      ReplicationQueueStorage.class.getMethod("listAllPeerIds");
+      return new UnDeletedQueueChecker3();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No listAllPeerIds method, should be hbase 2", e);
+      return new UnDeletedQueueChecker2();
+    }
+  }
+
+  public ReplicationChecker(Configuration conf, ZKWatcher zkw, FileSystem fs, 
Connection conn,
     HBaseFsck.ErrorReporter errorReporter) {
-    this.peerStorage = getReplicationPeerStorage(conf, zkw);
-    this.queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+    this.peerStorage = 
ReplicationStorageFactoryHelper.getReplicationPeerStorage(conf, zkw, fs);
+    this.queueStorage = 
ReplicationStorageFactoryHelper.getReplicationQueueStorage(conf, zkw, conn);
     this.errorReporter = errorReporter;
+    this.unDeletedQueueChecker = initUnDeletedQueueChecker();
   }
 
-  private ReplicationPeerStorage getReplicationPeerStorage(Configuration conf, 
ZKWatcher zkw)
-    throws AssertionError {
-    ReplicationPeerStorage peerStorage;
-    try {
-      // Case HBase >= 2.6.0: Invoke the method that requires three parameters
-      Method method = 
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
-        FileSystem.class, ZKWatcher.class, Configuration.class);
-      FileSystem fileSystem = FileSystem.get(conf);
-      peerStorage = (ReplicationPeerStorage) method.invoke(null, fileSystem, 
zkw, conf);
-    } catch (IOException | NoSuchMethodException | IllegalAccessException
-      | InvocationTargetException e1) {
-      // Case HBase < 2.6.0: Fall back to the method that requires only two 
parameters
+  public boolean hasUnDeletedQueues() {
+    return errorReporter.getErrorList()
+      
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+  }
+
+  private interface UnDeletedQueueChecker {
+
+    void check() throws ReplicationException;
+
+    void fix() throws ReplicationException;
+  }
+
+  private final class UnDeletedQueueChecker2 implements UnDeletedQueueChecker {
+
+    private final Method getListOfReplicators;
+
+    private final Method getAllQueues;
+
+    private final Method removeQueue;
+
+    private final Method removeReplicatorIfQueueIsEmpty;
+
+    // replicator with its queueIds for removed peers
+    private Map<ServerName, List<String>> undeletedQueueIds = 
Collections.emptyMap();
+
+    UnDeletedQueueChecker2() {
       try {
-        Method method = 
ReplicationStorageFactory.class.getMethod("getReplicationPeerStorage",
-          ZKWatcher.class, Configuration.class);
-        peerStorage = (ReplicationPeerStorage) method.invoke(null, zkw, conf);
-      } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e2) {
-        throw new AssertionError("should not happen", e2);
+        getListOfReplicators = 
ReplicationQueueStorage.class.getMethod("getListOfReplicators");
+        getAllQueues = ReplicationQueueStorage.class.getMethod("getAllQueues", 
ServerName.class);
+        removeQueue =
+          ReplicationQueueStorage.class.getMethod("removeQueue", 
ServerName.class, String.class);
+        removeReplicatorIfQueueIsEmpty = ReplicationQueueStorage.class
+          .getMethod("removeReplicatorIfQueueIsEmpty", ServerName.class);
+      } catch (NoSuchMethodException e) {
+        throw new RuntimeException("method unavailable", e);
       }
     }
-    return peerStorage;
-  }
 
-  public boolean hasUnDeletedQueues() {
-    return errorReporter.getErrorList()
-      
.contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+    private Map<ServerName, List<String>> getUnDeletedQueues() throws 
ReplicationException {
+      Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
+      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+      try {
+        for (ServerName replicator : (List<ServerName>) 
getListOfReplicators.invoke(queueStorage)) {
+          for (String queueId : (List<String>) 
getAllQueues.invoke(queueStorage, replicator)) {
+            ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+            if (!peerIds.contains(queueInfo.getPeerId())) {
+              undeletedQueues.computeIfAbsent(replicator, key -> new 
ArrayList<>()).add(queueId);
+              LOG.debug(
+                "Undeleted replication queue for removed peer found: "
+                  + "[removedPeerId={}, replicator={}, queueId={}]",
+                queueInfo.getPeerId(), replicator, queueId);
+            }
+          }
+        }
+      } catch (InvocationTargetException e) {
+        Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+        Throwables.throwIfUnchecked(e.getCause());
+        throw new RuntimeException(e.getCause());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+      return undeletedQueues;
+    }
+
+    @Override
+    public void check() throws ReplicationException {
+      undeletedQueueIds = getUnDeletedQueues();
+      undeletedQueueIds.forEach((replicator, queueIds) -> {
+        queueIds.forEach(queueId -> {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          String msg = "Undeleted replication queue for removed peer found: "
+            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", 
queueInfo.getPeerId(),
+              replicator, queueId);
+          
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+            msg);
+        });
+      });
+    }
+
+    @Override
+    public void fix() throws ReplicationException {
+      try {
+        for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : 
undeletedQueueIds
+          .entrySet()) {
+          ServerName replicator = replicatorAndQueueIds.getKey();
+          for (String queueId : replicatorAndQueueIds.getValue()) {
+            removeQueue.invoke(queueStorage, replicator, queueId);
+          }
+          removeReplicatorIfQueueIsEmpty.invoke(queueStorage, replicator);
+        }
+      } catch (InvocationTargetException e) {
+        Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+        Throwables.throwIfUnchecked(e.getCause());
+        throw new RuntimeException(e.getCause());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
-  private Map<ServerName, List<String>> getUnDeletedQueues() throws 
ReplicationException {
-    Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
-    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
-    for (ServerName replicator : queueStorage.getListOfReplicators()) {
-      for (String queueId : queueStorage.getAllQueues(replicator)) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (!peerIds.contains(queueInfo.getPeerId())) {
-          undeletedQueues.computeIfAbsent(replicator, key -> new 
ArrayList<>()).add(queueId);
-          LOG.debug(
-            "Undeleted replication queue for removed peer found: "
-              + "[removedPeerId={}, replicator={}, queueId={}]",
-            queueInfo.getPeerId(), replicator, queueId);
+  private final class UnDeletedQueueChecker3 implements UnDeletedQueueChecker {
+
+    private final Method listAllPeerIds;
+
+    private final Method removeAllQueues;
+
+    private List<String> unDeletedPeerIds = Collections.emptyList();
+
+    UnDeletedQueueChecker3() {
+      try {
+        listAllPeerIds = 
ReplicationQueueStorage.class.getMethod("listAllPeerIds");
+        removeAllQueues = 
ReplicationQueueStorage.class.getMethod("removeAllQueues", String.class);
+      } catch (NoSuchMethodException e) {
+        throw new RuntimeException("method unavailable", e);
+      }
+    }
+
+    private List<String> getUnDeletedPeerIds() throws ReplicationException {
+      List<String> unDeletedPeerIds = new ArrayList<>();
+      try {
+        Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+        for (String peerId : (List<String>) 
listAllPeerIds.invoke(queueStorage)) {
+          if (!peerIds.contains(peerId)) {
+            unDeletedPeerIds.add(peerId);
+          }
         }
+      } catch (InvocationTargetException e) {
+        Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+        Throwables.throwIfUnchecked(e.getCause());
+        throw new RuntimeException(e.getCause());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
       }
+      return unDeletedPeerIds;
     }
-    return undeletedQueues;
+
+    @Override
+    public void check() throws ReplicationException {
+      unDeletedPeerIds = getUnDeletedPeerIds();
+      unDeletedPeerIds.forEach(peerId -> {
+        String msg = "Undeleted replication queue for removed peer found: "
+          + String.format("[removedPeerId=%s]", peerId);
+        
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+          msg);
+      });
+    }
+
+    @Override
+    public void fix() throws ReplicationException {
+      try {
+        for (String peerId : unDeletedPeerIds) {
+          removeAllQueues.invoke(queueStorage, peerId);
+        }
+      } catch (InvocationTargetException e) {
+        Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+        Throwables.throwIfUnchecked(e.getCause());
+        throw new RuntimeException(e.getCause());
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
   }
 
   private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException 
{
+
     Set<String> undeletedHFileRefsPeerIds =
       new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
     Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
@@ -123,18 +257,30 @@ public class ReplicationChecker {
     return undeletedHFileRefsPeerIds;
   }
 
+  private boolean hasData() throws ReplicationException {
+    Method hasDataMethod;
+    try {
+      hasDataMethod = ReplicationQueueStorage.class.getMethod("hasData");
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No hasData method, should be hbase 2", e);
+      return true;
+    }
+    try {
+      return (boolean) hasDataMethod.invoke(queueStorage);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      Throwables.throwIfInstanceOf(e.getCause(), ReplicationException.class);
+      Throwables.throwIfUnchecked(e.getCause());
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
   public void checkUnDeletedQueues() throws ReplicationException {
-    undeletedQueueIds = getUnDeletedQueues();
-    undeletedQueueIds.forEach((replicator, queueIds) -> {
-      queueIds.forEach(queueId -> {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        String msg = "Undeleted replication queue for removed peer found: "
-          + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", 
queueInfo.getPeerId(),
-            replicator, queueId);
-        
errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
-          msg);
-      });
-    });
+    if (!hasData()) {
+      return;
+    }
+    unDeletedQueueChecker.check();
     undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
     undeletedHFileRefsPeerIds.stream()
       .map(peerId -> "Undeleted replication hfile-refs queue for removed peer 
" + peerId + " found")
@@ -143,13 +289,7 @@ public class ReplicationChecker {
   }
 
   public void fixUnDeletedQueues() throws ReplicationException {
-    for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : 
undeletedQueueIds.entrySet()) {
-      ServerName replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
-        queueStorage.removeQueue(replicator, queueId);
-      }
-      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
-    }
+    unDeletedQueueChecker.fix();
     for (String peerId : undeletedHFileRefsPeerIds) {
       queueStorage.removePeerFromHFileRefs(peerId);
     }

Reply via email to