Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159957697
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
---
@@ -155,134 +150,159 @@ public void start() throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
- workerState =
- new WorkerState(conf, context, topologyId,
assignmentId, port, workerId, topologyConf, stateStorage,
+ return loadWorker(topologyConf, stateStorage,
stormClusterState, initCreds, initialCredentials);
+ }
+ }); // Subject.doAs(...)
+
+ }
+
+ private Object loadWorker(Map<String, Object> topologyConf,
IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String,
String> initCreds, Credentials initialCredentials)
+ throws Exception {
+ workerState =
+ new WorkerState(conf, context, topologyId, assignmentId,
port, workerId, topologyConf, stateStorage,
stormClusterState);
- // Heartbeat here so that worker process dies if this fails
- // it's important that worker heartbeat to supervisor ASAP
so that supervisor knows
- // that worker is running and moves on
- doHeartBeat();
+ // Heartbeat here so that worker process dies if this fails
+ // it's important that worker heartbeat to supervisor ASAP so that
supervisor knows
+ // that worker is running and moves on
+ doHeartBeat();
- executorsAtom = new AtomicReference<>(null);
+ executorsAtom = new AtomicReference<>(null);
- // launch heartbeat threads immediately so that
slow-loading tasks don't cause the worker to timeout
- // to the supervisor
- workerState.heartbeatTimer
- .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
- try {
- doHeartBeat();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ // launch heartbeat threads immediately so that slow-loading tasks
don't cause the worker to timeout
+ // to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
- workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
- workerState.registerCallbacks();
+ workerState.registerCallbacks();
- workerState.refreshConnections(null);
+ workerState.refreshConnections(null);
- workerState.activateWorkerWhenAllConnectionsReady();
+ workerState.activateWorkerWhenAllConnectionsReady();
- workerState.refreshStormActive(null);
+ workerState.refreshStormActive(null);
- workerState.runWorkerStartHooks();
+ workerState.runWorkerStartHooks();
- List<IRunningExecutor> newExecutors = new
ArrayList<IRunningExecutor>();
- for (List<Long> e : workerState.getExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
- newExecutors.add(
- LocalExecutor.mkExecutor(workerState, e,
initCreds)
- .execute());
- } else {
- newExecutors.add(
- Executor.mkExecutor(workerState, e, initCreds)
- .execute());
- }
- }
- executorsAtom.set(newExecutors);
+ List<Executor> execs = new ArrayList<>();
+ for (List<Long> e : workerState.getExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ Executor executor = LocalExecutor.mkExecutor(workerState,
e, initCreds);
+ execs.add( executor );
+
workerState.localReceiveQueues.put(executor.getTaskIds().get(0),
executor.getReceiveQueue());
+ } else {
+ Executor executor = Executor.mkExecutor(workerState, e,
initCreds);
+
workerState.localReceiveQueues.put(executor.getTaskIds().get(0),
executor.getReceiveQueue());
+ execs.add(executor);
+ }
+ }
- EventHandler<Object> tupleHandler = (packets, seqId,
batchEnd) -> workerState
- .sendTuplesToRemoteWorker((HashMap<Integer,
ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+ List<IRunningExecutor> newExecutors = new
ArrayList<IRunningExecutor>();
+ for (Executor executor : execs) {
+ newExecutors.add(executor.execute());
+ }
+ executorsAtom.set(newExecutors);
- // This thread will publish the messages destined for
remote tasks to remote connections
- transferThread = Utils.asyncLoop(() -> {
-
workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
- return 0L;
- });
+ // This thread will send out messages destined for remote tasks
(on other workers)
+ transferThread = workerState.makeTransferThread();
+ transferThread.setName("Worker-Transfer");
- DisruptorBackpressureCallback disruptorBackpressureHandler
=
- mkDisruptorBackpressureHandler(workerState);
-
workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
- workerState.transferQueue
- .setEnableBackpressure((Boolean)
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
- workerState.transferQueue
-
.setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- workerState.transferQueue
-
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
- WorkerBackpressureCallback backpressureCallback =
mkBackpressureHandler();
- backpressureThread = new
WorkerBackpressureThread(workerState.backpressureTrigger, workerState,
backpressureCallback);
- if ((Boolean)
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId,
workerState::refreshThrottle);
-
- int pollingSecs =
ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
-
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs,
workerState::refreshThrottle);
- }
+ credentialsAtom = new
AtomicReference<Credentials>(initialCredentials);
- credentialsAtom = new
AtomicReference<Credentials>(initialCredentials);
+ establishLogSettingCallback();
- establishLogSettingCallback();
+ workerState.stormClusterState.credentials(topologyId,
Worker.this::checkCredentialsChanged);
- workerState.stormClusterState.credentials(topologyId,
Worker.this::checkCredentialsChanged);
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new
Runnable() {
+ @Override public void run() {
+ checkCredentialsChanged();
+ }
+ });
- workerState.refreshCredentialsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS),
new Runnable() {
- @Override public void run() {
- checkCredentialsChanged();
- if ((Boolean)
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- checkThrottleChanged();
- }
+ workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+ (Integer)
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new
Runnable() {
+ @Override public void run() {
+ try {
+ LOG.debug("Checking if blobs have updated");
+ updateBlobUpdates();
+ } catch (IOException e) {
+ // IOException from reading the version files
to be ignored
+ LOG.error(e.getStackTrace().toString());
}
- });
-
- workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
- (Integer)
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new
Runnable() {
- @Override public void run() {
- try {
- LOG.debug("Checking if blobs have
updated");
- updateBlobUpdates();
- } catch (IOException e) {
- // IOException from reading the
version files to be ignored
-
LOG.error(e.getStackTrace().toString());
- }
- }
- });
-
- // The jitter allows the clients to get the data at
different times, and avoids thundering herd
- if (!(Boolean)
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-
workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500,
workerState::refreshLoad);
- }
+ }
+ });
+
+ // The jitter allows the clients to get the data at different
times, and avoids thundering herd
+ if (!(Boolean)
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1,
500, workerState::refreshLoad);
+ }
+
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
workerState::refreshConnections);
- workerState.refreshConnectionsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
workerState::refreshConnections);
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer)
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
logConfigManager::resetLogLevels);
- workerState.resetLogLevelsTimer.scheduleRecurring(0,
- (Integer)
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
logConfigManager::resetLogLevels);
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer)
conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
- workerState.refreshActiveTimer.scheduleRecurring(0,
(Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
- workerState::refreshStormActive);
+ setupFlushTupleTimer(topologyConf, newExecutors);
+ setupBackPressureCheckTimer(topologyConf);
+
+ LOG.info("Worker has topology config {}",
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
--- End diff --
nit: I think we already log the topology config elsewhere too, so it might be good to remove this `LOG.info` call.
---