Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159958378

--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -155,134 +150,159 @@ public void start() throws Exception {
         Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
             @Override public Object run() throws Exception {
-                workerState =
-                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+                return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
+            }
+        }); // Subject.doAs(...)
+
+    }
+
+    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+        throws Exception {
+        workerState =
+            new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
                 stormClusterState);
-                // Heartbeat here so that worker process dies if this fails
-                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
-                // that worker is running and moves on
-                doHeartBeat();
+        // Heartbeat here so that worker process dies if this fails
+        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+        // that worker is running and moves on
+        doHeartBeat();
-                executorsAtom = new AtomicReference<>(null);
+        executorsAtom = new AtomicReference<>(null);
-                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
-                // to the supervisor
-                workerState.heartbeatTimer
-                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
-                        try {
-                            doHeartBeat();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
+        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+        // to the supervisor
+        workerState.heartbeatTimer
+            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+                try {
+                    doHeartBeat();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
-                workerState.executorHeartbeatTimer
-                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+        workerState.executorHeartbeatTimer
+            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                 Worker.this::doExecutorHeartbeats);
-                workerState.registerCallbacks();
+        workerState.registerCallbacks();
-                workerState.refreshConnections(null);
+        workerState.refreshConnections(null);
-                workerState.activateWorkerWhenAllConnectionsReady();
+        workerState.activateWorkerWhenAllConnectionsReady();
-                workerState.refreshStormActive(null);
+        workerState.refreshStormActive(null);
-                workerState.runWorkerStartHooks();
+        workerState.runWorkerStartHooks();
-                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
-                for (List<Long> e : workerState.getExecutors()) {
-                    if (ConfigUtils.isLocalMode(topologyConf)) {
-                        newExecutors.add(
-                            LocalExecutor.mkExecutor(workerState, e, initCreds)
-                                .execute());
-                    } else {
-                        newExecutors.add(
-                            Executor.mkExecutor(workerState, e, initCreds)
-                                .execute());
-                    }
-                }
-                executorsAtom.set(newExecutors);
+        List<Executor> execs = new ArrayList<>();
+        for (List<Long> e : workerState.getExecutors()) {
+            if (ConfigUtils.isLocalMode(topologyConf)) {
+                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
+                execs.add( executor );
+                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+            } else {
+                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
+                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+                execs.add(executor);
+            }
+        }
-                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
-                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+        for (Executor executor : execs) {
+            newExecutors.add(executor.execute());
+        }
+        executorsAtom.set(newExecutors);
-                // This thread will publish the messages destined for remote tasks to remote connections
-                transferThread = Utils.asyncLoop(() -> {
-                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
-                    return 0L;
-                });
+        // This thread will send out messages destined for remote tasks (on other workers)
+        transferThread = workerState.makeTransferThread();
+        transferThread.setName("Worker-Transfer");
-                DisruptorBackpressureCallback disruptorBackpressureHandler =
-                    mkDisruptorBackpressureHandler(workerState);
-                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
-                workerState.transferQueue
-                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
-                workerState.transferQueue
-                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
-                workerState.transferQueue
-                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
-                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
-                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
-                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
-                    backpressureThread.start();
-                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
-
-                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
-                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
-                }
+        credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
-                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+        establishLogSettingCallback();
-                establishLogSettingCallback();
+        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
-                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+        workerState.refreshCredentialsTimer.scheduleRecurring(0,
+            (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+                @Override public void run() {
+                    checkCredentialsChanged();
+                }
+            });
-                workerState.refreshCredentialsTimer.scheduleRecurring(0,
-                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
-                        @Override public void run() {
-                            checkCredentialsChanged();
-                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
-                                checkThrottleChanged();
-                            }
+        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+            (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
+                @Override public void run() {
+                    try {
+                        LOG.debug("Checking if blobs have updated");
+                        updateBlobUpdates();
+                    } catch (IOException e) {
+                        // IOException from reading the version files to be ignored
+                        LOG.error(e.getStackTrace().toString());
                    }
-                    });
-
-                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
-                    (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
-                        @Override public void run() {
-                            try {
-                                LOG.debug("Checking if blobs have updated");
-                                updateBlobUpdates();
-                            } catch (IOException e) {
-                                // IOException from reading the version files to be ignored
-                                LOG.error(e.getStackTrace().toString());
-                            }
-                        }
-                    });
-
-                // The jitter allows the clients to get the data at different times, and avoids thundering herd
-                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
-                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
-                }
+                }
+            });
+
+        // The jitter allows the clients to get the data at different times, and avoids thundering herd
+        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+        }
+
+        workerState.refreshConnectionsTimer.scheduleRecurring(0,
+            (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
-                workerState.refreshConnectionsTimer.scheduleRecurring(0,
-                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+        workerState.resetLogLevelsTimer.scheduleRecurring(0,
+            (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
-                workerState.resetLogLevelsTimer.scheduleRecurring(0,
-                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+            workerState::refreshStormActive);
-                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
-                    workerState::refreshStormActive);
+        setupFlushTupleTimer(topologyConf, newExecutors);
+        setupBackPressureCheckTimer(topologyConf);
+
+        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+        LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
+        return this;
+    }
-                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
-                LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
-                return this;
-            };
+    private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
+        final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+        final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+        final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
+        if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
+            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
+            return;
+        }
+
+        workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis, new Runnable() {
+            @Override
+            public void run() {
+                // send flush tuple to all executors
+                for (int i = 0; i < executors.size(); i++) {
+                    IRunningExecutor exec = executors.get(i);
+                    if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
+                        exec.getExecutor().publishFlushTuple();
+                    }
+                }
+            }
+        });
+        LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
+    }
+    private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
+        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
+        if (workerCount <= 1) {
+            LOG.info("BackPressure change checking is disabled as there is only one worker");
+            return;
+        }
+        final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
+        workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs, new Runnable() {
--- End diff --

nit: Could we use java 8 lambdas here?
```
workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs,
    () -> workerState.refreshBackPressureStatus());
```
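The same conversion would apply to the other anonymous `Runnable`s added in this diff. A sketch only, not the final code, using the fields and timer calls already shown above (which accept a plain `Runnable`):
```
// Sketch: credentials poll as a method reference, blob-update poll as a lambda.
workerState.refreshCredentialsTimer.scheduleRecurring(0,
    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), Worker.this::checkCredentialsChanged);

workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), () -> {
        try {
            LOG.debug("Checking if blobs have updated");
            updateBlobUpdates();
        } catch (IOException e) {
            // IOException from reading the version files to be ignored
            LOG.error(e.getStackTrace().toString());
        }
    });
```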
---