Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r158195004
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
---
    @@ -155,134 +150,159 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, 
assignmentId, port, workerId, topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, 
stormClusterState, initCreds, initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map<String, Object> topologyConf, 
IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, 
String> initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP 
so that supervisor knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that 
supervisor knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that 
slow-loading tasks don't cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, 
initCreds)
    -                                .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +        List<Executor> execs = new ArrayList<>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                Executor executor = LocalExecutor.mkExecutor(workerState, 
e, initCreds);
    +                execs.add( executor );
    +                
workerState.localReceiveQueues.put(executor.getTaskIds().get(0), 
executor.getReceiveQueue());
    +            } else {
    +                Executor executor = Executor.mkExecutor(workerState, e, 
initCreds);
    +                
workerState.localReceiveQueues.put(executor.getTaskIds().get(0), 
executor.getReceiveQueue());
    +                execs.add(executor);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, 
batchEnd) -> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, 
ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +        List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
    +        for (Executor executor : execs) {
    +            newExecutors.add(executor.execute());
    +        }
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for 
remote tasks to remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    
workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        // This thread will send out messages destined for remote tasks 
(on other workers)
    +        transferThread = workerState.makeTransferThread();
    +        transferThread.setName("Worker-Transfer");
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler 
=
    -                    mkDisruptorBackpressureHandler(workerState);
    -                
workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    
.setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = 
mkBackpressureHandler();
    -                backpressureThread = new 
WorkerBackpressureThread(workerState.backpressureTrigger, workerState, 
backpressureCallback);
    -                if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, 
workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = 
ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, 
workerState::refreshThrottle);
    -                }
    +        credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
     
    -                credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
    +        establishLogSettingCallback();
     
    -                establishLogSettingCallback();
    +        workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new 
Runnable() {
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), 
new Runnable() {
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) 
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new 
Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files 
to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) 
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new 
Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have 
updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the 
version files to be ignored
    -                                    
LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at 
different times, and avoids thundering herd
    -                if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    -                    
workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, 
workerState::refreshLoad);
    -                }
    +                    }
    +                });
    +
    +        // The jitter allows the clients to get the data at different 
times, and avoids thundering herd
    +        if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 
500, workerState::refreshLoad);
    +        }
    +
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) 
conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, 
(Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        setupFlushTupleTimer(topologyConf, newExecutors);
    +        setupBackPressureCheckTimer(topologyConf);
    +
    +        LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", 
workerId, topologyId, assignmentId, port);
    +        return this;
    +    }
     
    -                LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    -                LOG.info("Worker {} for storm {} on {}:{}  has finished 
loading", workerId, topologyId, assignmentId, port);
    -                return this;
    -            };
    +    private void setupFlushTupleTimer(final Map<String, Object> 
topologyConf, final List<IRunningExecutor> executors) {
    +        final Integer producerBatchSize = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        final Integer xferBatchSize = 
ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
    +        final Long flushIntervalMicros = 
ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
    --- End diff --
    
    config name is milliseconds, whereas variable name is microseconds. if you 
want to convert it later, it would be better to do it here, or just rename to 
milliseconds.
    
    Looks like it is used as milliseconds below, so I guess the variable name 
and below log messages in this method are wrong.


---

Reply via email to