Repository: helix Updated Branches: refs/heads/master 566d4f166 -> 1103fecb6
Customize the pipeline thread name with cluster name and pipeline type. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/96593708 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/96593708 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/96593708 Branch: refs/heads/master Commit: 9659370849116b8a3f5d1853c7695274942ce211 Parents: 566d4f1 Author: Lei Xia <l...@linkedin.com> Authored: Tue Oct 2 14:43:25 2018 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Wed Oct 31 13:50:33 2018 -0700 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 30 +++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/96593708/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index dd409e5..eb75286 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -356,7 +356,7 @@ public class GenericHelixController implements IdealStateChangeListener, } private GenericHelixController(PipelineRegistry registry, PipelineRegistry taskRegistry, - String clusterName) { + final String clusterName) { _paused = false; _registry = registry; _taskRegistry = taskRegistry; @@ -366,7 +366,7 @@ public class GenericHelixController implements IdealStateChangeListener, _asyncTasksThreadPool = Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "GerenricHelixController-async_task_thread"); + return new Thread(r, "HelixController-async_tasks-" + _clusterName); } }); @@ -378,8 +378,9 @@ public class GenericHelixController implements IdealStateChangeListener, _cache = new ClusterDataCache(clusterName); _taskCache = new ClusterDataCache(clusterName); - _eventThread = new ClusterEventProcessor(_cache, _eventQueue); - _taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue); + _eventThread = new ClusterEventProcessor(_cache, _eventQueue, "default-" + clusterName); + _taskEventThread = + new ClusterEventProcessor(_taskCache, _taskEventQueue, "task-" + clusterName); _forceRebalanceTimer = new Timer(); _lastPipelineEndTimestamp = TopStateHandoffReportStage.TIMESTAMP_NOT_RECORDED; @@ -979,33 +980,40 @@ public class GenericHelixController implements IdealStateChangeListener, private class ClusterEventProcessor extends Thread { private final ClusterDataCache _cache; private final ClusterEventBlockingQueue _eventBlockingQueue; + private final String _processorName; public ClusterEventProcessor(ClusterDataCache cache, - ClusterEventBlockingQueue eventBlockingQueue) { - super("GenericHelixController-event_process"); + ClusterEventBlockingQueue eventBlockingQueue, String processorName) { + super("HelixController-pipeline-" + processorName); _cache = cache; _eventBlockingQueue = eventBlockingQueue; + _processorName = processorName; } @Override public void run() { - logger.info("START ClusterEventProcessor thread for cluster " + _clusterName); + logger.info( + "START ClusterEventProcessor thread for cluster " + _clusterName + ", processor name: " + + _processorName); while (!isInterrupted()) { try { handleEvent(_eventBlockingQueue.take(), _cache); } catch (InterruptedException e) { - logger.warn("ClusterEventProcessor interrupted", e); + logger.warn("ClusterEventProcessor interrupted " + _processorName, e); interrupt(); } catch (ZkInterruptedException e) { - logger.warn("ClusterEventProcessor caught a ZK connection interrupt", e); + logger + .warn("ClusterEventProcessor caught a ZK connection interrupt " + _processorName, e); interrupt(); } catch (ThreadDeath death) { + logger.error("ClusterEventProcessor caught a ThreadDeath " + _processorName, death); throw death; } catch (Throwable t) { - logger.error("ClusterEventProcessor failed while running the controller pipeline", t); + logger.error("ClusterEventProcessor failed while running the controller pipeline " + + _processorName, t); } } - logger.info("END ClusterEventProcessor thread"); + logger.info("END ClusterEventProcessor thread " + _processorName); } }