Fix an issue where a resource MBean may not be cleaned up when the resource is dropped.
If a resource is not successfully created on any participant, and it is removed in this situation, the corresponding MBean may be left over by the controller. This fix ensures that all resource MBeans that are no longer related to any living resource are removed. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3deeeaba Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3deeeaba Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3deeeaba Branch: refs/heads/master Commit: 3deeeabaa988bf40c0ba953209dd6b26df984552 Parents: 964b802 Author: Jiajun Wang <[email protected]> Authored: Thu Jul 12 15:20:19 2018 -0700 Committer: jiajunwang <[email protected]> Committed: Tue Jul 17 11:53:26 2018 -0700 ---------------------------------------------------------------------- .../stages/ExternalViewComputeStage.java | 71 ++++++++++----- .../monitoring/mbeans/ClusterStatusMonitor.java | 53 +++++++---- .../mbeans/TestDropResourceMetricsReset.java | 94 ++++++++++++++++---- 3 files changed, 160 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index bf7be01..d901327 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -19,21 +19,41 @@ package org.apache.helix.controller.stages; * under the License. 
*/ -import org.apache.helix.*; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.ZNRecord; +import org.apache.helix.ZNRecordDelta; import org.apache.helix.ZNRecordDelta.MergeOperation; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; -import org.apache.helix.model.*; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.model.StatusUpdate; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; public class ExternalViewComputeStage extends AbstractAsyncBaseStage { private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class); @@ -60,8 +80,11 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); + ClusterStatusMonitor clusterStatusMonitor = + event.getAttribute(AttributeName.clusterStatusMonitor.name()); List<ExternalView> newExtViews = new 
ArrayList<>(); + Set<String> monitoringResources = new HashSet<>(); Map<String, ExternalView> curExtViews = cache.getExternalViews(); @@ -100,24 +123,19 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { IdealState idealState = cache.getIdealState(resourceName); if (!cache.isTaskCache()) { ResourceConfig resourceConfig = cache.getResourceConfig(resourceName); - ClusterStatusMonitor clusterStatusMonitor = - event.getAttribute(AttributeName.clusterStatusMonitor.name()); if (clusterStatusMonitor != null) { - if (idealState != null && (resourceConfig == null || !resourceConfig - .isMonitoringDisabled())) { - if (!idealState.getStateModelDefRef() - .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { - StateModelDefinition stateModelDef = - cache.getStateModelDef(idealState.getStateModelDefRef()); - clusterStatusMonitor - .setResourceStatus(view, cache.getIdealState(view.getResourceName()), - stateModelDef); - clusterStatusMonitor - .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount); - } - } else { - // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true. 
- clusterStatusMonitor.unregisterResource(view.getResourceName()); + if (idealState != null // has ideal state + && (resourceConfig == null || !resourceConfig.isMonitoringDisabled()) // monitoring not disabled + && !idealState.getStateModelDefRef() // and not a job resource + .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) { + StateModelDefinition stateModelDef = + cache.getStateModelDef(idealState.getStateModelDefRef()); + clusterStatusMonitor + .setResourceStatus(view, cache.getIdealState(view.getResourceName()), + stateModelDef); + clusterStatusMonitor + .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount); + monitoringResources.add(resourceName); } } } @@ -145,7 +163,12 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { } } - List<String> externalviewsToRemove = new ArrayList<>(); + // Keep MBeans for existing resources and unregister MBeans for dropped resources + if (clusterStatusMonitor != null) { + clusterStatusMonitor.retainResourceMonitor(monitoringResources); + } + + List<String> externalViewsToRemove = new ArrayList<>(); // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all. // Are there any entity that will be interested in its change? 
@@ -163,7 +186,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { LogUtil .logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName); dataAccessor.removeProperty(keyBuilder.externalView(resourceName)); - externalviewsToRemove.add(resourceName); + externalViewsToRemove.add(resourceName); } } else { keys.add(keyBuilder.externalView(resourceName)); @@ -181,10 +204,10 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage { if (!resourceMap.keySet().contains(resourceName)) { LogUtil.logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName); dataAccessor.removeProperty(keyBuilder.externalView(resourceName)); - externalviewsToRemove.add(resourceName); + externalViewsToRemove.add(resourceName); } } - cache.removeExternalViews(externalviewsToRemove); + cache.removeExternalViews(externalViewsToRemove); } private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java index 52d06e3..e3655d8 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java @@ -19,12 +19,22 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ +import com.google.common.base.Joiner; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.concurrent.atomic.AtomicLong; import org.apache.helix.controller.stages.BestPossibleStateOutput; -import org.apache.helix.model.*; -import org.apache.helix.task.*; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Message; +import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +43,16 @@ import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import java.lang.management.ManagementFactory; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { @@ -398,20 +416,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean { } /** - * Indicate that a resource has been dropped, thus making it OK to drop its metrics - * @param resourceName the resource that has been dropped + * Cleanup resource monitors. Keep the monitors if only exist in the input set. 
+ * @param resourceNames the resources that still exist */ - public void unregisterResource(String resourceName) { - if (_resourceMbeanMap.containsKey(resourceName)) { - synchronized (this) { - if (_resourceMbeanMap.containsKey(resourceName)) { - try { - unregisterResources(Arrays.asList(resourceName)); - } catch (MalformedObjectNameException e) { - LOG.error("Could not unregister beans for " + resourceName, e); - } - } - } + public void retainResourceMonitor(Set<String> resourceNames) { + Set<String> resourcesToRemove = new HashSet<>(); + synchronized (this) { + resourcesToRemove.addAll(_resourceMbeanMap.keySet()); + } + resourcesToRemove.removeAll(resourceNames); + + try { + unregisterResources(resourcesToRemove); + } catch (MalformedObjectNameException e) { + LOG.error(String.format("Could not unregister beans for the following resources: %s", + Joiner.on(',').join(resourcesToRemove)), e); } } http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java index b815160..04e79e6 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java @@ -19,15 +19,6 @@ package org.apache.helix.monitoring.mbeans; * under the License. 
*/ -import java.io.IOException; -import java.util.Date; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.MBeanServerNotification; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; import org.apache.helix.TestHelper; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; @@ -35,23 +26,39 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.tools.ClusterSetup; import org.testng.Assert; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerNotification; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import java.io.IOException; +import java.util.Date; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class TestDropResourceMetricsReset extends ZkUnitTestBase { - private final CountDownLatch _registerLatch = new CountDownLatch(1); - private final CountDownLatch _unregisterLatch = new CountDownLatch(1); + private CountDownLatch _registerLatch; + private CountDownLatch _unregisterLatch; + private String _className = TestHelper.getTestClassName(); + + @BeforeMethod + public void beforeMethod() { + _registerLatch = new CountDownLatch(1); + _unregisterLatch = new CountDownLatch(1); + } @Test public void testBasic() throws Exception { final int NUM_PARTICIPANTS = 4; final int NUM_PARTITIONS = 64; final int NUM_REPLICAS = 1; - final String RESOURCE_NAME = "TestDB0"; + final String RESOURCE_NAME = "BasicDB0"; - String className = TestHelper.getTestClassName(); String methodName = 
TestHelper.getTestMethodName(); - String clusterName = className + "_" + methodName; - System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String clusterName = _className + "_" + methodName; ParticipantMonitorListener listener = new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME); @@ -59,7 +66,7 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase { // Set up cluster TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port "localhost", // participant name prefix - "TestDB", // resource name prefix + "BasicDB", // resource name prefix 1, // resources NUM_PARTITIONS, // partitions per resource NUM_PARTICIPANTS, // number of nodes @@ -97,7 +104,60 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase { participant.syncStop(); } TestHelper.dropCluster(clusterName, _gZkClient); - System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test (dependsOnMethods = "testBasic") + public void testDropWithNoCurrentState() throws Exception { + final int NUM_PARTICIPANTS = 1; + final int NUM_PARTITIONS = 1; + final int NUM_REPLICAS = 1; + final String RESOURCE_NAME = "TestDB0"; + + String methodName = TestHelper.getTestMethodName(); + String clusterName = _className + "_" + methodName; + + ParticipantMonitorListener listener = + new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging + true); // do rebalance + + // Start participants and controller + ClusterSetup setupTool = new ClusterSetup(_gZkClient); + 
MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918"); + participant.syncStart(); + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0"); + controller.syncStart(); + + // Verify that the bean was created + boolean noTimeout = _registerLatch.await(30000, TimeUnit.MILLISECONDS); + Assert.assertTrue(noTimeout); + + // stop the participant, so the resource does not exist in any current states. + participant.syncStop(); + + // Drop the resource + setupTool.dropResourceFromCluster(clusterName, RESOURCE_NAME); + + // Verify that the bean was removed + noTimeout = _unregisterLatch.await(30000, TimeUnit.MILLISECONDS); + Assert.assertTrue(noTimeout); + + // Clean up + listener.disconnect(); + controller.syncStop(); + + TestHelper.dropCluster(clusterName, _gZkClient); } private ObjectName getObjectName(String resourceName, String clusterName)
