This is an automated email from the ASF dual-hosted git repository.
lupeng pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new f790ab13f53 HBASE-29376
ReplicationLogCleaner.preClean/getDeletableFiles should return early when
asyncClusterConnection closes during HMaster stopping (#7071)
f790ab13f53 is described below
commit f790ab13f539fbb5a62f06b440ab240eb086ee4d
Author: Peng Lu <[email protected]>
AuthorDate: Sat Sep 6 22:27:11 2025 +0800
HBASE-29376 ReplicationLogCleaner.preClean/getDeletableFiles should return
early when asyncClusterConnection closes during HMaster stopping (#7071)
Signed-off-by: Duo Zhang <[email protected]>
---
.../replication/master/ReplicationLogCleaner.java | 33 ++++++++++++---
.../hbase/master/cleaner/TestLogsCleaner.java | 4 ++
.../master/TestReplicationLogCleaner.java | 47 +++++++++++++++++++++-
3 files changed, 76 insertions(+), 8 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7fc8feae72a..caf4bac088d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -28,6 +28,7 @@ import java.util.stream.Stream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
@@ -65,6 +66,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
// queue for a given peer, that why we can use a String peerId as key
instead of
// ReplicationQueueId.
private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>>
replicationOffsets;
+ private MasterServices masterService;
private ReplicationLogCleanerBarrier barrier;
private ReplicationPeerManager rpm;
private Supplier<Set<ServerName>> getNotFullyDeadServers;
@@ -74,9 +76,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
@Override
public void preClean() {
- if (this.getConf() == null) {
+ if (this.getConf() == null || isAsyncClusterConnectionClosedOrNull()) {
+ LOG.warn(
+ "Skipping preClean because configuration is null or
asyncClusterConnection is unavailable.");
return;
}
+
try {
if (!rpm.getQueueStorage().hasData()) {
return;
@@ -192,6 +197,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
if (this.getConf() == null) {
return files;
}
+
+ if (isAsyncClusterConnectionClosedOrNull()) {
+ LOG.warn("Skip getting deletable files because asyncClusterConnection is
unavailable.");
+ // asyncClusterConnection is unavailable, we shouldn't delete any files.
+ return Collections.emptyList();
+ }
+
try {
if (!rpm.getQueueStorage().hasData()) {
return files;
@@ -251,11 +263,11 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
super.init(params);
if (MapUtils.isNotEmpty(params)) {
Object master = params.get(HMaster.MASTER);
- if (master != null && master instanceof MasterServices) {
- MasterServices m = (MasterServices) master;
- barrier = m.getReplicationLogCleanerBarrier();
- rpm = m.getReplicationPeerManager();
- getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
+ if (master instanceof MasterServices) {
+ masterService = (MasterServices) master;
+ barrier = masterService.getReplicationLogCleanerBarrier();
+ rpm = masterService.getReplicationPeerManager();
+ getNotFullyDeadServers = () -> getNotFullyDeadServers(masterService);
return;
}
}
@@ -271,4 +283,13 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
public boolean isStopped() {
return this.stopped;
}
+
+ /**
+ * Check whether the master's asyncClusterConnection is unusable, i.e. null or already closed.
+ * @return true if asyncClusterConnection is null or is closed, false otherwise
+ */
+ private boolean isAsyncClusterConnectionClosedOrNull() {
+ AsyncClusterConnection asyncClusterConnection = masterService.getAsyncClusterConnection();
+ return asyncClusterConnection == null || asyncClusterConnection.isClosed();
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 699d9f963da..8a05986f592 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
@@ -134,6 +135,9 @@ public class TestLogsCleaner {
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
when(masterServices.getReplicationLogCleanerBarrier())
.thenReturn(new ReplicationLogCleanerBarrier());
+ AsyncClusterConnection asyncClusterConnection =
mock(AsyncClusterConnection.class);
+
when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
+ when(asyncClusterConnection.isClosed()).thenReturn(false);
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.getQueueStorage()).thenReturn(queueStorage);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
index a1850b68eba..487ae63a6d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
@@ -61,6 +63,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -77,16 +80,21 @@ public class TestReplicationLogCleaner {
private ReplicationLogCleaner cleaner;
+ private ReplicationPeerManager rpm;
+
@Before
public void setUp() throws ReplicationException {
services = mock(MasterServices.class);
when(services.getReplicationLogCleanerBarrier()).thenReturn(new
ReplicationLogCleanerBarrier());
- ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+ AsyncClusterConnection asyncClusterConnection =
mock(AsyncClusterConnection.class);
+
when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
+ when(asyncClusterConnection.isClosed()).thenReturn(false);
+ rpm = mock(ReplicationPeerManager.class);
when(services.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
when(rpm.getQueueStorage()).thenReturn(rqs);
- when(rpm.getQueueStorage().hasData()).thenReturn(true);
+ when(rqs.hasData()).thenReturn(true);
when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
ServerManager sm = mock(ServerManager.class);
when(services.getServerManager()).thenReturn(sm);
@@ -383,4 +391,39 @@ public class TestReplicationLogCleaner {
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
+
+ @Test
+ public void testPreCleanWhenAsyncClusterConnectionClosed() throws
ReplicationException {
+ assertFalse(services.getAsyncClusterConnection().isClosed());
+ verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
+ cleaner.preClean();
+ verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
+ verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+
+ when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
+ assertTrue(services.getAsyncClusterConnection().isClosed());
+ verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
+ cleaner.preClean();
+ verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
+ // rpm.getQueueStorage().hasData() was not executed, indicating an early
return.
+ verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+ }
+
+ @Test
+ public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws
ReplicationException {
+ List<FileStatus> files = List.of(new FileStatus());
+ assertFalse(services.getAsyncClusterConnection().isClosed());
+ verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
+ cleaner.getDeletableFiles(files);
+ verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
+ verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+
+ when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
+ assertTrue(services.getAsyncClusterConnection().isClosed());
+ verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
+ cleaner.getDeletableFiles(files);
+ verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
+ // rpm.getQueueStorage().hasData() was not executed, indicating an early
return.
+ verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+ }
}