This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d5d5a637f [ISSUE #6226] Shutdown flowMonitor when connection disconnect (#6227)
d5d5a637f is described below

commit d5d5a637f6c4f2eeb9430e068c8d620b4010c16d
Author: rongtong <jinrongto...@163.com>
AuthorDate: Fri Mar 3 22:28:15 2023 +0800

    [ISSUE #6226] Shutdown flowMonitor when connection disconnect (#6227)
    
    * Shutdown flowMonitor when connection disconnect
    
    * Whether the master is existed has more accurate judgment
    
    * Shutdown flowMonitor when HAClient service end
    
    * Pass the check style
    
    * Modify according to comments
---
 .../org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java   | 3 ++-
 .../src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java  | 2 ++
 .../main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java  | 4 ++++
 .../org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java  | 3 +++
 .../apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java  | 5 +++++
 5 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 997dee3c5..2674dabf5 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.controller.impl.manager;
 
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
 
 /**
  * Manages the syncStateSet of broker replicas.
@@ -53,7 +54,7 @@ public class SyncStateInfo {
     }
 
     public boolean isMasterExist() {
-        return !this.masterAddress.isEmpty();
+        return !StringUtils.isBlank(masterAddress);
     }
 
     public String getClusterName() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
index 06878c185..530d295ae 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
@@ -309,6 +309,7 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
             try {
                 switch (this.currentState) {
                     case SHUTDOWN:
+                        this.flowMonitor.shutdown(true);
                         return;
                     case READY:
                         if (!this.connectMaster()) {
@@ -339,6 +340,7 @@ public class DefaultHAClient extends ServiceThread implements HAClient {
             }
         }
 
+        this.flowMonitor.shutdown(true);
         log.info(this.getServiceName() + " service end");
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
index e05e0ce23..5dd24410e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
@@ -195,6 +195,8 @@ public class DefaultHAConnection implements HAConnection {
                 log.error("", e);
             }
 
+            flowMonitor.shutdown(true);
+
             log.info(this.getServiceName() + " service end");
         }
 
@@ -398,6 +400,8 @@ public class DefaultHAConnection implements HAConnection {
                 DefaultHAConnection.log.error("", e);
             }
 
+            flowMonitor.shutdown(true);
+
             DefaultHAConnection.log.info(this.getServiceName() + " service end");
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 554c64fd1..b95d3814a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -398,6 +398,7 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
             try {
                 switch (this.currentState) {
                     case SHUTDOWN:
+                        this.flowMonitor.shutdown(true);
                         return;
                     case READY:
                         // Truncate invalid msg first
@@ -437,6 +438,8 @@ public class AutoSwitchHAClient extends ServiceThread implements HAClient {
             }
         }
 
+        this.flowMonitor.shutdown(true);
+        LOGGER.info(this.getServiceName() + " service end");
     }
 
     /**
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 7401574e5..57f9e9619 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -294,6 +294,8 @@ public class AutoSwitchHAConnection implements HAConnection {
                 AutoSwitchHAConnection.LOGGER.error("", e);
             }
 
+            flowMonitor.shutdown(true);
+
             AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
         }
 
@@ -739,6 +741,9 @@ public class AutoSwitchHAConnection implements HAConnection {
             } catch (IOException e) {
                 AutoSwitchHAConnection.LOGGER.error("", e);
             }
+
+            flowMonitor.shutdown(true);
+
             AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + " service end");
         }
 

Reply via email to