Repository: helix Updated Branches: refs/heads/master f9f6cfed3 -> 96eb69186 (forced update)
Fix resource monitor race condition. Asynchronous monitor processing may cause the deletion of a resource MBean to fail, leaving unnecessary MBeans in the MBean server. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96eb6918 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96eb6918 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96eb6918 Branch: refs/heads/master Commit: 96eb69186072b1386eea875ffb0fbdcba6070453 Parents: 5245f3b Author: Jiajun Wang <[email protected]> Authored: Tue May 29 17:09:28 2018 -0700 Committer: jiajunwang <[email protected]> Committed: Tue Jul 10 11:29:49 2018 -0700 ---------------------------------------------------------------------- .../stages/CurrentStateComputationStage.java | 18 ++---------------- .../stages/ExternalViewComputeStage.java | 7 +++++++ .../monitoring/mbeans/ClusterStatusMonitor.java | 10 +++++++--- 3 files changed, 16 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 4d9318c..64cd4f4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -31,7 +31,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; /** @@ -73,21 +72,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage { } if (!cache.isTaskCache()) { - 
final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); - asyncExecute(cache.getAsyncTasksThreadPool(), new Callable<Object>() { - @Override - public Object call() { - for (Resource resource : resourceMap.values()) { - int totalPendingMessageCount = 0; - for (Partition partition : resource.getPartitions()) { - totalPendingMessageCount += - currentStateOutput.getPendingMessageMap(resource.getResourceName(), partition).size(); - } - clusterStatusMonitor.updatePendingMessages(resource.getResourceName(), totalPendingMessageCount); - } - return null; - } - }); + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); // TODO Update the status async -- jjwang updateTopStateStatus(cache, clusterStatusMonitor, resourceMap, currentStateOutput); } http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index 591867d..1f455cb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -75,6 +75,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { view.setBucketSize(currentStateOutput.getBucketSize(resourceName)); } + int totalPendingMessageCount = 0; + for (Partition partition : resource.getPartitions()) { Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition); @@ -88,7 +90,10 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { // } } } + totalPendingMessageCount += + 
currentStateOutput.getPendingMessageMap(resource.getResourceName(), partition).size(); } + // Update cluster status monitor mbean IdealState idealState = cache.getIdealState(resourceName); if (!cache.isTaskCache()) { @@ -105,6 +110,8 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { clusterStatusMonitor .setResourceStatus(view, cache.getIdealState(view.getResourceName()), stateModelDef); + clusterStatusMonitor + .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount); } } else { // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true. http://git-wip-us.apache.org/repos/asf/helix/blob/96eb6918/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 133d8ce..5d398eb 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -448,10 +448,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } public synchronized void updatePendingMessages(String resourceName, int messageCount) { - ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName); + try { + ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName); - if (resourceMonitor != null) { - resourceMonitor.updatePendingStateTransitionMessages(messageCount); + if (resourceMonitor != null) { + resourceMonitor.updatePendingStateTransitionMessages(messageCount); + } + } catch (Exception e) { + LOG.error("Fail to update resource pending messages, resource: " + resourceName, e); } }
