This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fbcb9da601 HBASE-29697 Add a listAllPeerIds method to ReplicationQueueStorage (#7429)
8fbcb9da601 is described below

commit 8fbcb9da6018013bca17bf27c460c378be07441f
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Nov 11 23:11:01 2025 +0800

    HBASE-29697 Add a listAllPeerIds method to ReplicationQueueStorage (#7429)
    
    Signed-off-by: Nihal Jain <[email protected]>
---
 .../hbase/replication/ReplicationQueueStorage.java |  9 +++
 .../replication/TableReplicationQueueStorage.java  | 51 +++++++++++-----
 .../OfflineTableReplicationQueueStorage.java       |  6 ++
 .../TestTableReplicationQueueStorage.java          | 67 ++++++++++++----------
 4 files changed, 89 insertions(+), 44 deletions(-)

diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index b5bc64eb55a..0f624dafb2d 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -74,6 +74,15 @@ public interface ReplicationQueueStorage {
   List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
     throws ReplicationException;
 
+  /**
+   * Get a list of all peer ids.
+   * <p>
+   * This method is designed for HBCK, where we may have some dirty data left in the storage after a
+   * broken procedure run. In normal logic, you should call
+   * {@link ReplicationPeerStorage#listPeerIds()} for getting all the replication peer ids.
+   */
+  List<String> listAllPeerIds() throws ReplicationException;
+
   /**
    * Get a list of all queues and the offsets.
    */
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index 415c312ddcd..e21f7c5712b 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -213,6 +213,40 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
       queueIds);
   }
 
+  private String getNextPeerId(Table table, String previousPeerId) throws IOException {
+    Scan peerScan =
+      new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(new KeyOnlyFilter());
+    if (previousPeerId != null) {
+      peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId));
+    }
+    try (ResultScanner scanner = table.getScanner(peerScan)) {
+      Result result = scanner.next();
+      if (result == null) {
+        return null;
+      }
+      return ReplicationQueueId.getPeerId(Bytes.toString(result.getRow()));
+    }
+  }
+
+  @Override
+  public List<String> listAllPeerIds() throws ReplicationException {
+    List<String> peerIds = new ArrayList<>();
+    String previousPeerId = null;
+    try (Table table = conn.getTable(tableName)) {
+      for (;;) {
+        String peerId = getNextPeerId(table, previousPeerId);
+        if (peerId == null) {
+          break;
+        }
+        peerIds.add(peerId);
+        previousPeerId = peerId;
+      }
+    } catch (IOException e) {
+      throw new ReplicationException("failed to listAllPeerIds", e);
+    }
+    return peerIds;
+  }
+
   @Override
   public List<ReplicationQueueId> listAllQueueIds(String peerId) throws ReplicationException {
     Scan scan = new Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId))
@@ -231,23 +265,12 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
     throws ReplicationException {
     List<ReplicationQueueId> queueIds = new ArrayList<>();
     try (Table table = conn.getTable(tableName)) {
-      KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter();
       String previousPeerId = null;
       for (;;) {
         // first, get the next peerId
-        Scan peerScan =
-          new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(keyOnlyFilter);
-        if (previousPeerId != null) {
-          peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId));
-        }
-        String peerId;
-        try (ResultScanner scanner = table.getScanner(peerScan)) {
-          Result result = scanner.next();
-          if (result == null) {
-            // no more peers, break
-            break;
-          }
-          peerId = ReplicationQueueId.getPeerId(Bytes.toString(result.getRow()));
+        String peerId = getNextPeerId(table, previousPeerId);
+        if (peerId == null) {
+          break;
         }
         listAllQueueIds(table, peerId, serverName, queueIds);
         previousPeerId = peerId;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
index 9faca74f710..45bb86f935a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
@@ -204,6 +204,12 @@ public class OfflineTableReplicationQueueStorage implements ReplicationQueueStor
     return ImmutableMap.copyOf(offsetMap);
   }
 
+  @Override
+  public List<String> listAllPeerIds() throws ReplicationException {
+    return offsets.keySet().stream().map(ReplicationQueueId::getPeerId).distinct()
+      .collect(Collectors.toList());
+  }
+
   @Override
   public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
     throws ReplicationException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
index 9041831d0e8..a2afd3a4d93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
@@ -18,12 +18,13 @@
 package org.apache.hadoop.hbase.replication;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -32,15 +33,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -51,48 +51,41 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
 import org.hamcrest.Matchers;
 import org.hamcrest.collection.IsEmptyCollection;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 
-@Category({ ReplicationTests.class, MediumTests.class })
+@Tag(ReplicationTests.TAG)
+@Tag(MediumTests.TAG)
 public class TestTableReplicationQueueStorage {
 
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-    HBaseClassTestRule.forClass(TestTableReplicationQueueStorage.class);
-
   private static final Logger LOG = LoggerFactory.getLogger(TestTableReplicationQueueStorage.class);
 
   private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
 
-  @Rule
-  public TableNameTestRule tableNameRule = new TableNameTestRule();
-
   private TableReplicationQueueStorage storage;
 
-  @BeforeClass
+  @BeforeAll
   public static void setUp() throws Exception {
     UTIL.startMiniCluster();
   }
 
-  @AfterClass
+  @AfterAll
   public static void tearDown() throws IOException {
     UTIL.shutdownMiniCluster();
   }
 
-  @Before
-  public void setUpBeforeTest() throws Exception {
-    TableName tableName = tableNameRule.getTableName();
+  @BeforeEach
+  public void setUpBeforeTest(TestInfo testInfo) throws Exception {
+    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
     TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
     UTIL.getAdmin().createTable(td);
     UTIL.waitTableAvailable(tableName);
@@ -135,11 +128,6 @@ public class TestTableReplicationQueueStorage {
     }
   }
 
-  @Test
-  public void testGetSetOffset() {
-
-  }
-
   private void assertQueueId(String peerId, ServerName serverName, ReplicationQueueId queueId) {
     assertEquals(peerId, queueId.getPeerId());
     assertEquals(serverName, queueId.getServerName());
@@ -471,4 +459,23 @@ public class TestTableReplicationQueueStorage {
     assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
     assertEquals(3, storage.getReplicableHFiles(peerId2).size());
   }
+
+  @Test
+  public void testListAllPeerIds() throws ReplicationException {
+    assertThat(storage.listAllPeerIds(), empty());
+
+    for (int i = 0; i < 20; i++) {
+      int numQueues = ThreadLocalRandom.current().nextInt(10, 100);
+      for (int j = 0; j < numQueues; j++) {
+        ReplicationQueueId queueId = new ReplicationQueueId(getServerName(j), "Peer_" + i);
+        storage.setOffset(queueId, "group-" + j, new ReplicationGroupOffset("file-" + j, j * 100),
+          Collections.emptyMap());
+      }
+    }
+    List<String> peerIds = storage.listAllPeerIds();
+    assertThat(peerIds, hasSize(20));
+    for (int i = 0; i < 20; i++) {
+      assertThat(peerIds, hasItem("Peer_" + i));
+    }
+  }
 }
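
For reference, here is a minimal usage sketch (not part of the commit above) of how the new ReplicationQueueStorage#listAllPeerIds() could support the HBCK use case described in its Javadoc: peer ids present in the queue storage but absent from ReplicationPeerStorage#listPeerIds() are the dirty data left behind by a broken procedure run. The class name DanglingQueuePeerCheck, the method findDanglingPeerIds, and the wiring of the two storage instances are illustrative assumptions, not HBase code.

// Illustrative sketch only: DanglingQueuePeerCheck is a hypothetical helper, not a
// class in HBase. It combines the existing ReplicationPeerStorage API with the
// listAllPeerIds() method added by this commit.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

public final class DanglingQueuePeerCheck {

  private DanglingQueuePeerCheck() {
  }

  /**
   * Returns peer ids that exist in the replication queue storage but are unknown to the
   * peer storage, i.e. the leftovers an HBCK-style repair would want to clean up.
   */
  public static Set<String> findDanglingPeerIds(ReplicationQueueStorage queueStorage,
    ReplicationPeerStorage peerStorage) throws ReplicationException {
    List<String> knownPeers = peerStorage.listPeerIds();
    Set<String> dangling = new HashSet<>(queueStorage.listAllPeerIds());
    dangling.removeAll(knownPeers);
    return dangling;
  }
}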
