Resolve conflicts
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7b8cda6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7b8cda6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7b8cda6

Branch: refs/heads/master
Commit: f7b8cda6f678b894f924ab6a8ad3b37815d63083
Parents: 02d290b 99a11cb
Author: Abhishek Agarwal <abhishek.agar...@appdynamics.com>
Authored: Thu Nov 3 12:57:06 2016 +0530
Committer: Abhishek Agarwal <abhishek.agar...@appdynamics.com>
Committed: Thu Nov 3 12:57:06 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |  2 +
 .../org/apache/storm/daemon/local_executor.clj  | 27 ----------
 storm-core/src/clj/org/apache/storm/testing.clj | 15 ++----
 .../jvm/org/apache/storm/ProcessSimulator.java  | 14 ++++-
 .../src/jvm/org/apache/storm/StormTimer.java    |  9 ++--
 .../org/apache/storm/command/KillWorkers.java   |  2 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   | 20 +++----
 .../daemon/supervisor/ReadClusterState.java     | 55 +++++++++++++++-----
 .../storm/daemon/supervisor/Supervisor.java     |  4 +-
 .../apache/storm/daemon/supervisor/UniFunc.java | 22 ++++++++
 .../supervisor/timer/SupervisorHealthCheck.java |  2 +-
 .../storm/daemon/worker/LogConfigManager.java   |  2 +-
 .../org/apache/storm/daemon/worker/Worker.java  |  6 +--
 .../org/apache/storm/utils/DisruptorQueue.java  | 10 ++--
 .../test/clj/org/apache/storm/worker_test.clj   | 34 ------------
 .../apache/storm/daemon/worker/WorkerTest.java  | 39 ++++++++++++++
 16 files changed, 151 insertions(+), 112 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/local_executor.clj index aeded77,1e46e37..0000000 deleted file mode 100644,100644 --- a/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj +++ /dev/null @@@ -1,27 -1,42 +1,0 @@@ --;; Licensed to the Apache Software Foundation (ASF) under one --;; or more contributor license agreements. See the NOTICE file --;; distributed with this work for additional information --;; regarding copyright ownership. The ASF licenses this file --;; to you under the Apache License, Version 2.0 (the --;; "License"); you may not use this file except in compliance --;; with the License. You may obtain a copy of the License at --;; --;; http://www.apache.org/licenses/LICENSE-2.0 --;; --;; Unless required by applicable law or agreed to in writing, software --;; distributed under the License is distributed on an "AS IS" BASIS, --;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --;; See the License for the specific language governing permissions and --;; limitations under the License. --(ns org.apache.storm.daemon.local-executor -- (:use [org.apache.storm util config log]) -- (:import [org.apache.storm.tuple AddressedTuple] -- [org.apache.storm.executor Executor ExecutorTransfer]) -- (:import [org.apache.storm.utils DisruptorQueue]) -- (:import [org.apache.storm Config Constants])) -- --(defn local-transfer-executor-tuple [] -- (fn [task tuple batch-transfer->worker] -- (let [val (AddressedTuple.
