This is an automated email from the ASF dual-hosted git repository.

lupeng pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new f790ab13f53 HBASE-29376 ReplicationLogCleaner.preClean/getDeletableFiles should return early when asyncClusterConnection closes during HMaster stopping (#7071)
f790ab13f53 is described below

commit f790ab13f539fbb5a62f06b440ab240eb086ee4d
Author: Peng Lu <[email protected]>
AuthorDate: Sat Sep 6 22:27:11 2025 +0800

    HBASE-29376 ReplicationLogCleaner.preClean/getDeletableFiles should return early when asyncClusterConnection closes during HMaster stopping (#7071)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../replication/master/ReplicationLogCleaner.java  | 33 ++++++++++++---
 .../hbase/master/cleaner/TestLogsCleaner.java      |  4 ++
 .../master/TestReplicationLogCleaner.java          | 47 +++++++++++++++++++++-
 3 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7fc8feae72a..caf4bac088d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -28,6 +28,7 @@ import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
@@ -65,6 +66,7 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
   // queue for a given peer, that why we can use a String peerId as key 
instead of
   // ReplicationQueueId.
   private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> 
replicationOffsets;
+  private MasterServices masterService;
   private ReplicationLogCleanerBarrier barrier;
   private ReplicationPeerManager rpm;
   private Supplier<Set<ServerName>> getNotFullyDeadServers;
@@ -74,9 +76,12 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
 
   @Override
   public void preClean() {
-    if (this.getConf() == null) {
+    if (this.getConf() == null || isAsyncClusterConnectionClosedOrNull()) {
+      LOG.warn(
+        "Skipping preClean because configuration is null or 
asyncClusterConnection is unavailable.");
       return;
     }
+
     try {
       if (!rpm.getQueueStorage().hasData()) {
         return;
@@ -192,6 +197,13 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
     if (this.getConf() == null) {
       return files;
     }
+
+    if (isAsyncClusterConnectionClosedOrNull()) {
+      LOG.warn("Skip getting deletable files because asyncClusterConnection is 
unavailable.");
+      // asyncClusterConnection is unavailable, we shouldn't delete any files.
+      return Collections.emptyList();
+    }
+
     try {
       if (!rpm.getQueueStorage().hasData()) {
         return files;
@@ -251,11 +263,11 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
     super.init(params);
     if (MapUtils.isNotEmpty(params)) {
       Object master = params.get(HMaster.MASTER);
-      if (master != null && master instanceof MasterServices) {
-        MasterServices m = (MasterServices) master;
-        barrier = m.getReplicationLogCleanerBarrier();
-        rpm = m.getReplicationPeerManager();
-        getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
+      if (master instanceof MasterServices) {
+        masterService = (MasterServices) master;
+        barrier = masterService.getReplicationLogCleanerBarrier();
+        rpm = masterService.getReplicationPeerManager();
+        getNotFullyDeadServers = () -> getNotFullyDeadServers(masterService);
         return;
       }
     }
@@ -271,4 +283,13 @@ public class ReplicationLogCleaner extends 
BaseLogCleanerDelegate {
   public boolean isStopped() {
     return this.stopped;
   }
+
+  /**
+   * Check if asyncClusterConnection is null or closed.
+   * @return true if asyncClusterConnection is null or is closed, false 
otherwise
+   */
+  private boolean isAsyncClusterConnectionClosedOrNull() {
+    AsyncClusterConnection asyncClusterConnection = 
masterService.getAsyncClusterConnection();
+    return asyncClusterConnection == null || asyncClusterConnection.isClosed();
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 699d9f963da..8a05986f592 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterServices;
@@ -134,6 +135,9 @@ public class TestLogsCleaner {
     when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
     when(masterServices.getReplicationLogCleanerBarrier())
       .thenReturn(new ReplicationLogCleanerBarrier());
+    AsyncClusterConnection asyncClusterConnection = 
mock(AsyncClusterConnection.class);
+    
when(masterServices.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
+    when(asyncClusterConnection.isClosed()).thenReturn(false);
     ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
     when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
     when(rpm.getQueueStorage()).thenReturn(queueStorage);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
index a1850b68eba..487ae63a6d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -39,6 +40,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerManager;
@@ -61,6 +63,7 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
@@ -77,16 +80,21 @@ public class TestReplicationLogCleaner {
 
   private ReplicationLogCleaner cleaner;
 
+  private ReplicationPeerManager rpm;
+
   @Before
   public void setUp() throws ReplicationException {
     services = mock(MasterServices.class);
     when(services.getReplicationLogCleanerBarrier()).thenReturn(new 
ReplicationLogCleanerBarrier());
-    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+    AsyncClusterConnection asyncClusterConnection = 
mock(AsyncClusterConnection.class);
+    
when(services.getAsyncClusterConnection()).thenReturn(asyncClusterConnection);
+    when(asyncClusterConnection.isClosed()).thenReturn(false);
+    rpm = mock(ReplicationPeerManager.class);
     when(services.getReplicationPeerManager()).thenReturn(rpm);
     when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
     ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
     when(rpm.getQueueStorage()).thenReturn(rqs);
-    when(rpm.getQueueStorage().hasData()).thenReturn(true);
+    when(rqs.hasData()).thenReturn(true);
     when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
     ServerManager sm = mock(ServerManager.class);
     when(services.getServerManager()).thenReturn(sm);
@@ -383,4 +391,39 @@ public class TestReplicationLogCleaner {
     assertSame(file, iter.next());
     assertFalse(iter.hasNext());
   }
+
+  @Test
+  public void testPreCleanWhenAsyncClusterConnectionClosed() throws 
ReplicationException {
+    assertFalse(services.getAsyncClusterConnection().isClosed());
+    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
+    cleaner.preClean();
+    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
+    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+
+    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
+    assertTrue(services.getAsyncClusterConnection().isClosed());
+    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
+    cleaner.preClean();
+    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
+    // rpm.getQueueStorage().hasData() was not executed, indicating an early 
return.
+    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+  }
+
+  @Test
+  public void testGetDeletableFilesWhenAsyncClusterConnectionClosed() throws 
ReplicationException {
+    List<FileStatus> files = List.of(new FileStatus());
+    assertFalse(services.getAsyncClusterConnection().isClosed());
+    verify(services.getAsyncClusterConnection(), Mockito.times(1)).isClosed();
+    cleaner.getDeletableFiles(files);
+    verify(services.getAsyncClusterConnection(), Mockito.times(2)).isClosed();
+    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+
+    when(services.getAsyncClusterConnection().isClosed()).thenReturn(true);
+    assertTrue(services.getAsyncClusterConnection().isClosed());
+    verify(services.getAsyncClusterConnection(), Mockito.times(3)).isClosed();
+    cleaner.getDeletableFiles(files);
+    verify(services.getAsyncClusterConnection(), Mockito.times(4)).isClosed();
+    // rpm.getQueueStorage().hasData() was not executed, indicating an early 
return.
+    verify(rpm.getQueueStorage(), Mockito.times(1)).hasData();
+  }
 }

Reply via email to