Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2241#discussion_r129736039
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
---
    @@ -155,134 +150,141 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, 
assignmentId, port, workerId, topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, 
stormClusterState, initCreds, initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map topologyConf, IStateStorage 
stateStorage, IStormClusterState stormClusterState, Map<String, String> 
initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP 
so that supervisor knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that 
supervisor knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that 
slow-loading tasks don't cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks 
don't cause the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, 
initCreds)
    +        List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                newExecutors.add(
    +                        LocalExecutor.mkExecutor(workerState, e, initCreds)
                                     .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    +            } else {
    +                newExecutors.add(
    +                        Executor.mkExecutor(workerState, e, initCreds)
                                     .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, 
batchEnd) -> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, 
ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for 
remote tasks to remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    
workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        JCQueue.Consumer tupleHandler = workerState;
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler 
=
    -                    mkDisruptorBackpressureHandler(workerState);
    -                
workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    
.setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    
.setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = 
mkBackpressureHandler();
    -                backpressureThread = new 
WorkerBackpressureThread(workerState.backpressureTrigger, workerState, 
backpressureCallback);
    -                if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, 
workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = 
ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    
workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, 
workerState::refreshThrottle);
    -                }
    +        // This thread will send the messages destined for remote tasks 
(out of process)
    +        transferThread = Utils.asyncLoop(() -> {
    +            int x = workerState.transferQueue.consume(tupleHandler);
    +            if(x==0)
    +                return 1L;
    +            return 0L;
    +        });
    +        transferThread.setName("Worker-Transfer");
     
    -                credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
    +        credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
     
    -                establishLogSettingCallback();
    +        establishLogSettingCallback();
    +
    +        workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new 
Runnable() {
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), 
new Runnable() {
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) 
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new 
Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files 
to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) 
conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new 
Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have 
updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the 
version files to be ignored
    -                                    
LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at 
different times, and avoids thundering herd
    -                if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    -                    
workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, 
workerState::refreshLoad);
    -                }
    +                    }
    +                });
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
    +        // The jitter allows the clients to get the data at different 
times, and avoids thundering herd
    +        if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 
500, workerState::refreshLoad);
    +        }
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, 
(Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
     
    -                LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    -                LOG.info("Worker {} for storm {} on {}:{}  has finished 
loading", workerId, topologyId, assignmentId, port);
    -                return this;
    -            };
    -        });
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) 
conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
    +
    +        setupFlushTupleTimer(newExecutors);
    +
    +        LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", 
workerId, topologyId, assignmentId, port);
    +        return this;
    +    }
     
    +    private void setupFlushTupleTimer(final List<IRunningExecutor> 
executors) {
    +//                StormTimer timerTask = workerState.getUserTimer();
    +        Integer batchSize = 
ObjectReader.getInt(conf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        final Long flushIntervalMs = ObjectReader.getLong( 
conf.get(Config.TOPOLOGY_FLUSH_TUPLE_FREQ_MILLIS) );
    +        if(batchSize==1 || flushIntervalMs==0)
    +            return;
    +
    +        workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMs, 
flushIntervalMs, new Runnable() {
    +            @Override
    +            public void run() {
    +                for (int i = 0; i < executors.size(); i++) {
    +                    IRunningExecutor exec = executors.get(i);
    +                    if(exec.getExecutorId().get(0) != -1) // dont send to 
system bolt
    --- End diff --
    
    agree.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to