task tuple)] -- (.publish ^DisruptorQueue batch-transfer->worker val)))) - -(defn mk-local-executor-transfer [worker-topology-context batch-queue storm-conf transfer-fn] - (proxy [ExecutorTransfer] [worker-topology-context batch-queue storm-conf transfer-fn] - (transfer [task tuple] - (let [batch-transfer->worker (.getBatchTransferQueue this)] - ((local-transfer-executor-tuple) task tuple batch-transfer->worker))))) -- -(defn mk-local-executor [workerData executorId credentials] - (let [executor (Executor/mkExecutor workerData executorId credentials) - worker-topology-context (.getWorkerTopologyContext executor) - batch-transfer-queue (.getTransferWorkerQueue executor) - storm-conf (.getStormConf executor) - transfer-fn (.getTransferFn executor) - local-executor-transfer (mk-local-executor-transfer worker-topology-context batch-transfer-queue storm-conf transfer-fn)] - (.setLocalExecutorTransfer executor local-executor-transfer) - (.execute executor))) http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/testing.clj index e8ef58b,5081aa4..032f11f --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@@ -17,15 -17,16 +17,14 @@@ (ns org.apache.storm.testing (:require [org.apache.storm.daemon [nimbus :as nimbus] -- [local-executor :as local-executor] [local-supervisor :as local-supervisor] - [common :as common] - [worker :as worker]]) + [common :as common]]) (:import [org.apache.commons.io FileUtils] [org.apache.storm.utils] [org.apache.storm.zookeeper Zookeeper] [org.apache.storm ProcessSimulator] - [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils] + [org.apache.storm.daemon.supervisor Supervisor StandaloneSupervisor SupervisorUtils ReadClusterState] - [org.apache.storm.executor Executor] + [org.apache.storm.executor Executor LocalExecutor] [java.util.concurrent.atomic AtomicBoolean]) (:import [java.io File]) (:import [java.util HashMap ArrayList]) @@@ -674,14 -673,14 +672,7 @@@ ;; of tuple emission (and not on a separate thread later) for ;; topologies to be tracked correctly. This is because "transferred" *must* ;; be incremented before "processing". -- local-executor/local-transfer-executor-tuple -- (let [old# local-executor/local-transfer-executor-tuple] -- (fn [& args#] -- (let [transferrer# (apply old# args#)] -- (fn [& args2#] -- ;; (log-message "Transferring: " args2#) -- (increment-global! 
id# "transferred" 1) -- (apply transferrer# args2#)))))] ++ ] (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args] (let [~cluster-sym (assoc-track-id ~cluster-sym id#)] ~@body))) http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/jvm/org/apache/storm/daemon/Task.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/Task.java index 2afe4e1,2e846ff..47c1d90 --- a/storm-core/src/jvm/org/apache/storm/daemon/Task.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java @@@ -178,24 -179,25 +178,24 @@@ public class Task } private TopologyContext mkTopologyContext(StormTopology topology) throws IOException { - Map conf = (Map) workerData.get(Constants.CONF); + Map conf = workerData.getConf(); return new TopologyContext( - topology, - (Map) workerData.get(Constants.STORM_CONF), - (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT), - (Map<String, List<Integer>>) workerData.get(Constants.COMPONENT_TO_SORTED_TASKS), - (Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS), - (String) workerData.get(Constants.STORM_ID), - ConfigUtils.supervisorStormResourcesPath( - ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get(Constants.STORM_ID))), - ConfigUtils.workerPidsRoot(conf, (String) workerData.get(Constants.WORKER_ID)), - taskId, - (Integer) workerData.get(Constants.PORT), - (List<Integer>) workerData.get(Constants.TASK_IDS), - (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES), - (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES), - executor.getSharedExecutorData(), - executor.getIntervalToTaskToMetricToRegistry(), - executor.getOpenOrPrepareWasCalled()); + topology, + workerData.getTopologyConf(), + workerData.getTaskToComponent(), + workerData.getComponentToSortedTasks(), + workerData.getComponentToStreamToFields(), + workerData.getTopologyId(), - ConfigUtils.supervisorStormResourcesPath( - ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())), - ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()), - taskId, - workerData.getPort(), workerData.getTaskIds(), - workerData.getDefaultSharedResources(), - workerData.getUserSharedResources(), - executor.getSharedExecutorData(), - executor.getIntervalToTaskToMetricToRegistry(), - executor.getOpenOrPrepareWasCalled()); ++ ConfigUtils.supervisorStormResourcesPath( ++ ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())), ++ ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()), ++ taskId, ++ workerData.getPort(), workerData.getTaskIds(), ++ workerData.getDefaultSharedResources(), ++ workerData.getUserSharedResources(), ++ executor.getSharedExecutorData(), ++ executor.getIntervalToTaskToMetricToRegistry(), ++ executor.getOpenOrPrepareWasCalled()); } private Object mkTaskObject() { http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java index 1d5f93b,0000000..94d1ce1 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java @@@ -1,157 -1,0 +1,157 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.worker; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.storm.utils.Time; +import org.apache.storm.generated.LogConfig; +import org.apache.storm.generated.LogLevel; +import org.apache.storm.generated.LogLevelAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicReference; + +public class LogConfigManager { + + private static final Logger LOG = LoggerFactory.getLogger(LogConfigManager.class); + + private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig; + private final Map<String, Level> originalLogLevels; + + public LogConfigManager() { + this(new AtomicReference<>(new TreeMap<>())); + } + + public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> latestLogConfig) { + this.latestLogConfig = latestLogConfig; + this.originalLogLevels = getLoggerLevels(); + LOG.info("Started with log levels: {}", originalLogLevels); + } + + public void processLogConfigChange(LogConfig logConfig) { + if (null != logConfig) { + LOG.debug("Processing received log config: {}", logConfig); + TreeMap<String, LogLevel> loggers = new TreeMap<>(logConfig.get_named_logger_level()); - LoggerContext logContext = (LoggerContext) LogManager.getContext(); ++ LoggerContext logContext = (LoggerContext) LogManager.getContext(false); + Map<String, LogLevel> newLogConfigs = new HashMap<>(); + for (Map.Entry<String, LogLevel> entry : loggers.entrySet()) { + String msgLoggerName = entry.getKey(); + msgLoggerName = ("ROOT".equalsIgnoreCase(msgLoggerName)) ? LogManager.ROOT_LOGGER_NAME : msgLoggerName; + LogLevel loggerLevel = entry.getValue(); + // the new-timeouts map now contains logger => timeout + if (loggerLevel.is_set_reset_log_level_timeout_epoch()) { + LogLevel copy = new LogLevel(loggerLevel); + if (originalLogLevels.containsKey(msgLoggerName)) { + copy.set_reset_log_level(originalLogLevels.get(msgLoggerName).name()); + } else { + copy.set_reset_log_level(Level.INFO.name()); + } + + //copy.unset_reset_log_level(); + newLogConfigs.put(msgLoggerName, copy); + } + + } + + // Look for deleted log timeouts + TreeMap<String,LogLevel> latestConf = latestLogConfig.get(); + if (latestConf != null) { + for (String loggerName : latestConf.descendingKeySet()) { + if (! 
newLogConfigs.containsKey(loggerName)) { + // if we had a timeout, but the timeout is no longer active + setLoggerLevel(logContext, loggerName, latestConf.get(loggerName).get_reset_log_level()); + + } + } + } + + // apply new log settings we just received + // the merged configs are only for the reset logic + for (String loggerName : new TreeSet<>(logConfig.get_named_logger_level().keySet())) { + LogLevel logLevel = logConfig.get_named_logger_level().get(loggerName); + loggerName = ("ROOT".equalsIgnoreCase(loggerName)) ? LogManager.ROOT_LOGGER_NAME : loggerName; + LogLevelAction action = logLevel.get_action(); + if (action == LogLevelAction.UPDATE) { + setLoggerLevel(logContext, loggerName, logLevel.get_target_log_level()); + } + + } + + logContext.updateLoggers(); + latestLogConfig.set(new TreeMap<>(newLogConfigs)); + LOG.debug("New merged log config is {}", latestLogConfig.get()); + } + } + + // function called on timer to reset log levels last set to DEBUG + // also called from processLogConfigChange + public void resetLogLevels() { + TreeMap<String, LogLevel> latestLogLevelMap = latestLogConfig.get(); + LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false); + + for (String loggerName : latestLogLevelMap.descendingKeySet()) { + LogLevel loggerSetting = latestLogLevelMap.get(loggerName); + long timeout = loggerSetting.get_reset_log_level_timeout_epoch(); + String resetLogLevel = loggerSetting.get_reset_log_level(); + if (timeout < Time.currentTimeMillis()) { + LOG.info("{}: Resetting level to {}", loggerName, resetLogLevel); + setLoggerLevel(loggerContext, loggerName, resetLogLevel); + } + latestLogConfig.getAndUpdate(input -> { + TreeMap<String, LogLevel> result = new TreeMap<>(input); + result.remove(loggerName); + return result; + }); + } + loggerContext.updateLoggers(); + } + + public Map<String, Level> getLoggerLevels() { + Configuration loggerConfig = ((LoggerContext) LogManager.getContext(false)).getConfiguration(); + Map<String, Level> logLevelMap = new HashMap<>(); + for (Map.Entry<String, LoggerConfig> entry : loggerConfig.getLoggers().entrySet()) { + logLevelMap.put(entry.getKey(), entry.getValue().getLevel()); + } + return logLevelMap; + } + + public void setLoggerLevel(LoggerContext logContext, String loggerName, String newLevelStr) { + Level newLevel = Level.getLevel(newLevelStr); + Configuration configuration = logContext.getConfiguration(); + LoggerConfig loggerConfig = configuration.getLoggerConfig(loggerName); + if (loggerConfig.getName().equalsIgnoreCase(loggerName)) { + LOG.info("Setting {} log level to: {}", loggerConfig, newLevel); + loggerConfig.setLevel(newLevel); + } else { + // create a new config. Make it additive (true) s.t. 
it inherits the parent's appenders + LoggerConfig newLoggerConfig = new LoggerConfig(loggerName, newLevel, true); + LOG.info("Adding config for: {} with level: {}", newLoggerConfig, newLevel); + configuration.addLogger(loggerName, newLoggerConfig); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java index 15f26e0,0000000..18715f7 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java @@@ -1,426 -1,0 +1,426 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.worker; + +import com.google.common.base.Preconditions; +import com.lmax.disruptor.EventHandler; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.ObjectUtils; +import org.apache.storm.Config; +import org.apache.storm.cluster.*; +import org.apache.storm.daemon.DaemonCommon; +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.ExecutorShutdown; +import org.apache.storm.executor.IRunningExecutor; +import org.apache.storm.executor.LocalExecutor; +import org.apache.storm.generated.*; +import org.apache.storm.messaging.IConnection; +import org.apache.storm.messaging.IContext; +import org.apache.storm.messaging.TaskMessage; +import org.apache.storm.security.auth.AuthUtils; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.*; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; + +import javax.security.auth.Subject; +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.security.PrivilegedExceptionAction; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class Worker implements Shutdownable, DaemonCommon { + + private static final Logger LOG = LoggerFactory.getLogger(Worker.class); + private final Map conf; + private final IContext context; + private final String topologyId; + private final String assignmentId; + private final int port; + private final String workerId; + private final LogConfigManager logConfigManager; + + + private WorkerState workerState; + private AtomicReference<List<IRunningExecutor>> executorsAtom; + private Thread transferThread; + private
WorkerBackpressureThread backpressureThread; + + private AtomicReference<Credentials> credentialsAtom; + private Subject subject; + private Collection<IAutoCredentials> autoCreds; + + + /** + * TODO: should worker even take the topologyId as input? this should be + * deducible from cluster state (by searching through assignments). + * what if there's inconsistency in assignments? -> but nimbus should guarantee this consistency + * + * @param conf - Storm configuration + * @param context - + * @param topologyId - topology id + * @param assignmentId - assignment id + * @param port - port on which the worker runs + * @param workerId - worker id + */ + + public Worker(Map conf, IContext context, String topologyId, String assignmentId, int port, String workerId) { + this.conf = conf; + this.context = context; + this.topologyId = topologyId; + this.assignmentId = assignmentId; + this.port = port; + this.workerId = workerId; + this.logConfigManager = new LogConfigManager(); + } + + public void start() throws Exception { + LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId, + conf); + // because in local mode, it's not a separate + // process. supervisor will register it in this case + // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode. + if (!ConfigUtils.isLocalMode(conf)) { + // Distributed mode + SysOutOverSLF4J.sendSystemOutAndErrToSLF4J(); + String pid = Utils.processPid(); + FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid))); + FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid, - Charset.defaultCharset()); ++ Charset.forName("UTF-8")); + } + final Map topologyConf = + ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId)); + List<ACL> acls = Utils.getWorkerACL(topologyConf); + IStateStorage stateStorage = + ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER)); + IStormClusterState stormClusterState = + ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext()); + Credentials initialCredentials = stormClusterState.credentials(topologyId, null); + autoCreds = AuthUtils.GetAutoCredentials(topologyConf); + subject = AuthUtils.populateSubject(null, autoCreds, initialCredentials.get_creds()); + + Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { + @Override public Object run() throws Exception { + workerState = + new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage, + stormClusterState); + + // Heartbeat here so that worker process dies if this fails + // it's important that worker heartbeat to supervisor ASAP so that supervisor knows + // that worker is running and moves on + doHeartBeat(); + + executorsAtom = new AtomicReference<>(null); + + // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout + // to the supervisor + workerState.heartbeatTimer + .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> { + try { + doHeartBeat(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + workerState.executorHeartbeatTimer + .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), + Worker.this::doExecutorHeartbeats); + + workerState.registerCallbacks(); + + workerState.refreshConnections(null); + 
workerState.activateWorkerWhenAllConnectionsReady(); + + workerState.refreshStormActive(null); + + workerState.runWorkerStartHooks(); + + List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>(); + for (List<Long> e : workerState.getExecutors()) { + if (ConfigUtils.isLocalMode(topologyConf)) { + newExecutors.add( + LocalExecutor.mkExecutor(workerState, e, initialCredentials.get_creds()) + .execute()); + } else { + newExecutors.add( + Executor.mkExecutor(workerState, e, initialCredentials.get_creds()) + .execute()); + } + } + executorsAtom.set(newExecutors); + + EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState + .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd); + + // This thread will publish the messages destined for remote tasks to remote connections + transferThread = Utils.asyncLoop(() -> { + workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler); + return 0L; + }); + + DisruptorBackpressureCallback disruptorBackpressureHandler = + mkDisruptorBackpressureHandler(workerState); + workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler); + workerState.transferQueue + .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)); + workerState.transferQueue - .setHighWaterMark((Double) topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)); ++ .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK))); + workerState.transferQueue - .setLowWaterMark((Double) topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)); ++ .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK))); + + WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(); + backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback); + if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) { + backpressureThread.start(); + stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle); + } + + credentialsAtom = new AtomicReference<Credentials>(initialCredentials); + + establishLogSettingCallback(); + + workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged); + + workerState.refreshCredentialsTimer.scheduleRecurring(0, + (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() { + @Override public void run() { + checkCredentialsChanged(); + if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) { + checkThrottleChanged(); + } + } + }); + + // The jitter allows the clients to get the data at different times, and avoids thundering herd + if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) { + workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad); + } + + workerState.refreshConnectionsTimer.scheduleRecurring(0, + (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections); + + workerState.resetLogTevelsTimer.scheduleRecurring(0, + (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels); + + workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), + workerState::refreshStormActive); + + LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD)); + LOG.info("Worker {} for storm {} on {}:{} 
has finished loading", workerId, topologyId, assignmentId, port); + return this; + }; + }); + + } + + public void doHeartBeat() throws IOException { + LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId); + state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId, + workerState.executors.stream() + .map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue())) + .collect(Collectors.toList()), workerState.port)); + state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up. + // it shouldn't take supervisor 120 seconds between listing dir and reading it + } + + public void doExecutorHeartbeats() { + Map<List<Integer>, ExecutorStats> stats; + List<IRunningExecutor> executors = this.executorsAtom.get(); + if (null == executors) { + stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors); + } else { + stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors + .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId, + (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats))); + } + Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime()); + try { + workerState.stormClusterState + .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port, + StatsUtil.thriftifyZkWorkerHb(zkHB)); + } catch (Exception ex) { + LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex); + } + } + + public void checkCredentialsChanged() { + Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null); + if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) { + // This does not have to be atomic, worst case we update when one is not needed + AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? 
null : newCreds.get_creds()); + for (IRunningExecutor executor : executorsAtom.get()) { + executor.credenetialsChanged(newCreds); + } + credentialsAtom.set(newCreds); + } + } + + public void checkThrottleChanged() { + boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged); + workerState.throttleOn.set(throttleOn); + } + + public void checkLogConfigChanged() { + LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null); + logConfigManager.processLogConfigChange(logConfig); + establishLogSettingCallback(); + } + + public void establishLogSettingCallback() { + workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged); + } + + + /** + * make a handler for the worker's send disruptor queue to + * check highWaterMark and lowWaterMark for backpressure + */ + private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) { + return new DisruptorBackpressureCallback() { + @Override public void highWaterMark() throws Exception { + workerState.transferBackpressure.set(true); + WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger); + } + + @Override public void lowWaterMark() throws Exception { + workerState.transferBackpressure.set(false); + WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger); + } + }; + } + + /** + * make a handler that checks and updates worker's backpressure flag + */ + private WorkerBackpressureCallback mkBackpressureHandler() { + final List<IRunningExecutor> executors = executorsAtom.get(); + return new WorkerBackpressureCallback() { + @Override public void onEvent(Object obj) { + String topologyId = workerState.topologyId; + String assignmentId = workerState.assignmentId; + int port = workerState.port; + IStormClusterState stormClusterState = workerState.stormClusterState; + boolean prevBackpressureFlag = workerState.backpressure.get(); + boolean currBackpressureFlag = prevBackpressureFlag; + if (null != executors) { + currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream() + .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get()); + } + + if (currBackpressureFlag != prevBackpressureFlag) { + try { + LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag); + stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag); + } catch (Exception ex) { + LOG.error("workerBackpressure update failed when connecting to ZK ... 
will retry", ex); + } + } + } + }; + } + + @Override public void shutdown() { + try { + LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port); + + for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) { + //this will do best effort flushing since the linger period + // was set on creation + socket.close(); + } + LOG.info("Terminating messaging context"); + LOG.info("Shutting down executors"); + for (IRunningExecutor executor : executorsAtom.get()) { + ((ExecutorShutdown) executor).shutdown(); + } + LOG.info("Shut down executors"); + + // this is fine because the only time this is shared is when it's a local context, + // in which case it's a noop + workerState.mqContext.term(); + LOG.info("Shutting down transfer thread"); + workerState.transferQueue.haltWithInterrupt(); + + transferThread.interrupt(); + transferThread.join(); + LOG.info("Shut down transfer thread"); + + backpressureThread.terminate(); + LOG.info("Shut down backpressure thread"); + + workerState.heartbeatTimer.close(); + workerState.refreshConnectionsTimer.close(); + workerState.refreshCredentialsTimer.close(); + workerState.refreshActiveTimer.close(); + workerState.executorHeartbeatTimer.close(); + workerState.userTimer.close(); + workerState.refreshLoadTimer.close(); + workerState.resetLogTevelsTimer.close(); + workerState.closeResources(); + + LOG.info("Trigger any worker shutdown hooks"); + workerState.runWorkerShutdownHooks(); + + workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port); + workerState.stormClusterState.removeWorkerBackpressure(topologyId, assignmentId, (long) port); + LOG.info("Disconnecting from storm cluster state context"); + workerState.stormClusterState.disconnect(); + workerState.stateStorage.close(); + LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port); + } catch (Exception ex) { + throw Utils.wrapInRuntime(ex); + } + + } + + @Override public boolean isWaiting() { + return workerState.heartbeatTimer.isTimerWaiting() + && workerState.refreshConnectionsTimer.isTimerWaiting() + && workerState.refreshLoadTimer.isTimerWaiting() + && workerState.refreshCredentialsTimer.isTimerWaiting() + && workerState.refreshActiveTimer.isTimerWaiting() + && workerState.executorHeartbeatTimer.isTimerWaiting() + && workerState.userTimer.isTimerWaiting(); + } + + public static void main(String[] args) throws Exception { + Preconditions.checkArgument(args.length == 4, "Illegal number of arguemtns. Expected: 4, Actual: " + args.length); + String stormId = args[0]; + String assignmentId = args[1]; + String portStr = args[2]; + String workerId = args[3]; + Map conf = Utils.readStormConfig(); + Utils.setupDefaultUncaughtExceptionHandler(); + StormCommon.validateDistributedMode(conf); + Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId); + worker.start(); + Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/test/clj/org/apache/storm/worker_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/worker_test.clj index 3848b16,6b6fede..0000000 deleted file mode 100644,100644 --- a/storm-core/test/clj/org/apache/storm/worker_test.clj +++ /dev/null @@@ -1,34 -1,205 +1,0 @@@ --;; Licensed to the Apache Software Foundation (ASF) under one --;; or more contributor license agreements. 
See the NOTICE file --;; distributed with this work for additional information --;; regarding copyright ownership. The ASF licenses this file --;; to you under the Apache License, Version 2.0 (the --;; "License"); you may not use this file except in compliance --;; with the License. You may obtain a copy of the License at --;; --;; http://www.apache.org/licenses/LICENSE-2.0 --;; --;; Unless required by applicable law or agreed to in writing, software --;; distributed under the License is distributed on an "AS IS" BASIS, --;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --;; See the License for the specific language governing permissions and --;; limitations under the License. --(ns org.apache.storm.worker-test -- (:use [clojure test]) - (:use [org.apache.storm testing]) - (:import [org.apache.storm.messaging ConnectionWithStatus ConnectionWithStatus$Status]) - (:import [org.mockito Mockito] - (org.apache.storm.daemon.worker WorkerState)) - (:require [org.apache.storm.daemon [worker :as worker]]) - (:require [conjure.core]) - (:require [clj-time.core :as time]) - (:require [clj-time.coerce :as coerce]) - (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction]) - (:import [org.apache.logging.log4j Level LogManager]) - (:import [org.slf4j Logger]) - (:use [conjure core]) - (:use [org.apache.storm testing log]) - (:use [org.apache.storm.daemon common]) - (:use [clojure.string :only [join]]) - (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) - (:import [org.mockito Mockito]) -- ) - - -(deftest test-log-reset-should-not-trigger-for-future-time - (with-local-cluster [cluster] - (let [worker (:worker cluster) - present (time/now) - the-future (coerce/to-long (time/plus present (time/secs 1))) - mock-config {"foo" {:timeout the-future}} - mock-config-atom (atom mock-config)] - (stubbing [time/now present] - (worker/reset-log-levels mock-config-atom) - ;; if the worker doesn't reset log levels, the atom should not be nil - (is (not(= @mock-config-atom nil))))))) - -(deftest test-log-reset-triggers-for-past-time - (with-local-cluster [cluster] - (let [worker (:worker cluster) - present (time/now) - past (time/plus present (time/secs -1)) - mock-config {"foo" { :timeout (coerce/to-long past) - :target-log-level Level/INFO - :reset-log-level Level/WARN}} - mock-config-atom (atom mock-config)] - (stubbing [time/now present] - (worker/reset-log-levels mock-config-atom) - ;; the logger config is removed from atom - (is (= @mock-config-atom {})))))) - -(deftest test-log-reset-resets-does-nothing-for-empty-log-config - (with-local-cluster [cluster] - (let [worker (:worker cluster) - present (time/now) - past (coerce/to-long (time/plus present (time/secs -1))) - mock-config {} - mock-config-atom (atom mock-config)] - (stubbing [worker/set-logger-level nil - time/now present] - (worker/reset-log-levels mock-config-atom) - ;; if the worker resets log level, the atom is nil'ed out - (is (= @mock-config-atom {})) - ;; test that the set-logger-level function was not called - (verify-call-times-for worker/set-logger-level 0))))) - -(deftest test-log-reset-resets-root-logger-if-set - (with-local-cluster [cluster] - (let [worker (:worker cluster) - present (time/now) - past (coerce/to-long (time/plus present (time/secs -1))) - mock-config {LogManager/ROOT_LOGGER_NAME {:timeout past - :target-log-level Level/DEBUG - :reset-log-level Level/WARN}} - mock-config-atom (atom mock-config)] - (stubbing 
[worker/set-logger-level nil - time/now present] - (worker/reset-log-levels mock-config-atom) - ;; if the worker resets log level, the atom is reset to {} - (is (= @mock-config-atom {})) - ;; ensure we reset back to WARN level - (verify-call-times-for worker/set-logger-level 1) - (verify-first-call-args-for-indices worker/set-logger-level [1 2] LogManager/ROOT_LOGGER_NAME Level/WARN))))) - -;;This should be removed when it goes into conjure -(defmacro verify-nth-call-args-for-indices - "Asserts that the function was called at least once, and the nth call was - passed the args specified, into the indices of the arglist specified. In - other words, it checks only the particular args you care about." - [n fn-name indices & args] - `(do - (assert-in-fake-context "verify-first-call-args-for-indices") - (assert-conjurified-fn "verify-first-call-args-for-indices" ~fn-name) - (is (< ~n (count (get @call-times ~fn-name))) - (str "(verify-nth-call-args-for-indices " ~n " " ~fn-name " " ~indices " " ~(join " " args) ")")) - (let [nth-call-args# (nth (get @call-times ~fn-name) ~n) - indices-in-range?# (< (apply max ~indices) (count nth-call-args#))] - (if indices-in-range?# - (is (= ~(vec args) (map #(nth nth-call-args# %) ~indices)) - (str "(verify-first-call-args-for-indices " ~n " " ~fn-name " " ~indices " " ~(join " " args) ")")) - (is (= :fail (format "indices %s are out of range for the args, %s" ~indices ~(vec args))) - (str "(verify-first-call-args-for-indices " ~n " " ~fn-name " " ~indices " " ~(join " " args) ")")))))) - -(deftest test-log-resets-named-loggers-with-past-timeout - (with-local-cluster [cluster] - (let [worker (:worker cluster) - present (time/now) - past (coerce/to-long (time/plus present (time/secs -1))) - mock-config {"my_debug_logger" {:timeout past - :target-log-level Level/DEBUG - :reset-log-level Level/INFO} - "my_info_logger" {:timeout past - :target-log-level Level/INFO - :reset-log-level Level/WARN} - "my_error_logger" {:timeout past - :target-log-level Level/ERROR - :reset-log-level Level/INFO}} - result (atom {}) - mock-config-atom (atom mock-config)] - (stubbing [worker/set-logger-level nil - time/now present] - (worker/reset-log-levels mock-config-atom) - ;; if the worker resets log level, the atom is reset to {} - (is (= @mock-config-atom {})) - (verify-call-times-for worker/set-logger-level 3) - (verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] "my_debug_logger" Level/INFO) - (verify-nth-call-args-for-indices 1 worker/set-logger-level [1 2] "my_error_logger" Level/INFO) - (verify-nth-call-args-for-indices 2 worker/set-logger-level [1 2] "my_info_logger" Level/WARN))))) - -(deftest test-process-root-log-level-to-debug-sets-logger-and-timeout-2 - (with-local-cluster [cluster] - (let [worker (:worker cluster) - mock-config (LogConfig.) - root-level (LogLevel.) 
- mock-config-atom (atom nil) - orig-levels (atom {}) - present (time/now) - in-thirty-seconds (coerce/to-long (time/plus present (time/secs 30)))] - ;; configure the root logger to be debug - (.set_reset_log_level_timeout_epoch root-level in-thirty-seconds) - (.set_target_log_level root-level "DEBUG") - (.set_action root-level LogLevelAction/UPDATE) - (.put_to_named_logger_level mock-config "ROOT" root-level) - (stubbing [worker/set-logger-level nil - time/now present] - (worker/process-log-config-change mock-config-atom orig-levels mock-config) - ;; test that the set-logger-level function was not called - (log-message "Tests " @mock-config-atom) - (verify-call-times-for worker/set-logger-level 1) - (verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] "" Level/DEBUG) - (let [root-result (get @mock-config-atom LogManager/ROOT_LOGGER_NAME)] - (is (= (:action root-result) LogLevelAction/UPDATE)) - (is (= (:target-log-level root-result) Level/DEBUG)) - ;; defaults to INFO level when the logger isn't found previously - (is (= (:reset-log-level root-result) Level/INFO)) - (is (= (:timeout root-result) in-thirty-seconds))))))) - -(deftest test-process-root-log-level-to-debug-sets-logger-and-timeout - (with-local-cluster [cluster] - (let [worker (:worker cluster) - mock-config (LogConfig.) - root-level (LogLevel.) - orig-levels (atom {}) - present (time/now) - in-thirty-seconds (coerce/to-long (time/plus present (time/secs 30))) - mock-config-atom (atom {})] - ;; configure the root logger to be debug - (doseq [named {"ROOT" "DEBUG" - "my_debug_logger" "DEBUG" - "my_info_logger" "INFO" - "my_error_logger" "ERROR"}] - (let [level (LogLevel.)] - (.set_action level LogLevelAction/UPDATE) - (.set_reset_log_level_timeout_epoch level in-thirty-seconds) - (.set_target_log_level level (val named)) - (.put_to_named_logger_level mock-config (key named) level))) - (log-message "Tests " mock-config) - (stubbing [worker/set-logger-level nil - time/now present] - (worker/process-log-config-change mock-config-atom orig-levels mock-config) - (verify-call-times-for worker/set-logger-level 4) - (verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] "" Level/DEBUG) - (verify-nth-call-args-for-indices 1 worker/set-logger-level [1 2] "my_debug_logger" Level/DEBUG) - (verify-nth-call-args-for-indices 2 worker/set-logger-level [1 2] "my_error_logger" Level/ERROR) - (verify-nth-call-args-for-indices 3 worker/set-logger-level [1 2] "my_info_logger" Level/INFO))))) -- --(deftest test-worker-is-connection-ready -- (let [connection (Mockito/mock ConnectionWithStatus)] -- (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Ready) - (is (= true (WorkerState/isConnectionReady connection))) - (is (= true (worker/is-connection-ready connection))) -- -- (. (Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Connecting) - (is (= false (WorkerState/isConnectionReady connection))) - (is (= false (worker/is-connection-ready connection))) -- -- (. 
(Mockito/when (.status connection)) thenReturn ConnectionWithStatus$Status/Closed) - (is (= false (WorkerState/isConnectionReady connection))) - (is (= false (worker/is-connection-ready connection))) -- )) http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/test/jvm/org/apache/storm/daemon/worker/WorkerTest.java ---------------------------------------------------------------------- diff --cc storm-core/test/jvm/org/apache/storm/daemon/worker/WorkerTest.java index 0000000,0000000..1414bb0 new file mode 100644 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/daemon/worker/WorkerTest.java @@@ -1,0 -1,0 +1,39 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.storm.daemon.worker; ++import org.apache.storm.messaging.ConnectionWithStatus; ++import org.junit.Test; ++import org.mockito.Mockito; ++ ++import static org.junit.Assert.*; ++ ++public class WorkerTest { ++ @Test ++ public void testWorkerIsConnectionReady() { ++ ConnectionWithStatus connection = Mockito.mock(ConnectionWithStatus.class); ++ Mockito.when(connection.status()).thenReturn(ConnectionWithStatus.Status.Ready); ++ assertTrue(WorkerState.isConnectionReady(connection)); ++ ++ Mockito.when(connection.status()).thenReturn(ConnectionWithStatus.Status.Connecting); ++ assertFalse(WorkerState.isConnectionReady(connection)); ++ ++ Mockito.when(connection.status()).thenReturn(ConnectionWithStatus.Status.Closed); ++ assertFalse(WorkerState.isConnectionReady(connection)); ++ } ++}