Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158195004
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
---
@@ -155,134 +150,159 @@ public void start() throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
- workerState =
- new WorkerState(conf, context, topologyId,
assignmentId, port, workerId, topologyConf, stateStorage,
+ return loadWorker(topologyConf, stateStorage,
stormClusterState, initCreds, initialCredentials);
+ }
+ }); // Subject.doAs(...)
+
+ }
+
+ private Object loadWorker(Map<String, Object> topologyConf,
IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String,
String> initCreds, Credentials initialCredentials)
+ throws Exception {
+ workerState =
+ new WorkerState(conf, context, topologyId, assignmentId,
port, workerId, topologyConf, stateStorage,
stormClusterState);
- // Heartbeat here so that worker process dies if this fails
- // it's important that worker heartbeat to supervisor ASAP
so that supervisor knows
- // that worker is running and moves on
- doHeartBeat();
+ // Heartbeat here so that worker process dies if this fails
+ // it's important that worker heartbeat to supervisor ASAP so that
supervisor knows
+ // that worker is running and moves on
+ doHeartBeat();
- executorsAtom = new AtomicReference<>(null);
+ executorsAtom = new AtomicReference<>(null);
- // launch heartbeat threads immediately so that
slow-loading tasks don't cause the worker to timeout
- // to the supervisor
- workerState.heartbeatTimer
- .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
- try {
- doHeartBeat();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ // launch heartbeat threads immediately so that slow-loading tasks
don't cause the worker to timeout
+ // to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
- workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer)
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
- workerState.registerCallbacks();
+ workerState.registerCallbacks();
- workerState.refreshConnections(null);
+ workerState.refreshConnections(null);
- workerState.activateWorkerWhenAllConnectionsReady();
+ workerState.activateWorkerWhenAllConnectionsReady();
- workerState.refreshStormActive(null);
+ workerState.refreshStormActive(null);
- workerState.runWorkerStartHooks();
+ workerState.runWorkerStartHooks();
- List<IRunningExecutor> newExecutors = new
ArrayList<IRunningExecutor>();
- for (List<Long> e : workerState.getExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
- newExecutors.add(
- LocalExecutor.mkExecutor(workerState, e,
initCreds)
- .execute());
- } else {
- newExecutors.add(
- Executor.mkExecutor(workerState, e, initCreds)
- .execute());
- }
- }
- executorsAtom.set(newExecutors);
+ List<Executor> execs = new ArrayList<>();
+ for (List<Long> e : workerState.getExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ Executor executor = LocalExecutor.mkExecutor(workerState,
e, initCreds);
+ execs.add( executor );
+
workerState.localReceiveQueues.put(executor.getTaskIds().get(0),
executor.getReceiveQueue());
+ } else {
+ Executor executor = Executor.mkExecutor(workerState, e,
initCreds);
+
workerState.localReceiveQueues.put(executor.getTaskIds().get(0),
executor.getReceiveQueue());
+ execs.add(executor);
+ }
+ }
- EventHandler<Object> tupleHandler = (packets, seqId,
batchEnd) -> workerState
- .sendTuplesToRemoteWorker((HashMap<Integer,
ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+ List<IRunningExecutor> newExecutors = new
ArrayList<IRunningExecutor>();
+ for (Executor executor : execs) {
+ newExecutors.add(executor.execute());
+ }
+ executorsAtom.set(newExecutors);
- // This thread will publish the messages destined for
remote tasks to remote connections
- transferThread = Utils.asyncLoop(() -> {
-
workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
- return 0L;
- });
+ // This thread will send out messages destined for remote tasks
(on other workers)
+ transferThread = workerState.makeTransferThread();
+ transferThread.setName("Worker-Transfer");
- DisruptorBackpressureCallback disruptorBackpressureHandler
=
- mkDisruptorBackpressureHandler(workerState);
-
workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
- workerState.transferQueue
- .setEnableBackpressure((Boolean)
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
- workerState.transferQueue
-
.setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- workerState.transferQueue
-
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
- WorkerBackpressureCallback backpressureCallback =
mkBackpressureHandler();
- backpressureThread = new
WorkerBackpressureThread(workerState.backpressureTrigger, workerState,
backpressureCallback);
- if ((Boolean)
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId,
workerState::refreshThrottle);
-
- int pollingSecs =
ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
-
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs,
workerState::refreshThrottle);
- }
+ credentialsAtom = new
AtomicReference<Credentials>(initialCredentials);
- credentialsAtom = new
AtomicReference<Credentials>(initialCredentials);
+ establishLogSettingCallback();
- establishLogSettingCallback();
+ workerState.stormClusterState.credentials(topologyId,
Worker.this::checkCredentialsChanged);
- workerState.stormClusterState.credentials(topologyId,
Worker.this::checkCredentialsChanged);
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new
Runnable() {
+ @Override public void run() {
+ checkCredentialsChanged();
+ }
+ });
- workerState.refreshCredentialsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS),
new Runnable() {
- @Override public void run() {
- checkCredentialsChanged();
- if ((Boolean)
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- checkThrottleChanged();
- }
+ workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+ (Integer)
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new
Runnable() {
+ @Override public void run() {
+ try {
+ LOG.debug("Checking if blobs have updated");
+ updateBlobUpdates();
+ } catch (IOException e) {
+ // IOException from reading the version files
to be ignored
+ LOG.error(e.getStackTrace().toString());
}
- });
-
- workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
- (Integer)
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new
Runnable() {
- @Override public void run() {
- try {
- LOG.debug("Checking if blobs have
updated");
- updateBlobUpdates();
- } catch (IOException e) {
- // IOException from reading the
version files to be ignored
-
LOG.error(e.getStackTrace().toString());
- }
- }
- });
-
- // The jitter allows the clients to get the data at
different times, and avoids thundering herd
- if (!(Boolean)
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-
workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500,
workerState::refreshLoad);
- }
+ }
+ });
+
+ // The jitter allows the clients to get the data at different
times, and avoids thundering herd
+ if (!(Boolean)
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1,
500, workerState::refreshLoad);
+ }
+
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
workerState::refreshConnections);
- workerState.refreshConnectionsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
workerState::refreshConnections);
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer)
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
logConfigManager::resetLogLevels);
- workerState.resetLogLevelsTimer.scheduleRecurring(0,
- (Integer)
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
logConfigManager::resetLogLevels);
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer)
conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
- workerState.refreshActiveTimer.scheduleRecurring(0,
(Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
- workerState::refreshStormActive);
+ setupFlushTupleTimer(topologyConf, newExecutors);
+ setupBackPressureCheckTimer(topologyConf);
+
+ LOG.info("Worker has topology config {}",
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+ LOG.info("Worker {} for storm {} on {}:{} has finished loading",
workerId, topologyId, assignmentId, port);
+ return this;
+ }
- LOG.info("Worker has topology config {}",
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
- LOG.info("Worker {} for storm {} on {}:{} has finished
loading", workerId, topologyId, assignmentId, port);
- return this;
- };
+ private void setupFlushTupleTimer(final Map<String, Object>
topologyConf, final List<IRunningExecutor> executors) {
+ final Integer producerBatchSize =
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ final Integer xferBatchSize =
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+ final Long flushIntervalMicros =
ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
--- End diff --
The config name (`TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS`) is in milliseconds,
whereas the variable name (`flushIntervalMicros`) says microseconds. If you
want to convert the value later, it would be better to do it here, or just
rename the variable to milliseconds.
It looks like the value is used as milliseconds below, so I guess the variable
name and the log messages further down in this method are wrong.
---