This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 011e67c3125fb6e6a87d8555ebf4c41949cfa011 Author: Harry Zhang <hrzh...@linkedin.com> AuthorDate: Wed Dec 5 15:17:42 2018 -0800 support HelixManager to configure enabled pipeline types --- .../main/java/org/apache/helix/HelixManager.java | 9 +++ .../helix/controller/GenericHelixController.java | 93 ++++++++++------------ .../ResourceControllerDataProvider.java | 4 +- .../WorkflowControllerDataProvider.java | 4 +- .../apache/helix/controller/pipeline/Pipeline.java | 6 ++ .../apache/helix/manager/zk/ZKHelixManager.java | 38 ++++++++- .../DistClusterControllerStateModel.java | 11 +++ .../controller/stages/DummyClusterManager.java | 7 ++ .../java/org/apache/helix/mock/MockManager.java | 7 ++ .../helix/participant/MockZKHelixManager.java | 7 ++ 10 files changed, 125 insertions(+), 61 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java index 66f83ae..1a6815f 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java @@ -21,9 +21,11 @@ package org.apache.helix; import java.util.List; +import java.util.Set; import org.apache.helix.api.listeners.ClusterConfigChangeListener; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.controller.GenericHelixController; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.ClusterConfig; @@ -255,6 +257,13 @@ public interface HelixManager { void addControllerMessageListener(org.apache.helix.MessageListener listener); /** + * Selectively enable controller pipeline using the given types. This will only take effect + * when called before connect(), and instance type is CONTROLLER + * @param types pipeline types to enable + */ + void setEnabledControlPipelineTypes(Set<Pipeline.Type> types); + + /** * Removes the listener. If the same listener was used for multiple changes, * all change notifications will be removed.<br/> * This will invoke onChange method on the listener with diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index ff9d32c..b287886 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -38,7 +38,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; -import org.apache.helix.NotificationContext.Type; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.api.exceptions.HelixMetaDataAccessException; @@ -172,13 +171,7 @@ public class GenericHelixController implements IdealStateChangeListener, private long _lastPipelineEndTimestamp; private String _clusterName; - private final Set<PipelineTypes> _enabledPipelineTypes; - - // TODO: move this enum to Pipeline class - public enum PipelineTypes { - DEFAULT, - TASK - } + private final Set<Pipeline.Type> _enabledPipelineTypes; /** * Default constructor that creates a default pipeline registry. This is sufficient in most cases, @@ -186,19 +179,19 @@ public class GenericHelixController implements IdealStateChangeListener, * pipeline registry */ public GenericHelixController() { - this(createDefaultRegistry(PipelineTypes.DEFAULT.name()), - createTaskRegistry(PipelineTypes.TASK.name())); + this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()), + createTaskRegistry(Pipeline.Type.TASK.name())); } public GenericHelixController(String clusterName) { - this(createDefaultRegistry(PipelineTypes.DEFAULT.name()), - createTaskRegistry(PipelineTypes.TASK.name()), clusterName, - Sets.newHashSet(PipelineTypes.TASK, PipelineTypes.DEFAULT)); + this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()), + createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, + Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT)); } - public GenericHelixController(String clusterName, Set<PipelineTypes> enabledPipelins) { - this(createDefaultRegistry(PipelineTypes.DEFAULT.name()), - createTaskRegistry(PipelineTypes.TASK.name()), clusterName, enabledPipelins); + public GenericHelixController(String clusterName, Set<Pipeline.Type> enabledPipelins) { + this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()), + createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, enabledPipelins); } class RebalanceTask extends TimerTask { @@ -389,11 +382,12 @@ public class GenericHelixController implements IdealStateChangeListener, // TODO: refactor the constructor as providing both registry but only enabling one looks confusing public GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry) { - this(registry, taskRegistry, null, Sets.newHashSet(PipelineTypes.TASK, PipelineTypes.DEFAULT)); + this(registry, taskRegistry, null, Sets.newHashSet( + Pipeline.Type.TASK, Pipeline.Type.DEFAULT)); } private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry, - final String clusterName, Set<PipelineTypes> enabledPipelineTypes) { + final String clusterName, Set<Pipeline.Type> enabledPipelineTypes) { _paused = false; _enabledPipelineTypes = enabledPipelineTypes; _registry = registry; @@ -414,28 +408,28 @@ public class GenericHelixController implements IdealStateChangeListener, initializeAsyncFIFOWorkers(); // initialize pipelines at the end so we have everything else prepared - if (_enabledPipelineTypes.contains(PipelineTypes.DEFAULT)) { - logger.info("Initializing {} pipeline", PipelineTypes.DEFAULT.name()); + if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) { + logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name()); _resourceControlDataProvider = new ResourceControllerDataProvider(clusterName); _eventQueue = new ClusterEventBlockingQueue(); _eventThread = new ClusterEventProcessor(_resourceControlDataProvider, _eventQueue, "default-" + clusterName); initPipeline(_eventThread, _resourceControlDataProvider); - logger.info("Initialized {} pipeline", PipelineTypes.DEFAULT.name()); + logger.info("Initialized {} pipeline", Pipeline.Type.DEFAULT.name()); } else { _eventQueue = null; _resourceControlDataProvider = null; _eventThread = null; } - if (_enabledPipelineTypes.contains(PipelineTypes.TASK)) { - logger.info("Initializing {} pipeline", PipelineTypes.TASK.name()); + if (_enabledPipelineTypes.contains(Pipeline.Type.TASK)) { + logger.info("Initializing {} pipeline", Pipeline.Type.TASK.name()); _workflowControlDataProvider = new WorkflowControllerDataProvider(clusterName); _taskEventQueue = new ClusterEventBlockingQueue(); _taskEventThread = new ClusterEventProcessor(_workflowControlDataProvider, _taskEventQueue, "task-" + clusterName); initPipeline(_taskEventThread, _workflowControlDataProvider); - logger.info("Initialized {} pipeline", PipelineTypes.TASK.name()); + logger.info("Initialized {} pipeline", Pipeline.Type.TASK.name()); } else { _workflowControlDataProvider = null; _taskEventQueue = null; @@ -477,7 +471,7 @@ public class GenericHelixController implements IdealStateChangeListener, /** * lock-always: caller always needs to obtain an external lock before call, calls to handleEvent() * should be serialized - * @param event + * @param event cluster event to handle */ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProvider) { HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); @@ -507,7 +501,7 @@ public class GenericHelixController implements IdealStateChangeListener, } if (context != null) { - if (context.getType() == Type.FINALIZE) { + if (context.getType() == NotificationContext.Type.FINALIZE) { stopPeriodRebalance(); logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType()); return; @@ -609,9 +603,8 @@ public class GenericHelixController implements IdealStateChangeListener, .updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(), enqueueTime - zkCallbackTime); } - sb.append(String.format( - "Callback time for event: " + event.getEventType() + " took: " + (enqueueTime - - zkCallbackTime) + " ms\n")); + sb.append(String.format("Callback time for event: %s took: %s ms\n", event.getEventType(), + enqueueTime - zkCallbackTime)); } if (_isMonitoring) { _clusterStatusMonitor @@ -621,13 +614,10 @@ public class GenericHelixController implements IdealStateChangeListener, .updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(), _lastPipelineEndTimestamp - startTime); } - sb.append(String.format( - "InQueue time for event: " + event.getEventType() + " took: " + (startTime - enqueueTime) - + " ms\n")); - sb.append(String.format( - "TotalProcessed time for event: " + event.getEventType() + " took: " + ( - _lastPipelineEndTimestamp - - startTime) + " ms")); + sb.append(String.format("InQueue time for event: %s took: %s ms\n", event.getEventType(), + startTime - enqueueTime)); + sb.append(String.format("TotalProcessed time for event: %s took: %s ms", event.getEventType(), + _lastPipelineEndTimestamp - startTime)); logger.info(sb.toString()); } @@ -748,7 +738,7 @@ public class GenericHelixController implements IdealStateChangeListener, pushToEventQueues(ClusterEventType.IdealStateChange, changeContext, Collections.<String, Object>emptyMap()); - if (changeContext.getType() != Type.FINALIZE) { + if (changeContext.getType() != NotificationContext.Type.FINALIZE) { HelixManager manager = changeContext.getManager(); if (manager != null) { HelixDataAccessor dataAccessor = changeContext.getManager().getHelixDataAccessor(); @@ -800,7 +790,7 @@ public class GenericHelixController implements IdealStateChangeListener, } private void notifyCaches(NotificationContext context, ChangeType changeType) { - if (context == null || context.getType() != Type.CALLBACK) { + if (context == null || context.getType() != NotificationContext.Type.CALLBACK) { requestDataProvidersFullRefresh(); } else { updateDataChangeInProvider(changeType, context.getPathChanged()); @@ -832,7 +822,7 @@ public class GenericHelixController implements IdealStateChangeListener, // No need for completed UUID, prefixed should be fine String uid = UUID.randomUUID().toString().substring(0, 8); ClusterEvent event = new ClusterEvent(_clusterName, eventType, - String.format("%s_%s", uid, PipelineTypes.DEFAULT.name())); + String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name())); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); @@ -841,7 +831,7 @@ public class GenericHelixController implements IdealStateChangeListener, } enqueueEvent(_eventQueue, event); enqueueEvent(_taskEventQueue, - event.clone(String.format("%s_%s", uid, PipelineTypes.TASK.name()))); + event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name()))); } private void enqueueEvent(ClusterEventBlockingQueue queue, ClusterEvent event) { @@ -860,17 +850,18 @@ public class GenericHelixController implements IdealStateChangeListener, boolean controllerIsLeader; - if (changeContext != null && changeContext.getType() == Type.FINALIZE) { + if (changeContext == null || changeContext.getType() == NotificationContext.Type.FINALIZE) { logger.info( - "GenericClusterController.onControllerChange() FINALIZE for cluster " + _clusterName); + "GenericClusterController.onControllerChange() Cluster change type {} for cluster {}. Disable leadership.", + changeContext == null ? null : changeContext.getType(), _clusterName); controllerIsLeader = false; } else { // double check if this controller is the leader controllerIsLeader = changeContext.getManager().isLeader(); } - HelixManager manager = changeContext.getManager(); if (controllerIsLeader) { + HelixManager manager = changeContext.getManager(); HelixDataAccessor accessor = manager.getHelixDataAccessor(); Builder keyBuilder = accessor.keyBuilder(); PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause()); @@ -968,10 +959,10 @@ public class GenericHelixController implements IdealStateChangeListener, public void shutdown() throws InterruptedException { stopPeriodRebalance(); - logger.info("Shutting down {} pipeline", PipelineTypes.DEFAULT.name()); + logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name()); shutdownPipeline(_eventThread, _eventQueue); - logger.info("Shutting down {} pipeline", PipelineTypes.TASK.name()); + logger.info("Shutting down {} pipeline", Pipeline.Type.TASK.name()); shutdownPipeline(_taskEventThread, _taskEventQueue); // shutdown asycTasksThreadpool and wait for terminate. @@ -1050,13 +1041,12 @@ public class GenericHelixController implements IdealStateChangeListener, logger.info("controller is now resumed from paused state"); String uid = UUID.randomUUID().toString().substring(0, 8); ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume, - String.format("%s_%s", uid, PipelineTypes.DEFAULT.name())); + String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name())); event.addAttribute(AttributeName.changeContext.name(), changeContext); event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager()); - event.addAttribute(AttributeName.eventData.name(), signal); event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool); _eventQueue.put(event); - _taskEventQueue.put(event.clone(String.format("%s_%s", uid, PipelineTypes.TASK.name()))); + _taskEventQueue.put(event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name()))); } } return statusFlag; @@ -1064,12 +1054,13 @@ public class GenericHelixController implements IdealStateChangeListener, // TODO: refactor this to use common/ClusterEventProcessor. + @Deprecated private class ClusterEventProcessor extends Thread { private final BaseControllerDataProvider _cache; private final ClusterEventBlockingQueue _eventBlockingQueue; private final String _processorName; - public ClusterEventProcessor(BaseControllerDataProvider cache, + ClusterEventProcessor(BaseControllerDataProvider cache, ClusterEventBlockingQueue eventBlockingQueue, String processorName) { super("HelixController-pipeline-" + processorName); _cache = cache; @@ -1114,8 +1105,4 @@ public class GenericHelixController implements IdealStateChangeListener, eventThread.setDaemon(true); eventThread.start(); } - - public static String getPipelineType(boolean isTask) { - return isTask ? PipelineTypes.TASK.name() : PipelineTypes.DEFAULT.name(); - } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 3b851bf..84013ba 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -29,8 +29,8 @@ import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.common.caches.AbstractDataCache; import org.apache.helix.common.caches.PropertyCache; -import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.LogUtil; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.stages.MissingTopStateRecord; import org.apache.helix.model.ExternalView; import org.apache.helix.model.ResourceAssignment; @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; public class ResourceControllerDataProvider extends BaseControllerDataProvider { private static final Logger logger = LoggerFactory.getLogger(ResourceControllerDataProvider.class); - private static final String PIPELINE_NAME = GenericHelixController.PipelineTypes.DEFAULT.name(); + private static final String PIPELINE_NAME = Pipeline.Type.DEFAULT.name(); // Resource control specific property caches private final PropertyCache<ExternalView> _externalViewCache; diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java index 3fe13af..3416e22 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java @@ -27,8 +27,8 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.ZNRecord; import org.apache.helix.common.caches.AbstractDataCache; import org.apache.helix.common.caches.TaskDataCache; -import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.LogUtil; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.LiveInstance; import org.apache.helix.task.AssignableInstanceManager; @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; public class WorkflowControllerDataProvider extends BaseControllerDataProvider { private static final Logger logger = LoggerFactory.getLogger(WorkflowControllerDataProvider.class); - private static final String PIPELINE_NAME = GenericHelixController.PipelineTypes.TASK.name(); + private static final String PIPELINE_NAME = Pipeline.Type.TASK.name(); private TaskDataCache _taskDataCache; private Map<String, Integer> _participantActiveTaskCount; diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java index 7f00759..d3bb474 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java +++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java @@ -33,6 +33,11 @@ public class Pipeline { private final String _pipelineType; List<Stage> _stages; + public enum Type { + DEFAULT, + TASK + } + public Pipeline() { this(""); } @@ -84,4 +89,5 @@ public class Pipeline { public List<Stage> getStages() { return _stages; } + } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 7bb32f3..513ea27 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -19,21 +19,38 @@ package org.apache.helix.manager.zk; * under the License. */ -import javax.management.JMException; +import com.google.common.collect.Sets; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.concurrent.TimeUnit; - +import javax.management.JMException; import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.exception.ZkInterruptedException; -import org.apache.helix.*; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.ClusterMessagingService; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; import org.apache.helix.HelixConstants.ChangeType; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerProperties; +import org.apache.helix.HelixTimerTask; +import org.apache.helix.InstanceType; +import org.apache.helix.LiveInstanceInfoProvider; +import org.apache.helix.PreConnectCallback; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.PropertyType; +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.ZNRecord; import org.apache.helix.api.listeners.ClusterConfigChangeListener; import org.apache.helix.api.listeners.ConfigChangeListener; import org.apache.helix.api.listeners.ControllerChangeListener; @@ -46,6 +63,7 @@ import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.api.listeners.ScopedConfigChangeListener; import org.apache.helix.controller.GenericHelixController; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl; import org.apache.helix.healthcheck.ParticipantHealthReportTask; @@ -133,6 +151,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { * controller fields */ private GenericHelixController _controller; + private Set<Pipeline.Type> _enabledPipelineTypes; private CallbackHandler _leaderElectionHandler = null; protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>(); @@ -202,6 +221,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } _instanceName = instanceName; + _enabledPipelineTypes = + Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK); _preConnectCallbacks = new ArrayList<>(); _handlers = new ArrayList<>(); _properties = new HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION); @@ -294,6 +315,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } } + public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) { + if (!InstanceType.CONTROLLER.equals(_instanceType) && !InstanceType.CONTROLLER_PARTICIPANT + .equals(_instanceType)) { + throw new IllegalStateException( + String.format("Cannot enable control pipeline for instance type %s", _instanceType)); + } + _enabledPipelineTypes = types; + } + @Override public boolean removeListener(PropertyKey key, Object listener) { LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: " + _clusterName + " by instance: " + _instanceName); @@ -696,7 +726,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { case CONTROLLER: case CONTROLLER_PARTICIPANT: if (_controller == null) { - _controller = new GenericHelixController(_clusterName); + _controller = new GenericHelixController(_clusterName, _enabledPipelineTypes); _messagingService.getExecutor().setController(_controller); } break; diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java index e144a51..d5e6ec2 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java @@ -19,10 +19,13 @@ package org.apache.helix.participant; * under the License. */ +import com.google.common.collect.Sets; +import java.util.Set; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModelInfo; import org.slf4j.Logger; @@ -34,9 +37,16 @@ import org.slf4j.LoggerFactory; public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel { private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class); protected HelixManager _controller = null; + private final Set<Pipeline.Type> _enabledPipelineTypes; public DistClusterControllerStateModel(String zkAddr) { + this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK)); + } + + public DistClusterControllerStateModel(String zkAddr, + Set<Pipeline.Type> enabledPipelineTypes) { super(zkAddr); + _enabledPipelineTypes = enabledPipelineTypes; } @Override @@ -56,6 +66,7 @@ public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyS _controller = HelixManagerFactory.getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr); + _controller.setEnabledControlPipelineTypes(_enabledPipelineTypes); _controller.connect(); _controller.startTimerTasks(); logStateTransition("STANDBY", "LEADER", clusterName, controllerName); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java index dad3c76..9391782 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java @@ -19,6 +19,7 @@ package org.apache.helix.controller.stages; * under the License. */ +import java.util.Set; import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; @@ -41,6 +42,7 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.api.listeners.ScopedConfigChangeListener; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.participant.StateMachineEngine; @@ -144,6 +146,11 @@ public class DummyClusterManager implements HelixManager { } @Override + public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) { + + } + + @Override public boolean removeListener(PropertyKey key, Object listener) { // TODO Auto-generated method stub return false; diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java index 2e67128..5753781 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java @@ -19,6 +19,7 @@ package org.apache.helix.mock; * under the License. */ +import java.util.Set; import java.util.UUID; import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; @@ -43,6 +44,7 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.api.listeners.ScopedConfigChangeListener; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.participant.HelixStateMachineEngine; @@ -159,6 +161,11 @@ public class MockManager implements HelixManager { } @Override + public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) { + + } + + @Override public String getClusterName() { return _clusterName; } diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java index 8eb9a2b..983a6a3 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java +++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java @@ -19,6 +19,7 @@ package org.apache.helix.participant; * under the License. */ +import java.util.Set; import java.util.UUID; import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; @@ -42,6 +43,7 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.api.listeners.ScopedConfigChangeListener; +import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; @@ -100,6 +102,11 @@ public class MockZKHelixManager implements HelixManager { } @Override + public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) { + + } + + @Override public void addLiveInstanceChangeListener(org.apache.helix.LiveInstanceChangeListener listener) throws Exception { }