Repository: helix
Updated Branches:
  refs/heads/master f9f6cfed3 -> 96eb69186 (forced update)


Fix resource monitor race condition.

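The asynchronous pending-message update in CurrentStateComputationStage can race
with removal of a resource's MBean. Because that update calls
getOrCreateResourceMonitor, it can re-create a resource MBean that was just
deleted. The deletion then effectively fails and leaves an unnecessary MBean
registered in the MBean server. This change drops the separate async task. It
updates the pending-message count in ExternalViewComputeStage alongside the
other resource status updates, and logs an error instead of propagating a
failure from that update.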


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96eb6918
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96eb6918
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96eb6918

Branch: refs/heads/master
Commit: 96eb69186072b1386eea875ffb0fbdcba6070453
Parents: 5245f3b
Author: Jiajun Wang <[email protected]>
Authored: Tue May 29 17:09:28 2018 -0700
Committer: jiajunwang <[email protected]>
Committed: Tue Jul 10 11:29:49 2018 -0700

----------------------------------------------------------------------
 .../stages/CurrentStateComputationStage.java      | 18 ++----------------
 .../stages/ExternalViewComputeStage.java          |  7 +++++++
 .../monitoring/mbeans/ClusterStatusMonitor.java   | 10 +++++++---
 3 files changed, 16 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 4d9318c..64cd4f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -31,7 +31,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 
 
 /**
@@ -73,21 +72,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
     }
 
     if (!cache.isTaskCache()) {
-      final ClusterStatusMonitor clusterStatusMonitor = 
event.getAttribute(AttributeName.clusterStatusMonitor.name());
-      asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() {
-        @Override
-        public Object call() {
-          for (Resource resource : resourceMap.values()) {
-            int totalPendingMessageCount = 0;
-            for (Partition partition : resource.getPartitions()) {
-              totalPendingMessageCount +=
-                  
currentStateOutput.getPendingMessageMap(resource.getResourceName(), 
partition).size();
-            }
-            
clusterStatusMonitor.updatePendingMessages(resource.getResourceName(), 
totalPendingMessageCount);
-          }
-          return null;
-        }
-      });
+      ClusterStatusMonitor clusterStatusMonitor =
+          event.getAttribute(AttributeName.clusterStatusMonitor.name());
       // TODO Update the status async -- jjwang
       updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, 
currentStateOutput);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 591867d..1f455cb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -75,6 +75,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
         view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
       }
 
+      int totalPendingMessageCount = 0;
+
       for (Partition partition : resource.getPartitions()) {
         Map<String, String> currentStateMap =
             currentStateOutput.getCurrentStateMap(resourceName, partition);
@@ -88,7 +90,10 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
             // }
           }
         }
+        totalPendingMessageCount +=
+            
currentStateOutput.getPendingMessageMap(resource.getResourceName(), 
partition).size();
       }
+
       // Update cluster status monitor mbean
       IdealState idealState = cache.getIdealState(resourceName);
       if (!cache.isTaskCache()) {
@@ -105,6 +110,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
               clusterStatusMonitor
                   .setResourceStatus(view, 
cache.getIdealState(view.getResourceName()),
                       stateModelDef);
+              clusterStatusMonitor
+                  .updatePendingMessages(resource.getResourceName(), 
totalPendingMessageCount);
             }
           } else {
             // Drop the metrics if the resource is dropped, or the 
MonitorDisabled is changed to true.

http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 133d8ce..5d398eb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -448,10 +448,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   public synchronized void updatePendingMessages(String resourceName, int 
messageCount) {
-    ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
+    try {
+      ResourceMonitor resourceMonitor = 
getOrCreateResourceMonitor(resourceName);
 
-    if (resourceMonitor != null) {
-      resourceMonitor.updatePendingStateTransitionMessages(messageCount);
+      if (resourceMonitor != null) {
+        resourceMonitor.updatePendingStateTransitionMessages(messageCount);
+      }
+    } catch (Exception e) {
+      LOG.error("Fail to update resource pending messages, resource: " + 
resourceName, e);
     }
   }
 

Reply via email to