This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new fa48a58e191 HBASE-29697 Add a listAllPeerIds methd to
ReplicationQueueStorage (#7429)
fa48a58e191 is described below
commit fa48a58e191e25651da864cd5f3f40071f3f7d3a
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Nov 11 23:11:01 2025 +0800
HBASE-29697 Add a listAllPeerIds methd to ReplicationQueueStorage (#7429)
Signed-off-by: Nihal Jain <[email protected]>
(cherry picked from commit 8fbcb9da6018013bca17bf27c460c378be07441f)
---
.../hbase/replication/ReplicationQueueStorage.java | 9 +++
.../replication/TableReplicationQueueStorage.java | 51 +++++++++++-----
.../OfflineTableReplicationQueueStorage.java | 6 ++
.../TestTableReplicationQueueStorage.java | 67 ++++++++++++----------
4 files changed, 89 insertions(+), 44 deletions(-)
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
index b5bc64eb55a..0f624dafb2d 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java
@@ -74,6 +74,15 @@ public interface ReplicationQueueStorage {
List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName
serverName)
throws ReplicationException;
+ /**
+ * Get a list of all peer ids.
+ * <p>
+ * This method is designed for HBCK, where we may have some dirty data left
in the storage after a
+ * broken procedure run. In normal logic, you should call
+ * {@link ReplicationPeerStorage#listPeerIds()} for getting all the
replication peer ids.
+ */
+ List<String> listAllPeerIds() throws ReplicationException;
+
/**
* Get a list of all queues and the offsets.
*/
diff --git
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
index 415c312ddcd..e21f7c5712b 100644
---
a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++
b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
@@ -213,6 +213,40 @@ public class TableReplicationQueueStorage implements
ReplicationQueueStorage {
queueIds);
}
+ private String getNextPeerId(Table table, String previousPeerId) throws
IOException {
+ Scan peerScan =
+ new Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(new
KeyOnlyFilter());
+ if (previousPeerId != null) {
+
peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId));
+ }
+ try (ResultScanner scanner = table.getScanner(peerScan)) {
+ Result result = scanner.next();
+ if (result == null) {
+ return null;
+ }
+ return ReplicationQueueId.getPeerId(Bytes.toString(result.getRow()));
+ }
+ }
+
+ @Override
+ public List<String> listAllPeerIds() throws ReplicationException {
+ List<String> peerIds = new ArrayList<>();
+ String previousPeerId = null;
+ try (Table table = conn.getTable(tableName)) {
+ for (;;) {
+ String peerId = getNextPeerId(table, previousPeerId);
+ if (peerId == null) {
+ break;
+ }
+ peerIds.add(peerId);
+ previousPeerId = peerId;
+ }
+ } catch (IOException e) {
+ throw new ReplicationException("failed to listAllPeerIds", e);
+ }
+ return peerIds;
+ }
+
@Override
public List<ReplicationQueueId> listAllQueueIds(String peerId) throws
ReplicationException {
Scan scan = new
Scan().setStartStopRowForPrefixScan(ReplicationQueueId.getScanPrefix(peerId))
@@ -231,23 +265,12 @@ public class TableReplicationQueueStorage implements
ReplicationQueueStorage {
throws ReplicationException {
List<ReplicationQueueId> queueIds = new ArrayList<>();
try (Table table = conn.getTable(tableName)) {
- KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter();
String previousPeerId = null;
for (;;) {
// first, get the next peerId
- Scan peerScan =
- new
Scan().addFamily(QUEUE_FAMILY).setOneRowLimit().setFilter(keyOnlyFilter);
- if (previousPeerId != null) {
-
peerScan.withStartRow(ReplicationQueueId.getScanStartRowForNextPeerId(previousPeerId));
- }
- String peerId;
- try (ResultScanner scanner = table.getScanner(peerScan)) {
- Result result = scanner.next();
- if (result == null) {
- // no more peers, break
- break;
- }
- peerId =
ReplicationQueueId.getPeerId(Bytes.toString(result.getRow()));
+ String peerId = getNextPeerId(table, previousPeerId);
+ if (peerId == null) {
+ break;
}
listAllQueueIds(table, peerId, serverName, queueIds);
previousPeerId = peerId;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
index 9faca74f710..45bb86f935a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java
@@ -204,6 +204,12 @@ public class OfflineTableReplicationQueueStorage
implements ReplicationQueueStor
return ImmutableMap.copyOf(offsetMap);
}
+ @Override
+ public List<String> listAllPeerIds() throws ReplicationException {
+ return
offsets.keySet().stream().map(ReplicationQueueId::getPeerId).distinct()
+ .collect(Collectors.toList());
+ }
+
@Override
public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
throws ReplicationException {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
index 9041831d0e8..a2afd3a4d93 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java
@@ -18,12 +18,13 @@
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
@@ -32,15 +33,14 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -51,48 +51,41 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsEmptyCollection;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-@Category({ ReplicationTests.class, MediumTests.class })
+@Tag(ReplicationTests.TAG)
+@Tag(MediumTests.TAG)
public class TestTableReplicationQueueStorage {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestTableReplicationQueueStorage.class);
-
private static final Logger LOG =
LoggerFactory.getLogger(TestTableReplicationQueueStorage.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
- @Rule
- public TableNameTestRule tableNameRule = new TableNameTestRule();
-
private TableReplicationQueueStorage storage;
- @BeforeClass
+ @BeforeAll
public static void setUp() throws Exception {
UTIL.startMiniCluster();
}
- @AfterClass
+ @AfterAll
public static void tearDown() throws IOException {
UTIL.shutdownMiniCluster();
}
- @Before
- public void setUpBeforeTest() throws Exception {
- TableName tableName = tableNameRule.getTableName();
+ @BeforeEach
+ public void setUpBeforeTest(TestInfo testInfo) throws Exception {
+ TableName tableName =
TableName.valueOf(testInfo.getTestMethod().get().getName());
TableDescriptor td =
ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
UTIL.getAdmin().createTable(td);
UTIL.waitTableAvailable(tableName);
@@ -135,11 +128,6 @@ public class TestTableReplicationQueueStorage {
}
}
- @Test
- public void testGetSetOffset() {
-
- }
-
private void assertQueueId(String peerId, ServerName serverName,
ReplicationQueueId queueId) {
assertEquals(peerId, queueId.getPeerId());
assertEquals(serverName, queueId.getServerName());
@@ -471,4 +459,23 @@ public class TestTableReplicationQueueStorage {
assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, storage.getReplicableHFiles(peerId2).size());
}
+
+ @Test
+ public void testListAllPeerIds() throws ReplicationException {
+ assertThat(storage.listAllPeerIds(), empty());
+
+ for (int i = 0; i < 20; i++) {
+ int numQueues = ThreadLocalRandom.current().nextInt(10, 100);
+ for (int j = 0; j < numQueues; j++) {
+ ReplicationQueueId queueId = new ReplicationQueueId(getServerName(j),
"Peer_" + i);
+ storage.setOffset(queueId, "group-" + j, new
ReplicationGroupOffset("file-" + j, j * 100),
+ Collections.emptyMap());
+ }
+ }
+ List<String> peerIds = storage.listAllPeerIds();
+ assertThat(peerIds, hasSize(20));
+ for (int i = 0; i < 20; i++) {
+ assertThat(peerIds, hasItem("Peer_" + i));
+ }
+ }
}