This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d5d5a637f [ISSUE #6226] Shutdown flowMonitor when connection disconnect (#6227) d5d5a637f is described below commit d5d5a637f6c4f2eeb9430e068c8d620b4010c16d Author: rongtong <jinrongto...@163.com> AuthorDate: Fri Mar 3 22:28:15 2023 +0800 [ISSUE #6226] Shutdown flowMonitor when connection disconnect (#6227) * Shutdown flowMonitor when connection disconnect * Whether the master is existed has more accurate judgment * Shutdown flowMonitor when HAClient service end * Pass the check style * Modify according to comments --- .../org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java | 3 ++- .../src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java | 2 ++ .../main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java | 4 ++++ .../org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java | 3 +++ .../apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java | 5 +++++ 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java index 997dee3c5..2674dabf5 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.controller.impl.manager; import java.util.HashSet; import java.util.Set; +import org.apache.commons.lang3.StringUtils; /** * Manages the syncStateSet of broker replicas. @@ -53,7 +54,7 @@ public class SyncStateInfo { } public boolean isMasterExist() { - return !this.masterAddress.isEmpty(); + return !StringUtils.isBlank(masterAddress); } public String getClusterName() { diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java index 06878c185..530d295ae 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java @@ -309,6 +309,7 @@ public class DefaultHAClient extends ServiceThread implements HAClient { try { switch (this.currentState) { case SHUTDOWN: + this.flowMonitor.shutdown(true); return; case READY: if (!this.connectMaster()) { @@ -339,6 +340,7 @@ public class DefaultHAClient extends ServiceThread implements HAClient { } } + this.flowMonitor.shutdown(true); log.info(this.getServiceName() + " service end"); } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java index e05e0ce23..5dd24410e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java @@ -195,6 +195,8 @@ public class DefaultHAConnection implements HAConnection { log.error("", e); } + flowMonitor.shutdown(true); + log.info(this.getServiceName() + " service end"); } @@ -398,6 +400,8 @@ public class DefaultHAConnection implements HAConnection { DefaultHAConnection.log.error("", e); } + flowMonitor.shutdown(true); + DefaultHAConnection.log.info(this.getServiceName() + " service end"); } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java index 554c64fd1..b95d3814a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java @@ -398,6 +398,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { try { switch (this.currentState) { case SHUTDOWN: + this.flowMonitor.shutdown(true); return; case READY: // Truncate invalid msg first @@ -437,6 +438,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient { } } + this.flowMonitor.shutdown(true); + LOGGER.info(this.getServiceName() + " service end"); } /** diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index 7401574e5..57f9e9619 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -294,6 +294,8 @@ public class AutoSwitchHAConnection implements HAConnection { AutoSwitchHAConnection.LOGGER.error("", e); } + flowMonitor.shutdown(true); + AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end"); } @@ -739,6 +741,9 @@ public class AutoSwitchHAConnection implements HAConnection { } catch (IOException e) { AutoSwitchHAConnection.LOGGER.error("", e); } + + flowMonitor.shutdown(true); + AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end"); }