This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 011e67c3125fb6e6a87d8555ebf4c41949cfa011
Author: Harry Zhang <hrzh...@linkedin.com>
AuthorDate: Wed Dec 5 15:17:42 2018 -0800

    support HelixManager to configure enabled pipeline types
---
 .../main/java/org/apache/helix/HelixManager.java   |  9 +++
 .../helix/controller/GenericHelixController.java   | 93 ++++++++++------------
 .../ResourceControllerDataProvider.java            |  4 +-
 .../WorkflowControllerDataProvider.java            |  4 +-
 .../apache/helix/controller/pipeline/Pipeline.java |  6 ++
 .../apache/helix/manager/zk/ZKHelixManager.java    | 38 ++++++++-
 .../DistClusterControllerStateModel.java           | 11 +++
 .../controller/stages/DummyClusterManager.java     |  7 ++
 .../java/org/apache/helix/mock/MockManager.java    |  7 ++
 .../helix/participant/MockZKHelixManager.java      |  7 ++
 10 files changed, 125 insertions(+), 61 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java 
b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 66f83ae..1a6815f 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -21,9 +21,11 @@ package org.apache.helix;
 
 import java.util.List;
 
+import java.util.Set;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
@@ -255,6 +257,13 @@ public interface HelixManager {
   void addControllerMessageListener(org.apache.helix.MessageListener listener);
 
   /**
+   * Selectively enable controller pipeline using the given types. This will 
only take effect
+   * when called before connect(), and instance type is CONTROLLER
+   * @param types pipeline types to enable
+   */
+  void setEnabledControlPipelineTypes(Set<Pipeline.Type> types);
+
+  /**
    * Removes the listener. If the same listener was used for multiple changes,
    * all change notifications will be removed.<br/>
    * This will invoke onChange method on the listener with
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ff9d32c..b287886 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -38,7 +38,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
@@ -172,13 +171,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   private long _lastPipelineEndTimestamp;
 
   private String _clusterName;
-  private final Set<PipelineTypes> _enabledPipelineTypes;
-
-  // TODO: move this enum to Pipeline class
-  public enum PipelineTypes {
-    DEFAULT,
-    TASK
-  }
+  private final Set<Pipeline.Type> _enabledPipelineTypes;
 
   /**
    * Default constructor that creates a default pipeline registry. This is 
sufficient in most cases,
@@ -186,19 +179,19 @@ public class GenericHelixController implements 
IdealStateChangeListener,
    * pipeline registry
    */
   public GenericHelixController() {
-    this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createTaskRegistry(PipelineTypes.TASK.name()));
+    this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
+        createTaskRegistry(Pipeline.Type.TASK.name()));
   }
 
   public GenericHelixController(String clusterName) {
-    this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createTaskRegistry(PipelineTypes.TASK.name()), clusterName,
-        Sets.newHashSet(PipelineTypes.TASK, PipelineTypes.DEFAULT));
+    this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
+        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName,
+        Sets.newHashSet(Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
-  public GenericHelixController(String clusterName, Set<PipelineTypes> 
enabledPipelins) {
-    this(createDefaultRegistry(PipelineTypes.DEFAULT.name()),
-        createTaskRegistry(PipelineTypes.TASK.name()), clusterName, 
enabledPipelins);
+  public GenericHelixController(String clusterName, Set<Pipeline.Type> 
enabledPipelins) {
+    this(createDefaultRegistry(Pipeline.Type.DEFAULT.name()),
+        createTaskRegistry(Pipeline.Type.TASK.name()), clusterName, 
enabledPipelins);
   }
 
   class RebalanceTask extends TimerTask {
@@ -389,11 +382,12 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
   // TODO: refactor the constructor as providing both registry but only 
enabling one looks confusing
   public GenericHelixController(PipelineRegistry registry, PipelineRegistry 
taskRegistry) {
-    this(registry, taskRegistry, null, Sets.newHashSet(PipelineTypes.TASK, 
PipelineTypes.DEFAULT));
+    this(registry, taskRegistry, null, Sets.newHashSet(
+        Pipeline.Type.TASK, Pipeline.Type.DEFAULT));
   }
 
   private GenericHelixController(PipelineRegistry registry, PipelineRegistry 
taskRegistry,
-      final String clusterName, Set<PipelineTypes> enabledPipelineTypes) {
+      final String clusterName, Set<Pipeline.Type> enabledPipelineTypes) {
     _paused = false;
     _enabledPipelineTypes = enabledPipelineTypes;
     _registry = registry;
@@ -414,28 +408,28 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     initializeAsyncFIFOWorkers();
 
     // initialize pipelines at the end so we have everything else prepared
-    if (_enabledPipelineTypes.contains(PipelineTypes.DEFAULT)) {
-      logger.info("Initializing {} pipeline", PipelineTypes.DEFAULT.name());
+    if (_enabledPipelineTypes.contains(Pipeline.Type.DEFAULT)) {
+      logger.info("Initializing {} pipeline", Pipeline.Type.DEFAULT.name());
       _resourceControlDataProvider = new 
ResourceControllerDataProvider(clusterName);
       _eventQueue = new ClusterEventBlockingQueue();
       _eventThread = new ClusterEventProcessor(_resourceControlDataProvider, 
_eventQueue,
           "default-" + clusterName);
       initPipeline(_eventThread, _resourceControlDataProvider);
-      logger.info("Initialized {} pipeline", PipelineTypes.DEFAULT.name());
+      logger.info("Initialized {} pipeline", Pipeline.Type.DEFAULT.name());
     } else {
       _eventQueue = null;
       _resourceControlDataProvider = null;
       _eventThread = null;
     }
 
-    if (_enabledPipelineTypes.contains(PipelineTypes.TASK)) {
-      logger.info("Initializing {} pipeline", PipelineTypes.TASK.name());
+    if (_enabledPipelineTypes.contains(Pipeline.Type.TASK)) {
+      logger.info("Initializing {} pipeline", Pipeline.Type.TASK.name());
       _workflowControlDataProvider = new 
WorkflowControllerDataProvider(clusterName);
       _taskEventQueue = new ClusterEventBlockingQueue();
       _taskEventThread = new 
ClusterEventProcessor(_workflowControlDataProvider, _taskEventQueue,
           "task-" + clusterName);
       initPipeline(_taskEventThread, _workflowControlDataProvider);
-      logger.info("Initialized {} pipeline", PipelineTypes.TASK.name());
+      logger.info("Initialized {} pipeline", Pipeline.Type.TASK.name());
     } else {
       _workflowControlDataProvider = null;
       _taskEventQueue = null;
@@ -477,7 +471,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   /**
    * lock-always: caller always needs to obtain an external lock before call, 
calls to handleEvent()
    * should be serialized
-   * @param event
+   * @param event cluster event to handle
    */
   private void handleEvent(ClusterEvent event, BaseControllerDataProvider 
dataProvider) {
     HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
@@ -507,7 +501,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
 
     if (context != null) {
-      if (context.getType() == Type.FINALIZE) {
+      if (context.getType() == NotificationContext.Type.FINALIZE) {
         stopPeriodRebalance();
         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + 
event.getEventType());
         return;
@@ -609,9 +603,8 @@ public class GenericHelixController implements 
IdealStateChangeListener,
               
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.Callback.name(),
                   enqueueTime - zkCallbackTime);
         }
-        sb.append(String.format(
-            "Callback time for event: " + event.getEventType() + " took: " + 
(enqueueTime
-                - zkCallbackTime) + " ms\n"));
+        sb.append(String.format("Callback time for event: %s took: %s ms\n", 
event.getEventType(),
+            enqueueTime - zkCallbackTime));
       }
       if (_isMonitoring) {
         _clusterStatusMonitor
@@ -621,13 +614,10 @@ public class GenericHelixController implements 
IdealStateChangeListener,
             
.updateClusterEventDuration(ClusterEventMonitor.PhaseName.TotalProcessed.name(),
                 _lastPipelineEndTimestamp - startTime);
       }
-      sb.append(String.format(
-          "InQueue time for event: " + event.getEventType() + " took: " + 
(startTime - enqueueTime)
-              + " ms\n"));
-      sb.append(String.format(
-          "TotalProcessed time for event: " + event.getEventType() + " took: " 
+ (
-              _lastPipelineEndTimestamp
-              - startTime) + " ms"));
+      sb.append(String.format("InQueue time for event: %s took: %s ms\n", 
event.getEventType(),
+          startTime - enqueueTime));
+      sb.append(String.format("TotalProcessed time for event: %s took: %s ms", 
event.getEventType(),
+          _lastPipelineEndTimestamp - startTime));
       logger.info(sb.toString());
     }
 
@@ -748,7 +738,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     pushToEventQueues(ClusterEventType.IdealStateChange, changeContext,
         Collections.<String, Object>emptyMap());
 
-    if (changeContext.getType() != Type.FINALIZE) {
+    if (changeContext.getType() != NotificationContext.Type.FINALIZE) {
       HelixManager manager = changeContext.getManager();
       if (manager != null) {
         HelixDataAccessor dataAccessor = 
changeContext.getManager().getHelixDataAccessor();
@@ -800,7 +790,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   }
 
   private void notifyCaches(NotificationContext context, ChangeType 
changeType) {
-    if (context == null || context.getType() != Type.CALLBACK) {
+    if (context == null || context.getType() != 
NotificationContext.Type.CALLBACK) {
       requestDataProvidersFullRefresh();
     } else {
       updateDataChangeInProvider(changeType, context.getPathChanged());
@@ -832,7 +822,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     // No need for completed UUID, prefixed should be fine
     String uid = UUID.randomUUID().toString().substring(0, 8);
     ClusterEvent event = new ClusterEvent(_clusterName, eventType,
-        String.format("%s_%s", uid, PipelineTypes.DEFAULT.name()));
+        String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
     event.addAttribute(AttributeName.helixmanager.name(), 
changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
     event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), 
_asyncFIFOWorkerPool);
@@ -841,7 +831,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     }
     enqueueEvent(_eventQueue, event);
     enqueueEvent(_taskEventQueue,
-        event.clone(String.format("%s_%s", uid, PipelineTypes.TASK.name())));
+        event.clone(String.format("%s_%s", uid, Pipeline.Type.TASK.name())));
   }
 
   private void enqueueEvent(ClusterEventBlockingQueue queue, ClusterEvent 
event) {
@@ -860,17 +850,18 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
     boolean controllerIsLeader;
 
-    if (changeContext != null && changeContext.getType() == Type.FINALIZE) {
+    if (changeContext == null || changeContext.getType() == 
NotificationContext.Type.FINALIZE) {
       logger.info(
-          "GenericClusterController.onControllerChange() FINALIZE for cluster 
" + _clusterName);
+          "GenericClusterController.onControllerChange() Cluster change type 
{} for cluster {}. Disable leadership.",
+          changeContext == null ? null : changeContext.getType(), 
_clusterName);
       controllerIsLeader = false;
     } else {
       // double check if this controller is the leader
       controllerIsLeader = changeContext.getManager().isLeader();
     }
 
-    HelixManager manager = changeContext.getManager();
     if (controllerIsLeader) {
+      HelixManager manager = changeContext.getManager();
       HelixDataAccessor accessor = manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
       PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
@@ -968,10 +959,10 @@ public class GenericHelixController implements 
IdealStateChangeListener,
   public void shutdown() throws InterruptedException {
     stopPeriodRebalance();
 
-    logger.info("Shutting down {} pipeline", PipelineTypes.DEFAULT.name());
+    logger.info("Shutting down {} pipeline", Pipeline.Type.DEFAULT.name());
     shutdownPipeline(_eventThread, _eventQueue);
 
-    logger.info("Shutting down {} pipeline", PipelineTypes.TASK.name());
+    logger.info("Shutting down {} pipeline", Pipeline.Type.TASK.name());
     shutdownPipeline(_taskEventThread, _taskEventQueue);
 
     // shutdown asycTasksThreadpool and wait for terminate.
@@ -1050,13 +1041,12 @@ public class GenericHelixController implements 
IdealStateChangeListener,
         logger.info("controller is now resumed from paused state");
         String uid = UUID.randomUUID().toString().substring(0, 8);
         ClusterEvent event = new ClusterEvent(_clusterName, 
ClusterEventType.Resume,
-            String.format("%s_%s", uid, PipelineTypes.DEFAULT.name()));
+            String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
         event.addAttribute(AttributeName.changeContext.name(), changeContext);
         event.addAttribute(AttributeName.helixmanager.name(), 
changeContext.getManager());
-        event.addAttribute(AttributeName.eventData.name(), signal);
         event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), 
_asyncFIFOWorkerPool);
         _eventQueue.put(event);
-        _taskEventQueue.put(event.clone(String.format("%s_%s", uid, 
PipelineTypes.TASK.name())));
+        _taskEventQueue.put(event.clone(String.format("%s_%s", uid, 
Pipeline.Type.TASK.name())));
       }
     }
     return statusFlag;
@@ -1064,12 +1054,13 @@ public class GenericHelixController implements 
IdealStateChangeListener,
 
 
   // TODO: refactor this to use common/ClusterEventProcessor.
+  @Deprecated
   private class ClusterEventProcessor extends Thread {
     private final BaseControllerDataProvider _cache;
     private final ClusterEventBlockingQueue _eventBlockingQueue;
     private final String _processorName;
 
-    public ClusterEventProcessor(BaseControllerDataProvider cache,
+    ClusterEventProcessor(BaseControllerDataProvider cache,
         ClusterEventBlockingQueue eventBlockingQueue, String processorName) {
       super("HelixController-pipeline-" + processorName);
       _cache = cache;
@@ -1114,8 +1105,4 @@ public class GenericHelixController implements 
IdealStateChangeListener,
     eventThread.setDaemon(true);
     eventThread.start();
   }
-
-  public static String getPipelineType(boolean isTask) {
-    return isTask ? PipelineTypes.TASK.name() : PipelineTypes.DEFAULT.name();
-  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 3b851bf..84013ba 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -29,8 +29,8 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.PropertyCache;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.stages.MissingTopStateRecord;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.ResourceAssignment;
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 public class ResourceControllerDataProvider extends BaseControllerDataProvider 
{
   private static final Logger logger =
       LoggerFactory.getLogger(ResourceControllerDataProvider.class);
-  private static final String PIPELINE_NAME = 
GenericHelixController.PipelineTypes.DEFAULT.name();
+  private static final String PIPELINE_NAME = Pipeline.Type.DEFAULT.name();
 
   // Resource control specific property caches
   private final PropertyCache<ExternalView> _externalViewCache;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 3fe13af..3416e22 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -27,8 +27,8 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.TaskDataCache;
-import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.task.AssignableInstanceManager;
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 public class WorkflowControllerDataProvider extends BaseControllerDataProvider 
{
   private static final Logger logger =
       LoggerFactory.getLogger(WorkflowControllerDataProvider.class);
-  private static final String PIPELINE_NAME = 
GenericHelixController.PipelineTypes.TASK.name();
+  private static final String PIPELINE_NAME = Pipeline.Type.TASK.name();
 
   private TaskDataCache _taskDataCache;
   private Map<String, Integer> _participantActiveTaskCount;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index 7f00759..d3bb474 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -33,6 +33,11 @@ public class Pipeline {
   private final String _pipelineType;
   List<Stage> _stages;
 
+  public enum Type {
+    DEFAULT,
+    TASK
+  }
+
   public Pipeline() {
     this("");
   }
@@ -84,4 +89,5 @@ public class Pipeline {
   public List<Stage> getStages() {
     return _stages;
   }
+
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 7bb32f3..513ea27 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,21 +19,38 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
-import javax.management.JMException;
+import com.google.common.collect.Sets;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
-
+import javax.management.JMException;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.helix.*;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
@@ -46,6 +63,7 @@ import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
 import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.healthcheck.ParticipantHealthReportTask;
@@ -133,6 +151,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
    * controller fields
    */
   private GenericHelixController _controller;
+  private Set<Pipeline.Type> _enabledPipelineTypes;
   private CallbackHandler _leaderElectionHandler = null;
   protected final List<HelixTimerTask> _controllerTimerTasks = new 
ArrayList<>();
 
@@ -202,6 +221,8 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     }
 
     _instanceName = instanceName;
+    _enabledPipelineTypes =
+        Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK);
     _preConnectCallbacks = new ArrayList<>();
     _handlers = new ArrayList<>();
     _properties = new 
HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION);
@@ -294,6 +315,15 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     }
   }
 
+  public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) {
+    if (!InstanceType.CONTROLLER.equals(_instanceType) && 
!InstanceType.CONTROLLER_PARTICIPANT
+        .equals(_instanceType)) {
+      throw new IllegalStateException(
+          String.format("Cannot enable control pipeline for instance type %s", 
_instanceType));
+    }
+    _enabledPipelineTypes = types;
+  }
+
   @Override public boolean removeListener(PropertyKey key, Object listener) {
     LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + 
" from cluster: "
         + _clusterName + " by instance: " + _instanceName);
@@ -696,7 +726,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     case CONTROLLER:
     case CONTROLLER_PARTICIPANT:
       if (_controller == null) {
-        _controller = new GenericHelixController(_clusterName);
+        _controller = new GenericHelixController(_clusterName, 
_enabledPipelineTypes);
         _messagingService.getExecutor().setController(_controller);
       }
       break;
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
 
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index e144a51..d5e6ec2 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -19,10 +19,13 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import com.google.common.collect.Sets;
+import java.util.Set;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.slf4j.Logger;
@@ -34,9 +37,16 @@ import org.slf4j.LoggerFactory;
 public class DistClusterControllerStateModel extends 
AbstractHelixLeaderStandbyStateModel {
   private static Logger logger = 
LoggerFactory.getLogger(DistClusterControllerStateModel.class);
   protected HelixManager _controller = null;
+  private final Set<Pipeline.Type> _enabledPipelineTypes;
 
   public DistClusterControllerStateModel(String zkAddr) {
+    this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK));
+  }
+
+  public DistClusterControllerStateModel(String zkAddr,
+      Set<Pipeline.Type> enabledPipelineTypes) {
     super(zkAddr);
+    _enabledPipelineTypes = enabledPipelineTypes;
   }
 
   @Override
@@ -56,6 +66,7 @@ public class DistClusterControllerStateModel extends 
AbstractHelixLeaderStandbyS
       _controller =
           HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
               InstanceType.CONTROLLER, _zkAddr);
+      _controller.setEnabledControlPipelineTypes(_enabledPipelineTypes);
       _controller.connect();
       _controller.startTimerTasks();
       logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index dad3c76..9391782 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.Set;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
@@ -41,6 +42,7 @@ import 
org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.participant.StateMachineEngine;
@@ -144,6 +146,11 @@ public class DummyClusterManager implements HelixManager {
   }
 
   @Override
+  public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) {
+
+  }
+
+  @Override
   public boolean removeListener(PropertyKey key, Object listener) {
     // TODO Auto-generated method stub
     return false;
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
index 2e67128..5753781 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.mock;
  * under the License.
  */
 
+import java.util.Set;
 import java.util.UUID;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
@@ -43,6 +44,7 @@ import 
org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.participant.HelixStateMachineEngine;
@@ -159,6 +161,11 @@ public class MockManager implements HelixManager {
   }
 
   @Override
+  public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) {
+
+  }
+
+  @Override
   public String getClusterName() {
     return _clusterName;
   }
diff --git 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 8eb9a2b..983a6a3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import java.util.Set;
 import java.util.UUID;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
@@ -42,6 +43,7 @@ import 
org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
+import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -100,6 +102,11 @@ public class MockZKHelixManager implements HelixManager {
   }
 
   @Override
+  public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) {
+
+  }
+
+  @Override
   public void 
addLiveInstanceChangeListener(org.apache.helix.LiveInstanceChangeListener 
listener) throws Exception {
 
   }

Reply via email to