Repository: helix Updated Branches: refs/heads/master 37f3d4c8d -> 542fbc840
[HELIX-709] Move external view calculation to async stage and re-organize pipeline Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/542fbc84 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/542fbc84 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/542fbc84 Branch: refs/heads/master Commit: 542fbc840a167986a40bd57f3c5660d294acb63c Parents: 37f3d4c Author: Harry Zhang <[email protected]> Authored: Mon Jul 9 12:16:56 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Mon Jul 9 12:16:56 2018 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 29 ++++++++++---------- .../controller/pipeline/AsyncWorkerType.java | 3 +- .../stages/ExternalViewComputeStage.java | 12 ++++++-- .../helix/manager/zk/CallbackHandler.java | 5 ++-- .../java/org/apache/helix/ZkUnitTestBase.java | 12 +++++++- 5 files changed, 39 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 7603975..474d621 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -248,11 +248,14 @@ public class GenericHelixController implements IdealStateChangeListener, Pipeline dataRefresh = new Pipeline(pipelineName); dataRefresh.addStage(new ReadClusterDataStage()); + // data pre-process pipeline + Pipeline dataPreprocess = new Pipeline(pipelineName); + dataPreprocess.addStage(new ResourceComputationStage()); + dataPreprocess.addStage(new ResourceValidationStage()); + dataPreprocess.addStage(new CurrentStateComputationStage()); + // rebalance pipeline Pipeline rebalancePipeline = new Pipeline(pipelineName); - rebalancePipeline.addStage(new ResourceComputationStage()); - rebalancePipeline.addStage(new ResourceValidationStage()); - rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageGenerationPhase()); @@ -270,18 +273,16 @@ public class GenericHelixController implements IdealStateChangeListener, Pipeline liveInstancePipeline = new Pipeline(pipelineName); liveInstancePipeline.addStage(new CompatibilityCheckStage()); - registry.register(ClusterEventType.IdealStateChange, dataRefresh, rebalancePipeline); - registry.register(ClusterEventType.CurrentStateChange, dataRefresh, rebalancePipeline, externalViewPipeline); - registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, rebalancePipeline); - registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, rebalancePipeline); - registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, rebalancePipeline); - registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, rebalancePipeline, - externalViewPipeline); - registry.register(ClusterEventType.MessageChange, dataRefresh, rebalancePipeline); + registry.register(ClusterEventType.IdealStateChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.CurrentStateChange, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, dataPreprocess, rebalancePipeline); + registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.ExternalViewChange, dataRefresh); - registry.register(ClusterEventType.Resume, dataRefresh, rebalancePipeline, externalViewPipeline); - registry - .register(ClusterEventType.PeriodicalRebalance, dataRefresh, rebalancePipeline, externalViewPipeline); + registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); return registry; } } http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java index 62e324c..443db31 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java @@ -28,5 +28,6 @@ package org.apache.helix.controller.pipeline; public enum AsyncWorkerType { TargetExternalViewCalcWorker, - PersistAssignmentWorker + PersistAssignmentWorker, + ExternalViewComputeWorker } http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java index e2bd2a9..591867d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java @@ -22,7 +22,8 @@ package org.apache.helix.controller.stages; import org.apache.helix.*; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecordDelta.MergeOperation; -import org.apache.helix.controller.pipeline.AbstractBaseStage; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; +import org.apache.helix.controller.pipeline.AsyncWorkerType; import org.apache.helix.controller.pipeline.StageException; import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; import org.apache.helix.model.*; @@ -33,11 +34,16 @@ import org.slf4j.LoggerFactory; import java.util.*; -public class ExternalViewComputeStage extends AbstractBaseStage { +public class ExternalViewComputeStage extends AbstractAsyncBaseStage { private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class); @Override - public void process(ClusterEvent event) throws Exception { + public AsyncWorkerType getAsyncWorkerType() { + return AsyncWorkerType.ExternalViewComputeWorker; + } + + @Override + public void execute(final ClusterEvent event) throws Exception { HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name()); ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name()); http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 12f7d0f..cd446e8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -643,9 +643,8 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { @Override public void handleChildChange(String parentPath, List<String> currentChilds) { if (logger.isDebugEnabled()) { - logger.debug( - "Data change callback: child changed, path: " + parentPath + ", current child count: " - + currentChilds.size()); + logger.debug("Data change callback: child changed, path: {} , current child count: {}", + parentPath, currentChilds == null ? 0 : currentChilds.size()); } try { http://git-wip-us.apache.org/repos/asf/helix/blob/542fbc84/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java index b29375b..483f0af 100644 --- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java @@ -28,6 +28,7 @@ import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.ZkServer; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; @@ -349,7 +350,16 @@ public class ZkUnitTestBase { StageContext context = new StageContext(); stage.init(context); stage.preProcess(); - stage.process(event); + + // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in + // execute() function call + // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage() + // to a shared library + if (stage instanceof AbstractAsyncBaseStage) { + ((AbstractAsyncBaseStage) stage).execute(event); + } else { + stage.process(event); + } stage.postProcess(); }
