Repository: helix
Updated Branches:
  refs/heads/master 566d4f166 -> 1103fecb6


Customize the pipeline thread name with cluster name and pipeline type.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96593708
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96593708
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96593708

Branch: refs/heads/master
Commit: 9659370849116b8a3f5d1853c7695274942ce211
Parents: 566d4f1
Author: Lei Xia <l...@linkedin.com>
Authored: Tue Oct 2 14:43:25 2018 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Wed Oct 31 13:50:33 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      | 30 +++++++++++++-------
 1 file changed, 19 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/96593708/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index dd409e5..eb75286 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -356,7 +356,7 @@ public class GenericHelixController implements IdealStateChangeListener,
   }
 
   private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry,
-      String clusterName) {
+      final String clusterName) {
     _paused = false;
     _registry = registry;
     _taskRegistry = taskRegistry;
@@ -366,7 +366,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     _asyncTasksThreadPool =
         Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
           @Override public Thread newThread(Runnable r) {
-            return new Thread(r, "GerenricHelixController-async_task_thread");
+            return new Thread(r, "HelixController-async_tasks-" + _clusterName);
           }
         });
 
@@ -378,8 +378,9 @@ public class GenericHelixController implements IdealStateChangeListener,
     _cache = new ClusterDataCache(clusterName);
     _taskCache = new ClusterDataCache(clusterName);
 
-    _eventThread = new ClusterEventProcessor(_cache, _eventQueue);
-    _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
+    _eventThread = new ClusterEventProcessor(_cache, _eventQueue, "default-" + clusterName);
+    _taskEventThread =
+        new ClusterEventProcessor(_taskCache, _taskEventQueue, "task-" + clusterName);
 
     _forceRebalanceTimer = new Timer();
     _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED;
@@ -979,33 +980,40 @@ public class GenericHelixController implements IdealStateChangeListener,
   private class ClusterEventProcessor extends Thread {
     private final ClusterDataCache _cache;
     private final ClusterEventBlockingQueue _eventBlockingQueue;
+    private final String _processorName;
 
     public ClusterEventProcessor(ClusterDataCache cache,
-        ClusterEventBlockingQueue eventBlockingQueue) {
-      super("GenericHelixController-event_process");
+        ClusterEventBlockingQueue eventBlockingQueue, String processorName) {
+      super("HelixController-pipeline-" + processorName);
       _cache = cache;
       _eventBlockingQueue = eventBlockingQueue;
+      _processorName = processorName;
     }
 
     @Override
     public void run() {
-      logger.info("START ClusterEventProcessor thread  for cluster " + _clusterName);
+      logger.info(
+          "START ClusterEventProcessor thread  for cluster " + _clusterName + ", processor name: "
+              + _processorName);
       while (!isInterrupted()) {
         try {
           handleEvent(_eventBlockingQueue.take(), _cache);
         } catch (InterruptedException e) {
-          logger.warn("ClusterEventProcessor interrupted", e);
+          logger.warn("ClusterEventProcessor interrupted " + _processorName, e);
           interrupt();
         } catch (ZkInterruptedException e) {
-          logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e);
+          logger
+              .warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e);
           interrupt();
         } catch (ThreadDeath death) {
+          logger.error("ClusterEventProcessor caught a ThreadDeath  " + _processorName, death);
           throw death;
         } catch (Throwable t) {
-          logger.error("ClusterEventProcessor failed while running the controller pipeline", t);
+          logger.error("ClusterEventProcessor failed while running the controller pipeline "
+              + _processorName, t);
         }
       }
-      logger.info("END ClusterEventProcessor thread");
+      logger.info("END ClusterEventProcessor thread " + _processorName);
     }
   }
 

Reply via email to