GitHub user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r129736039
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -155,134 +150,141 @@ public void start() throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
    @Override public Object run() throws Exception {
-        workerState =
-            new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+        return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
+    }
+}); // Subject.doAs(...)
+
+}
+
+private Object loadWorker(Map topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+    throws Exception {
+    workerState =
+        new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
            stormClusterState);
-        // Heartbeat here so that worker process dies if this fails
-        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
-        // that worker is running and moves on
-        doHeartBeat();
+    // Heartbeat here so that worker process dies if this fails
+    // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+    // that worker is running and moves on
+    doHeartBeat();
-        executorsAtom = new AtomicReference<>(null);
+    executorsAtom = new AtomicReference<>(null);
-        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
-        // to the supervisor
-        workerState.heartbeatTimer
-            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
-                try {
-                    doHeartBeat();
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            });
+    // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+    // to the supervisor
+    workerState.heartbeatTimer
+        .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+            try {
+                doHeartBeat();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
-        workerState.executorHeartbeatTimer
-            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+    workerState.executorHeartbeatTimer
+        .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
            Worker.this::doExecutorHeartbeats);
-        workerState.registerCallbacks();
+    workerState.registerCallbacks();
-        workerState.refreshConnections(null);
+    workerState.refreshConnections(null);
-        workerState.activateWorkerWhenAllConnectionsReady();
+    workerState.activateWorkerWhenAllConnectionsReady();
-        workerState.refreshStormActive(null);
+    workerState.refreshStormActive(null);
-        workerState.runWorkerStartHooks();
+    workerState.runWorkerStartHooks();
-        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
-        for (List<Long> e : workerState.getExecutors()) {
-            if (ConfigUtils.isLocalMode(topologyConf)) {
-                newExecutors.add(
-                    LocalExecutor.mkExecutor(workerState, e, initCreds)
+    List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+    for (List<Long> e : workerState.getExecutors()) {
+        if (ConfigUtils.isLocalMode(topologyConf)) {
+            newExecutors.add(
+                LocalExecutor.mkExecutor(workerState, e, initCreds)
                    .execute());
-            } else {
-                newExecutors.add(
-                    Executor.mkExecutor(workerState, e, initCreds)
+        } else {
+            newExecutors.add(
+                Executor.mkExecutor(workerState, e, initCreds)
                    .execute());
-            }
-        }
-        executorsAtom.set(newExecutors);
+        }
+    }
-        EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
-            .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+    executorsAtom.set(newExecutors);
-        // This thread will publish the messages destined for remote tasks to remote connections
-        transferThread = Utils.asyncLoop(() -> {
-            workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
-            return 0L;
-        });
+    JCQueue.Consumer tupleHandler = workerState;
-        DisruptorBackpressureCallback disruptorBackpressureHandler =
-            mkDisruptorBackpressureHandler(workerState);
-        workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
-        workerState.transferQueue
-            .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
-        workerState.transferQueue
-            .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-        workerState.transferQueue
-            .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
-        WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
-        backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
-        if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
-            backpressureThread.start();
-            stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
-
-            int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
-            workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
-        }
+    // This thread will send the messages destined for remote tasks (out of process)
+    transferThread = Utils.asyncLoop(() -> {
+        int x = workerState.transferQueue.consume(tupleHandler);
+        if (x == 0)
+            return 1L;
+        return 0L;
+    });
+    transferThread.setName("Worker-Transfer");
-        credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+    credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
-        establishLogSettingCallback();
+    establishLogSettingCallback();
+
+    workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
-        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+    workerState.refreshCredentialsTimer.scheduleRecurring(0,
+        (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+            @Override public void run() {
+                checkCredentialsChanged();
+            }
+        });
-        workerState.refreshCredentialsTimer.scheduleRecurring(0,
-            (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
-                @Override public void run() {
-                    checkCredentialsChanged();
-                    if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
-                        checkThrottleChanged();
-                    }
+    workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
+            @Override public void run() {
+                try {
+                    LOG.debug("Checking if blobs have updated");
+                    updateBlobUpdates();
+                } catch (IOException e) {
+                    // IOException from reading the version files to be ignored
+                    LOG.error(e.getStackTrace().toString());
                }
-            });
-
-        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
-            (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
-                @Override public void run() {
-                    try {
-                        LOG.debug("Checking if blobs have updated");
-                        updateBlobUpdates();
-                    } catch (IOException e) {
-                        // IOException from reading the version files to be ignored
-                        LOG.error(e.getStackTrace().toString());
-                    }
-                }
-            });
-
-        // The jitter allows the clients to get the data at different times, and avoids thundering herd
-        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
-        }
+            }
+        });
-        workerState.refreshConnectionsTimer.scheduleRecurring(0,
-            (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+    // The jitter allows the clients to get the data at different times, and avoids thundering herd
+    if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+        workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+    }
-        workerState.resetLogLevelsTimer.scheduleRecurring(0,
-            (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+    workerState.refreshConnectionsTimer.scheduleRecurring(0,
+        (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
-        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
-            workerState::refreshStormActive);
+    workerState.resetLogLevelsTimer.scheduleRecurring(0,
+        (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
-        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
-        LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
-        return this;
-    };
-});
+    workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+        workerState::refreshStormActive);
+
+    setupFlushTupleTimer(newExecutors);
+
+    LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+    LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
+    return this;
+}
+private void setupFlushTupleTimer(final List<IRunningExecutor> executors) {
+//    StormTimer timerTask = workerState.getUserTimer();
+    Integer batchSize = ObjectReader.getInt(conf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+    final Long flushIntervalMs = ObjectReader.getLong(conf.get(Config.TOPOLOGY_FLUSH_TUPLE_FREQ_MILLIS));
+    if (batchSize == 1 || flushIntervalMs == 0)
+        return;
+
+    workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMs, flushIntervalMs, new Runnable() {
+        @Override
+        public void run() {
+            for (int i = 0; i < executors.size(); i++) {
+                IRunningExecutor exec = executors.get(i);
+                if (exec.getExecutorId().get(0) != -1) // don't send to system bolt
--- End diff ---
agree.
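
For anyone following the thread, here is a minimal, self-contained sketch of the behavior being agreed on: a recurring flush timer that skips the system executor, identified by an executor id whose first element is -1. This is not Storm's actual code; `ExecutorHandle` and its `flush()` method are hypothetical stand-ins for `IRunningExecutor` and the executor's flush path, and `ScheduledExecutorService` stands in for `workerState.flushTupleTimer`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for IRunningExecutor: carries the [startTask, endTask]
// executor id and a flush() hook. The system executor uses -1 as its first id,
// which is the marker checked before sending a flush tuple.
class ExecutorHandle {
    private final List<Long> executorId;
    ExecutorHandle(long start, long end) {
        this.executorId = Arrays.asList(start, end);
    }
    List<Long> getExecutorId() {
        return executorId;
    }
    void flush() {
        System.out.println("flushing executor " + executorId);
    }
}

public class FlushTimerSketch {
    public static void main(String[] args) throws InterruptedException {
        List<ExecutorHandle> executors = Arrays.asList(
            new ExecutorHandle(-1, -1),  // system executor: must not receive flush tuples
            new ExecutorHandle(1, 2),
            new ExecutorHandle(3, 4));

        final long flushIntervalMs = 100;
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Recurring task, analogous to flushTupleTimer.scheduleRecurringMs(...)
        timer.scheduleAtFixedRate(() -> {
            for (ExecutorHandle exec : executors) {
                if (exec.getExecutorId().get(0) != -1) { // don't send to system bolt
                    exec.flush();
                }
            }
        }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);

        Thread.sleep(350);  // let a few flush rounds fire
        timer.shutdownNow();
    }
}
```

Note the patch also skips scheduling the timer entirely when `batchSize == 1` or `flushIntervalMs == 0`, since unbatched producers have nothing pending to flush; the sketch omits that guard for brevity.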