This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1bb2168bf3 Fix controller mode HAService removeConnection (#9897)
1bb2168bf3 is described below

commit 1bb2168bf35351890154017766de4bc0be2f1e59
Author: littleboy <[email protected]>
AuthorDate: Fri Dec 5 11:28:07 2025 +0800

    Fix controller mode HAService removeConnection (#9897)
    
    * fix controller mode HAService removeConnection
    
    * fix store test
---
 .../store/ha/autoswitch/AutoSwitchHAService.java      | 17 ++++++++++++-----
 .../store/ha/autoswitch/AutoSwitchHATest.java         | 19 ++++++++++++++-----
 2 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 64dad9aef2..9af47693f1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -100,12 +100,19 @@ public class AutoSwitchHAService extends DefaultHAService {
     @Override
     public void removeConnection(HAConnection conn) {
         if (!defaultMessageStore.isShutdown()) {
-            final Set<Long> syncStateSet = getLocalSyncStateSet();
             Long slave = ((AutoSwitchHAConnection) conn).getSlaveId();
-            if (syncStateSet.contains(slave)) {
-                syncStateSet.remove(slave);
-                markSynchronizingSyncStateSet(syncStateSet);
-                notifySyncStateSetChanged(syncStateSet);
+            this.writeLock.lock();
+            try {
+                final Set<Long> newSyncStateSet = new HashSet<>(this.syncStateSet);
+                if (newSyncStateSet.contains(slave)) {
+                    newSyncStateSet.remove(slave);
+                    markSynchronizingSyncStateSet(newSyncStateSet);
+                    notifySyncStateSetChanged(newSyncStateSet);
+                    this.syncStateSet.clear();
+                    this.syncStateSet.addAll(newSyncStateSet);
+                }
+            } finally {
+                this.writeLock.unlock();
             }
         }
         super.removeConnection(conn);
diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 86371ea900..519af44159 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -221,12 +221,12 @@ public class AutoSwitchHATest {
         // Step2, shutdown store2
         this.messageStore2.shutdown();
 
-        // Put message, which should put failed.
+        // Put message, which should succeed because slave is removed from syncStateSet, only master remains
         final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
-        assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+        assertEquals(PutMessageStatus.PUT_OK,putMessageResult.getPutMessageStatus());
 
-        // The confirmOffset still don't change, because syncStateSet contains broker2, but broker2 shutdown
-        assertEquals(confirmOffset, this.messageStore1.getConfirmOffset());
+        // The confirmOffset should update because syncStateSet only contains master after slave shutdown
+        assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset);
 
         // Step3, shutdown store1, start store2, change store2 to master, epoch = 2
         this.messageStore1.shutdown();
@@ -296,10 +296,19 @@ public class AutoSwitchHATest {
         this.messageStore2.shutdown();
         this.messageStore2.destroy();
 
+        // Wait for connection to be removed and syncStateSet to be updated by removeConnection
+        await().atMost(10, TimeUnit.SECONDS).until(() -> {
+            AutoSwitchHAService haService = (AutoSwitchHAService) this.messageStore1.getHaService();
+            return haService.getConnectionCount().get() == 0
+                && haService.getLocalSyncStateSet().size() == 1;
+        });
+
+        // Now manually set syncStateSet back to {1, 2} to test the scenario where
+        // syncStateSet contains a disconnected slave
         ((AutoSwitchHAService) this.messageStore1.getHaService()).setSyncStateSet(result);
 
         final PutMessageResult putMessageResult = this.messageStore1.putMessage(buildMessage());
-        assertEquals(putMessageResult.getPutMessageStatus(), PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+        assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT,putMessageResult.getPutMessageStatus());
     }
 
     @Ignore

Reply via email to