This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1bb2168bf3 Fix controller mode HAService removeConnection (#9897)
1bb2168bf3 is described below
commit 1bb2168bf35351890154017766de4bc0be2f1e59
Author: littleboy <[email protected]>
AuthorDate: Fri Dec 5 11:28:07 2025 +0800
Fix controller mode HAService removeConnection (#9897)
* fix controller mode HAService removeConnection
* fix store test
---
.../store/ha/autoswitch/AutoSwitchHAService.java | 17 ++++++++++++-----
.../store/ha/autoswitch/AutoSwitchHATest.java | 19 ++++++++++++++-----
2 files changed, 26 insertions(+), 10 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 64dad9aef2..9af47693f1 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -100,12 +100,19 @@ public class AutoSwitchHAService extends DefaultHAService
{
@Override
public void removeConnection(HAConnection conn) {
if (!defaultMessageStore.isShutdown()) {
- final Set<Long> syncStateSet = getLocalSyncStateSet();
Long slave = ((AutoSwitchHAConnection) conn).getSlaveId();
- if (syncStateSet.contains(slave)) {
- syncStateSet.remove(slave);
- markSynchronizingSyncStateSet(syncStateSet);
- notifySyncStateSetChanged(syncStateSet);
+ this.writeLock.lock();
+ try {
+ final Set<Long> newSyncStateSet = new
HashSet<>(this.syncStateSet);
+ if (newSyncStateSet.contains(slave)) {
+ newSyncStateSet.remove(slave);
+ markSynchronizingSyncStateSet(newSyncStateSet);
+ notifySyncStateSetChanged(newSyncStateSet);
+ this.syncStateSet.clear();
+ this.syncStateSet.addAll(newSyncStateSet);
+ }
+ } finally {
+ this.writeLock.unlock();
}
}
super.removeConnection(conn);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 86371ea900..519af44159 100644
---
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -221,12 +221,12 @@ public class AutoSwitchHATest {
// Step2, shutdown store2
this.messageStore2.shutdown();
- // Put message, which should put failed.
+ // Put message, which should succeed because slave is removed from
syncStateSet, only master remains
final PutMessageResult putMessageResult =
this.messageStore1.putMessage(buildMessage());
- assertEquals(putMessageResult.getPutMessageStatus(),
PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+
assertEquals(PutMessageStatus.PUT_OK,putMessageResult.getPutMessageStatus());
- // The confirmOffset still don't change, because syncStateSet contains
broker2, but broker2 shutdown
- assertEquals(confirmOffset, this.messageStore1.getConfirmOffset());
+ // The confirmOffset should update because syncStateSet only contains
master after slave shutdown
+ assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset);
// Step3, shutdown store1, start store2, change store2 to master,
epoch = 2
this.messageStore1.shutdown();
@@ -296,10 +296,19 @@ public class AutoSwitchHATest {
this.messageStore2.shutdown();
this.messageStore2.destroy();
+ // Wait for connection to be removed and syncStateSet to be updated by
removeConnection
+ await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ AutoSwitchHAService haService = (AutoSwitchHAService)
this.messageStore1.getHaService();
+ return haService.getConnectionCount().get() == 0
+ && haService.getLocalSyncStateSet().size() == 1;
+ });
+
+ // Now manually set syncStateSet back to {1, 2} to test the scenario
where
+ // syncStateSet contains a disconnected slave
((AutoSwitchHAService)
this.messageStore1.getHaService()).setSyncStateSet(result);
final PutMessageResult putMessageResult =
this.messageStore1.putMessage(buildMessage());
- assertEquals(putMessageResult.getPutMessageStatus(),
PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+
assertEquals(PutMessageStatus.FLUSH_SLAVE_TIMEOUT,putMessageResult.getPutMessageStatus());
}
@Ignore