Resolve conflicts

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f7b8cda6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f7b8cda6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f7b8cda6

Branch: refs/heads/master
Commit: f7b8cda6f678b894f924ab6a8ad3b37815d63083
Parents: 02d290b 99a11cb
Author: Abhishek Agarwal <abhishek.agar...@appdynamics.com>
Authored: Thu Nov 3 12:57:06 2016 +0530
Committer: Abhishek Agarwal <abhishek.agar...@appdynamics.com>
Committed: Thu Nov 3 12:57:06 2016 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |  2 +
 .../org/apache/storm/daemon/local_executor.clj  | 27 ----------
 storm-core/src/clj/org/apache/storm/testing.clj | 15 ++----
 .../jvm/org/apache/storm/ProcessSimulator.java  | 14 ++++-
 .../src/jvm/org/apache/storm/StormTimer.java    |  9 ++--
 .../org/apache/storm/command/KillWorkers.java   |  2 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   | 20 +++----
 .../daemon/supervisor/ReadClusterState.java     | 55 +++++++++++++++-----
 .../storm/daemon/supervisor/Supervisor.java     |  4 +-
 .../apache/storm/daemon/supervisor/UniFunc.java | 22 ++++++++
 .../supervisor/timer/SupervisorHealthCheck.java |  2 +-
 .../storm/daemon/worker/LogConfigManager.java   |  2 +-
 .../org/apache/storm/daemon/worker/Worker.java  |  6 +--
 .../org/apache/storm/utils/DisruptorQueue.java  | 10 ++--
 .../test/clj/org/apache/storm/worker_test.clj   | 34 ------------
 .../apache/storm/daemon/worker/WorkerTest.java  | 39 ++++++++++++++
 16 files changed, 151 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
index aeded77,1e46e37..0000000
deleted file mode 100644,100644
--- a/storm-core/src/clj/org/apache/storm/daemon/local_executor.clj
+++ /dev/null
@@@ -1,27 -1,42 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements.  See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership.  The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License.  You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--(ns org.apache.storm.daemon.local-executor
--  (:use [org.apache.storm util config log])
--  (:import [org.apache.storm.tuple AddressedTuple]
--           [org.apache.storm.executor Executor ExecutorTransfer])
--  (:import [org.apache.storm.utils DisruptorQueue])
--  (:import [org.apache.storm Config Constants]))
--
--(defn local-transfer-executor-tuple []
--  (fn [task tuple batch-transfer->worker]
--    (let [val (AddressedTuple. task tuple)]
--      (.publish ^DisruptorQueue batch-transfer->worker val))))
 -
 -(defn mk-local-executor-transfer [worker-topology-context batch-queue 
storm-conf transfer-fn]
 -  (proxy [ExecutorTransfer] [worker-topology-context batch-queue storm-conf 
transfer-fn]
 -    (transfer [task tuple]
 -      (let [batch-transfer->worker (.getBatchTransferQueue this)]
 -        ((local-transfer-executor-tuple) task tuple 
batch-transfer->worker)))))
--
 -(defn mk-local-executor [workerData executorId credentials]
 -  (let [executor (Executor/mkExecutor workerData executorId credentials)
 -        worker-topology-context (.getWorkerTopologyContext executor)
 -        batch-transfer-queue (.getTransferWorkerQueue executor)
 -        storm-conf (.getStormConf executor)
 -        transfer-fn (.getTransferFn executor)
 -        local-executor-transfer (mk-local-executor-transfer 
worker-topology-context batch-transfer-queue storm-conf transfer-fn)]
 -    (.setLocalExecutorTransfer executor local-executor-transfer)
 -    (.execute executor)))

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index e8ef58b,5081aa4..032f11f
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -17,15 -17,16 +17,14 @@@
  (ns org.apache.storm.testing
    (:require [org.apache.storm.daemon
               [nimbus :as nimbus]
--             [local-executor :as local-executor]
               [local-supervisor :as local-supervisor]
 -             [common :as common]
 -             [worker :as worker]])
 +             [common :as common]])
    (:import [org.apache.commons.io FileUtils]
             [org.apache.storm.utils]
             [org.apache.storm.zookeeper Zookeeper]
             [org.apache.storm ProcessSimulator]
-            [org.apache.storm.daemon.supervisor Supervisor 
StandaloneSupervisor SupervisorUtils]
+            [org.apache.storm.daemon.supervisor Supervisor 
StandaloneSupervisor SupervisorUtils ReadClusterState]
 -           [org.apache.storm.executor Executor]
 +           [org.apache.storm.executor Executor LocalExecutor]
             [java.util.concurrent.atomic AtomicBoolean])
    (:import [java.io File])
    (:import [java.util HashMap ArrayList])
@@@ -674,14 -673,14 +672,7 @@@
           ;; of tuple emission (and not on a separate thread later) for
           ;; topologies to be tracked correctly. This is because "transferred" 
*must*
           ;; be incremented before "processing".
--         local-executor/local-transfer-executor-tuple
--         (let [old# local-executor/local-transfer-executor-tuple]
--           (fn [& args#]
--             (let [transferrer# (apply old# args#)]
--               (fn [& args2#]
--                 ;; (log-message "Transferring: " args2#)
--                 (increment-global! id# "transferred" 1)
--                 (apply transferrer# args2#)))))]
++         ]
            (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
                                (let [~cluster-sym (assoc-track-id ~cluster-sym 
id#)]
                                  ~@body)))

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/Task.java
index 2afe4e1,2e846ff..47c1d90
--- a/storm-core/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Task.java
@@@ -178,24 -179,25 +178,24 @@@ public class Task 
      }
  
      private TopologyContext mkTopologyContext(StormTopology topology) throws 
IOException {
 -        Map conf = (Map) workerData.get(Constants.CONF);
 +        Map conf = workerData.getConf();
          return new TopologyContext(
 -                topology,
 -                (Map) workerData.get(Constants.STORM_CONF),
 -                (Map<Integer, String>) 
workerData.get(Constants.TASK_TO_COMPONENT),
 -                (Map<String, List<Integer>>) 
workerData.get(Constants.COMPONENT_TO_SORTED_TASKS),
 -                (Map<String, Map<String, Fields>>) 
workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS),
 -                (String) workerData.get(Constants.STORM_ID),
 -                ConfigUtils.supervisorStormResourcesPath(
 -                        ConfigUtils.supervisorStormDistRoot(conf, (String) 
workerData.get(Constants.STORM_ID))),
 -                ConfigUtils.workerPidsRoot(conf, (String) 
workerData.get(Constants.WORKER_ID)),
 -                taskId,
 -                (Integer) workerData.get(Constants.PORT),
 -                (List<Integer>) workerData.get(Constants.TASK_IDS),
 -                (Map<String, Object>) 
workerData.get(Constants.DEFAULT_SHARED_RESOURCES),
 -                (Map<String, Object>) 
workerData.get(Constants.USER_SHARED_RESOURCES),
 -                executor.getSharedExecutorData(),
 -                executor.getIntervalToTaskToMetricToRegistry(),
 -                executor.getOpenOrPrepareWasCalled());
 +            topology,
 +            workerData.getTopologyConf(),
 +            workerData.getTaskToComponent(),
 +            workerData.getComponentToSortedTasks(),
 +            workerData.getComponentToStreamToFields(),
 +            workerData.getTopologyId(),
-                 ConfigUtils.supervisorStormResourcesPath(
-                         ConfigUtils.supervisorStormDistRoot(conf, 
workerData.getTopologyId())),
-                 ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
-                 taskId,
-                 workerData.getPort(), workerData.getTaskIds(),
-                 workerData.getDefaultSharedResources(),
-                workerData.getUserSharedResources(),
-                 executor.getSharedExecutorData(),
-                 executor.getIntervalToTaskToMetricToRegistry(),
-                 executor.getOpenOrPrepareWasCalled());
++            ConfigUtils.supervisorStormResourcesPath(
++                    ConfigUtils.supervisorStormDistRoot(conf, 
workerData.getTopologyId())),
++            ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
++            taskId,
++            workerData.getPort(), workerData.getTaskIds(),
++            workerData.getDefaultSharedResources(),
++            workerData.getUserSharedResources(),
++            executor.getSharedExecutorData(),
++            executor.getIntervalToTaskToMetricToRegistry(),
++            executor.getOpenOrPrepareWasCalled());
      }
  
      private Object mkTaskObject() {

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
index 1d5f93b,0000000..94d1ce1
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/worker/LogConfigManager.java
@@@ -1,157 -1,0 +1,157 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.storm.daemon.worker;
 +
 +import org.apache.logging.log4j.Level;
 +import org.apache.logging.log4j.LogManager;
 +import org.apache.logging.log4j.core.LoggerContext;
 +import org.apache.logging.log4j.core.config.Configuration;
 +import org.apache.logging.log4j.core.config.LoggerConfig;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.generated.LogConfig;
 +import org.apache.storm.generated.LogLevel;
 +import org.apache.storm.generated.LogLevelAction;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.HashMap;
 +import java.util.Map;
 +import java.util.TreeMap;
 +import java.util.TreeSet;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +public class LogConfigManager {
 +
 +    private static final Logger LOG = 
LoggerFactory.getLogger(LogConfigManager.class);
 +
 +    private final AtomicReference<TreeMap<String, LogLevel>> latestLogConfig;
 +    private final Map<String, Level> originalLogLevels;
 +
 +    public LogConfigManager() {
 +        this(new AtomicReference<>(new TreeMap<>()));
 +    }
 +
 +    public LogConfigManager(AtomicReference<TreeMap<String, LogLevel>> 
latestLogConfig) {
 +        this.latestLogConfig = latestLogConfig;
 +        this.originalLogLevels = getLoggerLevels();
 +        LOG.info("Started with log levels: {}", originalLogLevels);
 +    }
 +
 +    public void processLogConfigChange(LogConfig logConfig) {
 +        if (null != logConfig) {
 +            LOG.debug("Processing received log config: {}", logConfig);
 +            TreeMap<String, LogLevel> loggers = new 
TreeMap<>(logConfig.get_named_logger_level());
-             LoggerContext logContext = (LoggerContext) 
LogManager.getContext();
++            LoggerContext logContext = (LoggerContext) 
LogManager.getContext(false);
 +            Map<String, LogLevel> newLogConfigs = new HashMap<>();
 +            for (Map.Entry<String, LogLevel> entry : loggers.entrySet()) {
 +                String msgLoggerName = entry.getKey();
 +                msgLoggerName = ("ROOT".equalsIgnoreCase(msgLoggerName)) ? 
LogManager.ROOT_LOGGER_NAME : msgLoggerName;
 +                LogLevel loggerLevel = entry.getValue();
 +                // the new-timeouts map now contains logger => timeout
 +                if (loggerLevel.is_set_reset_log_level_timeout_epoch()) {
 +                    LogLevel copy = new LogLevel(loggerLevel);
 +                    if (originalLogLevels.containsKey(msgLoggerName)) {
 +                        copy.set_reset_log_level(originalLogLevels.get(msgLoggerName).name());
 +                    } else {
 +                        copy.set_reset_log_level(Level.INFO.name());
 +                    }
 +
 +                    //copy.unset_reset_log_level();
 +                    newLogConfigs.put(msgLoggerName, copy);
 +                }
 +
 +            }
 +
 +            // Look for deleted log timeouts
 +            TreeMap<String,LogLevel> latestConf = latestLogConfig.get();
 +            if (latestConf != null) {
 +                for (String loggerName : latestConf.descendingKeySet()) {
 +                    if (! newLogConfigs.containsKey(loggerName)) {
 +                        // if we had a timeout, but the timeout is no longer 
active
 +                        setLoggerLevel(logContext, loggerName, 
latestConf.get(loggerName).get_reset_log_level());
 +
 +                    }
 +                }
 +            }
 +
 +            // apply new log settings we just received
 +            // the merged configs are only for the reset logic
 +            for (String loggerName : new 
TreeSet<>(logConfig.get_named_logger_level().keySet())) {
 +                LogLevel logLevel = 
logConfig.get_named_logger_level().get(loggerName);
 +                loggerName = ("ROOT".equalsIgnoreCase(loggerName)) ? 
LogManager.ROOT_LOGGER_NAME : loggerName;
 +                LogLevelAction action = logLevel.get_action();
 +                if (action == LogLevelAction.UPDATE) {
 +                    setLoggerLevel(logContext, loggerName, 
logLevel.get_target_log_level());
 +                }
 +
 +            }
 +
 +            logContext.updateLoggers();
 +            latestLogConfig.set(new TreeMap<>(newLogConfigs));
 +            LOG.debug("New merged log config is {}", latestLogConfig.get());
 +        }
 +    }
 +
 +    // function called on timer to reset log levels last set to DEBUG
 +    // also called from processLogConfigChange
 +    public void resetLogLevels() {
 +        TreeMap<String, LogLevel> latestLogLevelMap = latestLogConfig.get();
 +        LoggerContext loggerContext = (LoggerContext) 
LogManager.getContext(false);
 +
 +        for (String loggerName : latestLogLevelMap.descendingKeySet()) {
 +            LogLevel loggerSetting = latestLogLevelMap.get(loggerName);
 +            long timeout = loggerSetting.get_reset_log_level_timeout_epoch();
 +            String resetLogLevel = loggerSetting.get_reset_log_level();
 +            if (timeout < Time.currentTimeMillis()) {
 +                LOG.info("{}: Resetting level to {}", loggerName, 
resetLogLevel);
 +                setLoggerLevel(loggerContext, loggerName, resetLogLevel);
 +            }
 +            latestLogConfig.getAndUpdate(input -> {
 +                TreeMap<String, LogLevel> result = new TreeMap<>(input);
 +                result.remove(loggerName);
 +                return result;
 +            });
 +        }
 +        loggerContext.updateLoggers();
 +    }
 +
 +    public Map<String, Level> getLoggerLevels() {
 +        Configuration loggerConfig = ((LoggerContext) 
LogManager.getContext(false)).getConfiguration();
 +        Map<String, Level> logLevelMap = new HashMap<>();
 +        for (Map.Entry<String, LoggerConfig> entry : 
loggerConfig.getLoggers().entrySet()) {
 +            logLevelMap.put(entry.getKey(), entry.getValue().getLevel());
 +        }
 +        return logLevelMap;
 +    }
 +
 +    public void setLoggerLevel(LoggerContext logContext, String loggerName, 
String newLevelStr) {
 +        Level newLevel = Level.getLevel(newLevelStr);
 +        Configuration configuration = logContext.getConfiguration();
 +        LoggerConfig loggerConfig = configuration.getLoggerConfig(loggerName);
 +        if (loggerConfig.getName().equalsIgnoreCase(loggerName)) {
 +            LOG.info("Setting {} log level to: {}", loggerConfig, newLevel);
 +            loggerConfig.setLevel(newLevel);
 +        } else {
 +            // create a new config. Make it additive (true) s.t. inherit 
parents appenders
 +            LoggerConfig newLoggerConfig = new LoggerConfig(loggerName, 
newLevel, true);
 +            LOG.info("Adding config for: {} with level: {}", newLoggerConfig, 
newLevel);
 +            configuration.addLogger(loggerName, newLoggerConfig);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 15f26e0,0000000..18715f7
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@@ -1,426 -1,0 +1,426 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.storm.daemon.worker;
 +
 +import com.google.common.base.Preconditions;
 +import com.lmax.disruptor.EventHandler;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.lang.ObjectUtils;
 +import org.apache.storm.Config;
 +import org.apache.storm.cluster.*;
 +import org.apache.storm.daemon.DaemonCommon;
 +import org.apache.storm.daemon.Shutdownable;
 +import org.apache.storm.daemon.StormCommon;
 +import org.apache.storm.executor.Executor;
 +import org.apache.storm.executor.ExecutorShutdown;
 +import org.apache.storm.executor.IRunningExecutor;
 +import org.apache.storm.executor.LocalExecutor;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.messaging.IConnection;
 +import org.apache.storm.messaging.IContext;
 +import org.apache.storm.messaging.TaskMessage;
 +import org.apache.storm.security.auth.AuthUtils;
 +import org.apache.storm.security.auth.IAutoCredentials;
 +import org.apache.storm.stats.StatsUtil;
 +import org.apache.storm.utils.*;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
 +
 +import javax.security.auth.Subject;
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.charset.Charset;
 +import java.security.PrivilegedExceptionAction;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +
 +public class Worker implements Shutdownable, DaemonCommon {
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
 +    private final Map conf;
 +    private final IContext context;
 +    private final String topologyId;
 +    private final String assignmentId;
 +    private final int port;
 +    private final String workerId;
 +    private final LogConfigManager logConfigManager;
 +
 +
 +    private WorkerState workerState;
 +    private AtomicReference<List<IRunningExecutor>> executorsAtom;
 +    private Thread transferThread;
 +    private WorkerBackpressureThread backpressureThread;
 +
 +    private AtomicReference<Credentials> credentialsAtom;
 +    private Subject subject;
 +    private Collection<IAutoCredentials> autoCreds;
 +
 +
 +    /**
 +     * TODO: should worker even take the topologyId as input? this should be
 +     * deducable from cluster state (by searching through assignments)
 +     * what about if there's inconsistency in assignments? -> but nimbus 
should guarantee this consistency
 +     *
 +     * @param conf         - Storm configuration
 +     * @param context      -
 +     * @param topologyId   - topology id
 +     * @param assignmentId - assignement id
 +     * @param port         - port on which the worker runs
 +     * @param workerId     - worker id
 +     */
 +
 +    public Worker(Map conf, IContext context, String topologyId, String 
assignmentId, int port, String workerId) {
 +        this.conf = conf;
 +        this.context = context;
 +        this.topologyId = topologyId;
 +        this.assignmentId = assignmentId;
 +        this.port = port;
 +        this.workerId = workerId;
 +        this.logConfigManager = new LogConfigManager();
 +    }
 +
 +    public void start() throws Exception {
 +        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", 
topologyId, assignmentId, port, workerId,
 +            conf);
 +        // because in local mode, its not a separate
 +        // process. supervisor will register it in this case
 +        // if ConfigUtils.isLocalMode(conf) returns false then it is in 
distributed mode.
 +        if (!ConfigUtils.isLocalMode(conf)) {
 +            // Distributed mode
 +            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
 +            String pid = Utils.processPid();
 +            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, 
workerId, pid)));
 +            FileUtils.writeStringToFile(new 
File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
-                 Charset.defaultCharset());
++                Charset.forName("UTF-8"));
 +        }
 +        final Map topologyConf =
 +            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
 +        List<ACL> acls = Utils.getWorkerACL(topologyConf);
 +        IStateStorage stateStorage =
 +            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new 
ClusterStateContext(DaemonType.WORKER));
 +        IStormClusterState stormClusterState =
 +            ClusterUtils.mkStormClusterState(stateStorage, acls, new 
ClusterStateContext());
 +        Credentials initialCredentials = 
stormClusterState.credentials(topologyId, null);
 +        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
 +        subject = AuthUtils.populateSubject(null, autoCreds, 
initialCredentials.get_creds());
 +
 +        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
 +            @Override public Object run() throws Exception {
 +                workerState =
 +                    new WorkerState(conf, context, topologyId, assignmentId, 
port, workerId, topologyConf, stateStorage,
 +                        stormClusterState);
 +
 +                // Heartbeat here so that worker process dies if this fails
 +                // it's important that worker heartbeat to supervisor ASAP so 
that supervisor knows
 +                // that worker is running and moves on
 +                doHeartBeat();
 +
 +                executorsAtom = new AtomicReference<>(null);
 +
 +                // launch heartbeat threads immediately so that slow-loading 
tasks don't cause the worker to timeout
 +                // to the supervisor
 +                workerState.heartbeatTimer
 +                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
 +                        try {
 +                            doHeartBeat();
 +                        } catch (IOException e) {
 +                            throw new RuntimeException(e);
 +                        }
 +                    });
 +
 +                workerState.executorHeartbeatTimer
 +                    .scheduleRecurring(0, (Integer) 
conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
 +                        Worker.this::doExecutorHeartbeats);
 +
 +                workerState.registerCallbacks();
 +
 +                workerState.refreshConnections(null);
 +
 +                workerState.activateWorkerWhenAllConnectionsReady();
 +
 +                workerState.refreshStormActive(null);
 +
 +                workerState.runWorkerStartHooks();
 +
 +                List<IRunningExecutor> newExecutors = new 
ArrayList<IRunningExecutor>();
 +                for (List<Long> e : workerState.getExecutors()) {
 +                    if (ConfigUtils.isLocalMode(topologyConf)) {
 +                        newExecutors.add(
 +                            LocalExecutor.mkExecutor(workerState, e, 
initialCredentials.get_creds())
 +                                .execute());
 +                    } else {
 +                        newExecutors.add(
 +                            Executor.mkExecutor(workerState, e, 
initialCredentials.get_creds())
 +                                .execute());
 +                    }
 +                }
 +                executorsAtom.set(newExecutors);
 +
 +                EventHandler<Object> tupleHandler = (packets, seqId, 
batchEnd) -> workerState
 +                    .sendTuplesToRemoteWorker((HashMap<Integer, 
ArrayList<TaskMessage>>) packets, seqId, batchEnd);
 +
 +                // This thread will publish the messages destined for remote 
tasks to remote connections
 +                transferThread = Utils.asyncLoop(() -> {
 +                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
 +                    return 0L;
 +                });
 +
 +                DisruptorBackpressureCallback disruptorBackpressureHandler =
 +                    mkDisruptorBackpressureHandler(workerState);
 +                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
 +                workerState.transferQueue
 +                    .setEnableBackpressure((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
 +                workerState.transferQueue
-                     .setHighWaterMark((Double) 
topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK));
++                    .setHighWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
 +                workerState.transferQueue
-                     .setLowWaterMark((Double) 
topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK));
++                    .setLowWaterMark(Utils.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
 +
 +                WorkerBackpressureCallback backpressureCallback = 
mkBackpressureHandler();
 +                backpressureThread = new 
WorkerBackpressureThread(workerState.backpressureTrigger, workerState, 
backpressureCallback);
 +                if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
 +                    backpressureThread.start();
 +                    stormClusterState.topologyBackpressure(topologyId, 
workerState::refreshThrottle);
 +                }
 +
 +                credentialsAtom = new 
AtomicReference<Credentials>(initialCredentials);
 +
 +                establishLogSettingCallback();
 +
 +                workerState.stormClusterState.credentials(topologyId, 
Worker.this::checkCredentialsChanged);
 +
 +                workerState.refreshCredentialsTimer.scheduleRecurring(0,
 +                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), 
new Runnable() {
 +                        @Override public void run() {
 +                            checkCredentialsChanged();
 +                            if ((Boolean) 
topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
 +                               checkThrottleChanged();
 +                            }
 +                        }
 +                    });
 +
 +                // The jitter allows the clients to get the data at different 
times, and avoids thundering herd
 +                if (!(Boolean) 
topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
 +                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
 +                }
 +
 +                workerState.refreshConnectionsTimer.scheduleRecurring(0,
 +                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), 
workerState::refreshConnections);
 +
 +                workerState.resetLogTevelsTimer.scheduleRecurring(0,
 +                    (Integer) 
conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), 
logConfigManager::resetLogLevels);
 +
 +                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) 
conf.get(Config.TASK_REFRESH_POLL_SECS),
 +                    workerState::refreshStormActive);
 +
 +                LOG.info("Worker has topology config {}", 
Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
 +                LOG.info("Worker {} for storm {} on {}:{}  has finished 
loading", workerId, topologyId, assignmentId, port);
 +                return this;
 +            };
 +        });
 +
 +    }
 +
 +    public void doHeartBeat() throws IOException {
 +        LocalState state = ConfigUtils.workerState(workerState.conf, 
workerState.workerId);
 +        state.setWorkerHeartBeat(new 
LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
 +            workerState.executors.stream()
 +                .map(executor -> new ExecutorInfo(executor.get(0).intValue(), 
executor.get(1).intValue()))
 +                .collect(Collectors.toList()), workerState.port));
 +        state.cleanup(60); // this is just in case supervisor is down so that 
disk doesn't fill up.
 +        // it shouldn't take supervisor 120 seconds between listing dir and 
reading it
 +    }
 +
 +    /**
 +     * Builds the per-executor stats and publishes the worker heartbeat through
 +     * {@code workerState.stormClusterState}.
 +     *
 +     * When {@code executorsAtom} holds no executor list yet, empty stats are
 +     * generated from {@code workerState.executors}; otherwise each running
 +     * executor contributes its id and rendered stats. A failure while writing
 +     * the heartbeat is logged and not rethrown.
 +     */
 +    public void doExecutorHeartbeats() {
 +        List<IRunningExecutor> runningExecutors = this.executorsAtom.get();
 +        Map<List<Integer>, ExecutorStats> stats;
 +        if (runningExecutors != null) {
 +            stats = StatsUtil.convertExecutorZkHbs(runningExecutors.stream().collect(Collectors.toMap(
 +                (Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
 +                (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
 +        } else {
 +            stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
 +        }
 +
 +        Map<String, Object> workerHb =
 +            StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
 +        try {
 +            workerState.stormClusterState.workerHeartbeat(
 +                workerState.topologyId,
 +                workerState.assignmentId,
 +                (long) workerState.port,
 +                StatsUtil.thriftifyZkWorkerHb(workerHb));
 +        } catch (Exception ex) {
 +            // Best effort: the error is only logged here.
 +            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex);
 +        }
 +    }
 +
 +    /**
 +     * Fetches the topology credentials from the cluster state and, when they
 +     * differ from the ones cached in {@code credentialsAtom}, updates the
 +     * subject, notifies every executor in {@code executorsAtom} and stores the
 +     * new credentials in the cache.
 +     */
 +    public void checkCredentialsChanged() {
 +        Credentials latest = workerState.stormClusterState.credentials(topologyId, null);
 +        if (ObjectUtils.equals(latest, credentialsAtom.get())) {
 +            // Nothing changed since the last check.
 +            return;
 +        }
 +
 +        // This does not have to be atomic, worst case we update when one is not needed
 +        AuthUtils.updateSubject(subject, autoCreds, (null == latest) ? null : latest.get_creds());
 +        for (IRunningExecutor runningExecutor : executorsAtom.get()) {
 +            runningExecutor.credenetialsChanged(latest);
 +        }
 +        credentialsAtom.set(latest);
 +    }
 +
 +    /**
 +     * Reads the topology backpressure state from the cluster state, passing
 +     * this method itself as the callback argument, and stores the result in
 +     * {@code workerState.throttleOn}.
 +     */
 +    public void checkThrottleChanged() {
 +        workerState.throttleOn.set(
 +            workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged));
 +    }
 +
 +    /**
 +     * Fetches the topology log config (with a {@code null} callback argument),
 +     * hands it to {@code logConfigManager} for processing, and then calls
 +     * {@link #establishLogSettingCallback()}.
 +     */
 +    public void checkLogConfigChanged() {
 +        logConfigManager.processLogConfigChange(
 +            workerState.stormClusterState.topologyLogConfig(topologyId, null));
 +        establishLogSettingCallback();
 +    }
 +
 +    /**
 +     * Requests the topology log config from the cluster state with
 +     * {@link #checkLogConfigChanged()} as the callback argument; the returned
 +     * value is ignored.
 +     */
 +    public void establishLogSettingCallback() {
 +        IStormClusterState clusterState = workerState.stormClusterState;
 +        clusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
 +    }
 +
 +
 +    /**
 +     * Creates the callback for the worker's send disruptor queue that reacts to
 +     * the high and low water marks for backpressure.
 +     *
 +     * Crossing the high water mark sets {@code transferBackpressure} to
 +     * {@code true}, crossing the low water mark sets it to {@code false}; in
 +     * both cases the backpressure checker is notified afterwards.
 +     *
 +     * @param workerState the worker state whose flag and trigger are used
 +     * @return the callback to attach to the queue
 +     */
 +    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
 +        return new DisruptorBackpressureCallback() {
 +            @Override
 +            public void highWaterMark() throws Exception {
 +                publish(true);
 +            }
 +
 +            @Override
 +            public void lowWaterMark() throws Exception {
 +                publish(false);
 +            }
 +
 +            // Records the new transfer backpressure state, then wakes the checker.
 +            private void publish(boolean congested) throws Exception {
 +                workerState.transferBackpressure.set(congested);
 +                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Creates the handler that checks and updates the worker's backpressure flag.
 +     *
 +     * The executor list is read from {@code executorsAtom} once, when the
 +     * handler is created. On every event the handler computes the current flag
 +     * as "transfer queue is throttled OR any executor reports backpressure" and,
 +     * when it differs from the stored flag, writes it to the cluster state.
 +     * When no executor list was available at creation time the flag is left
 +     * unchanged.
 +     *
 +     * @return the callback to hand to the backpressure thread
 +     */
 +    private WorkerBackpressureCallback mkBackpressureHandler() {
 +        final List<IRunningExecutor> executors = executorsAtom.get();
 +        return new WorkerBackpressureCallback() {
 +            @Override
 +            public void onEvent(Object obj) {
 +                String topologyId = workerState.topologyId;
 +                String assignmentId = workerState.assignmentId;
 +                int port = workerState.port;
 +                IStormClusterState stormClusterState = workerState.stormClusterState;
 +                boolean prevBackpressureFlag = workerState.backpressure.get();
 +                boolean currBackpressureFlag = prevBackpressureFlag;
 +                if (null != executors) {
 +                    // anyMatch is false for an empty list, whereas reduce(...).get()
 +                    // would throw NoSuchElementException when there are no executors.
 +                    currBackpressureFlag = workerState.transferQueue.getThrottleOn()
 +                        || executors.stream().anyMatch(IRunningExecutor::getBackPressureFlag);
 +                }
 +
 +                if (currBackpressureFlag != prevBackpressureFlag) {
 +                    try {
 +                        LOG.debug("worker backpressure flag changing from {} to {}",
 +                            prevBackpressureFlag, currBackpressureFlag);
 +                        stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port,
 +                            currBackpressureFlag);
 +                        // Update the local flag only after the cluster-state write succeeded,
 +                        // so a failed write is attempted again on the next event instead of
 +                        // leaving the local and published states out of sync.
 +                        workerState.backpressure.set(currBackpressureFlag);
 +                    } catch (Exception ex) {
 +                        LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
 +                    }
 +                }
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Shuts the worker down.
 +     *
 +     * Order of operations: close cached connections, shut down executors,
 +     * terminate the messaging context, stop the transfer thread, terminate the
 +     * backpressure thread, close all timers and worker resources, run the
 +     * worker shutdown hooks, remove the worker's heartbeat and backpressure
 +     * entries from the cluster state, and finally disconnect from the cluster
 +     * state and close the state storage.
 +     *
 +     * Any exception is rethrown wrapped via {@code Utils.wrapInRuntime}; if the
 +     * calling thread was interrupted while waiting, its interrupt status is
 +     * restored first.
 +     */
 +    @Override
 +    public void shutdown() {
 +        try {
 +            LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
 +
 +            for (IConnection socket : workerState.cachedNodeToPortSocket.get().values()) {
 +                //this will do best effort flushing since the linger period
 +                // was set on creation
 +                socket.close();
 +            }
 +
 +            LOG.info("Shutting down executors");
 +            for (IRunningExecutor executor : executorsAtom.get()) {
 +                ((ExecutorShutdown) executor).shutdown();
 +            }
 +            LOG.info("Shut down executors");
 +
 +            // this is fine because the only time this is shared is when it's a local context,
 +            // in which case it's a noop
 +            LOG.info("Terminating messaging context");
 +            workerState.mqContext.term();
 +
 +            LOG.info("Shutting down transfer thread");
 +            workerState.transferQueue.haltWithInterrupt();
 +
 +            transferThread.interrupt();
 +            transferThread.join();
 +            LOG.info("Shut down transfer thread");
 +
 +            backpressureThread.terminate();
 +            LOG.info("Shut down backpressure thread");
 +
 +            workerState.heartbeatTimer.close();
 +            workerState.refreshConnectionsTimer.close();
 +            workerState.refreshCredentialsTimer.close();
 +            workerState.refreshActiveTimer.close();
 +            workerState.executorHeartbeatTimer.close();
 +            workerState.userTimer.close();
 +            workerState.refreshLoadTimer.close();
 +            workerState.resetLogTevelsTimer.close();
 +            workerState.closeResources();
 +
 +            LOG.info("Trigger any worker shutdown hooks");
 +            workerState.runWorkerShutdownHooks();
 +
 +            workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
 +            workerState.stormClusterState.removeWorkerBackpressure(topologyId, assignmentId, (long) port);
 +            LOG.info("Disconnecting from storm cluster state context");
 +            workerState.stormClusterState.disconnect();
 +            workerState.stateStorage.close();
 +            LOG.info("Shut down worker {} {} {}", topologyId, assignmentId, port);
 +        } catch (Exception ex) {
 +            if (ex instanceof InterruptedException) {
 +                // Keep the interrupt visible to callers further up the stack.
 +                Thread.currentThread().interrupt();
 +            }
 +            throw Utils.wrapInRuntime(ex);
 +        }
 +    }
 +
 +    /**
 +     * Reports whether the worker's timers are all waiting.
 +     *
 +     * The heartbeat, refresh-connections, refresh-load, refresh-credentials,
 +     * refresh-active, executor-heartbeat and user timers are consulted, in that
 +     * order, stopping at the first one that is not waiting. The log-level reset
 +     * timer is not part of this check.
 +     *
 +     * @return {@code true} only when every consulted timer is waiting
 +     */
 +    @Override
 +    public boolean isWaiting() {
 +        boolean heartbeatAndRefreshIdle = workerState.heartbeatTimer.isTimerWaiting()
 +            && workerState.refreshConnectionsTimer.isTimerWaiting()
 +            && workerState.refreshLoadTimer.isTimerWaiting()
 +            && workerState.refreshCredentialsTimer.isTimerWaiting();
 +        if (!heartbeatAndRefreshIdle) {
 +            return false;
 +        }
 +        return workerState.refreshActiveTimer.isTimerWaiting()
 +            && workerState.executorHeartbeatTimer.isTimerWaiting()
 +            && workerState.userTimer.isTimerWaiting();
 +    }
 +
 +    /**
 +     * Command-line entry point that creates and starts a worker.
 +     *
 +     * @param args exactly four values, in order: storm (topology) id,
 +     *             assignment id, port number, worker id
 +     * @throws Exception if reading the configuration, validating it, or
 +     *                   starting the worker fails
 +     */
 +    public static void main(String[] args) throws Exception {
 +        Preconditions.checkArgument(args.length == 4,
 +            "Illegal number of arguments. Expected: 4, Actual: " + args.length);
 +        String stormId = args[0];
 +        String assignmentId = args[1];
 +        String portStr = args[2];
 +        String workerId = args[3];
 +        Map conf = Utils.readStormConfig();
 +        Utils.setupDefaultUncaughtExceptionHandler();
 +        StormCommon.validateDistributedMode(conf);
 +        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId);
 +        worker.start();
 +        // The shutdown hook is registered only once the worker has started.
 +        Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/test/clj/org/apache/storm/worker_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/worker_test.clj
index 3848b16,6b6fede..0000000
deleted file mode 100644,100644
--- a/storm-core/test/clj/org/apache/storm/worker_test.clj
+++ /dev/null
@@@ -1,34 -1,205 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements.  See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership.  The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License.  You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--(ns org.apache.storm.worker-test
--  (:use [clojure test])
-   (:use [org.apache.storm testing])
-   (:import [org.apache.storm.messaging ConnectionWithStatus 
ConnectionWithStatus$Status])
-   (:import [org.mockito Mockito]
-            (org.apache.storm.daemon.worker WorkerState))
 -  (:require [org.apache.storm.daemon [worker :as worker]])
 -  (:require [conjure.core])
 -  (:require [clj-time.core :as time])
 -  (:require [clj-time.coerce :as coerce])
 -  (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction])
 -  (:import [org.apache.logging.log4j Level LogManager])
 -  (:import [org.slf4j Logger])
 -  (:use [conjure core])
 -  (:use [org.apache.storm testing log])
 -  (:use [org.apache.storm.daemon common])
 -  (:use [clojure.string :only [join]])
 -  (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
 -  (:import [org.mockito Mockito])
--  )
 -
 -
 -(deftest test-log-reset-should-not-trigger-for-future-time
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          present (time/now)
 -          the-future (coerce/to-long (time/plus present (time/secs 1)))
 -          mock-config {"foo" {:timeout the-future}}
 -          mock-config-atom (atom mock-config)]
 -      (stubbing [time/now present]
 -        (worker/reset-log-levels mock-config-atom)
 -        ;; if the worker doesn't reset log levels, the atom should not be nil
 -        (is (not(= @mock-config-atom nil)))))))
 -
 -(deftest test-log-reset-triggers-for-past-time
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          present (time/now)
 -          past (time/plus present (time/secs -1))
 -          mock-config {"foo" { :timeout (coerce/to-long past)
 -                               :target-log-level Level/INFO
 -                               :reset-log-level Level/WARN}}
 -          mock-config-atom (atom mock-config)]
 -      (stubbing [time/now present]
 -        (worker/reset-log-levels mock-config-atom)
 -        ;; the logger config is removed from atom
 -        (is (= @mock-config-atom {}))))))
 -
 -(deftest test-log-reset-resets-does-nothing-for-empty-log-config
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          present (time/now)
 -          past (coerce/to-long (time/plus present (time/secs -1)))
 -          mock-config {}
 -          mock-config-atom (atom mock-config)]
 -      (stubbing [worker/set-logger-level nil
 -                 time/now present]
 -        (worker/reset-log-levels mock-config-atom)
 -        ;; if the worker resets log level, the atom is nil'ed out
 -        (is (= @mock-config-atom {}))
 -        ;; test that the set-logger-level function was not called
 -        (verify-call-times-for worker/set-logger-level 0)))))
 -
 -(deftest test-log-reset-resets-root-logger-if-set
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          present (time/now)
 -          past (coerce/to-long (time/plus present (time/secs -1)))
 -          mock-config {LogManager/ROOT_LOGGER_NAME  {:timeout past
 -                                                     :target-log-level 
Level/DEBUG
 -                                                     :reset-log-level 
Level/WARN}}
 -          mock-config-atom (atom mock-config)]
 -      (stubbing [worker/set-logger-level nil
 -                 time/now present]
 -        (worker/reset-log-levels mock-config-atom)
 -        ;; if the worker resets log level, the atom is reset to {}
 -        (is (= @mock-config-atom {}))
 -        ;; ensure we reset back to WARN level
 -        (verify-call-times-for worker/set-logger-level 1)
 -        (verify-first-call-args-for-indices worker/set-logger-level [1 2] 
LogManager/ROOT_LOGGER_NAME Level/WARN)))))
 -
 -;;This should be removed when it goes into conjure
 -(defmacro verify-nth-call-args-for-indices
 -  "Asserts that the function was called at least once, and the nth call was
 -   passed the args specified, into the indices of the arglist specified. In
 -   other words, it checks only the particular args you care about."
 -  [n fn-name indices & args]
 -  `(do
 -     (assert-in-fake-context "verify-first-call-args-for-indices")
 -     (assert-conjurified-fn "verify-first-call-args-for-indices" ~fn-name)
 -     (is (< ~n (count (get @call-times ~fn-name)))
 -         (str "(verify-nth-call-args-for-indices " ~n " " ~fn-name " " 
~indices " " ~(join " " args) ")"))
 -     (let [nth-call-args# (nth (get @call-times ~fn-name) ~n)
 -           indices-in-range?# (< (apply max ~indices) (count nth-call-args#))]
 -       (if indices-in-range?#
 -         (is (= ~(vec args) (map #(nth nth-call-args# %) ~indices))
 -             (str "(verify-first-call-args-for-indices " ~n " " ~fn-name " " 
~indices " " ~(join " " args) ")"))
 -         (is (= :fail (format "indices %s are out of range for the args, %s" 
~indices ~(vec args)))
 -             (str "(verify-first-call-args-for-indices " ~n " " ~fn-name " " 
~indices " " ~(join " " args) ")"))))))
 -
 -(deftest test-log-resets-named-loggers-with-past-timeout
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          present (time/now)
 -          past (coerce/to-long (time/plus present (time/secs -1)))
 -          mock-config {"my_debug_logger" {:timeout past
 -                                          :target-log-level Level/DEBUG
 -                                          :reset-log-level Level/INFO} 
 -                       "my_info_logger" {:timeout past
 -                                         :target-log-level Level/INFO
 -                                         :reset-log-level Level/WARN}
 -                       "my_error_logger" {:timeout past
 -                                          :target-log-level Level/ERROR
 -                                          :reset-log-level Level/INFO}}
 -          result (atom {})
 -          mock-config-atom (atom mock-config)]
 -      (stubbing [worker/set-logger-level nil
 -                 time/now present]
 -          (worker/reset-log-levels mock-config-atom)
 -          ;; if the worker resets log level, the atom is reset to {}
 -          (is (= @mock-config-atom {}))
 -          (verify-call-times-for worker/set-logger-level 3)
 -          (verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] 
"my_debug_logger" Level/INFO)
 -          (verify-nth-call-args-for-indices 1 worker/set-logger-level [1 2] 
"my_error_logger" Level/INFO)
 -          (verify-nth-call-args-for-indices 2 worker/set-logger-level [1 2] 
"my_info_logger" Level/WARN)))))
 -
 -(deftest test-process-root-log-level-to-debug-sets-logger-and-timeout-2
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          mock-config (LogConfig.)
 -          root-level (LogLevel.)
 -          mock-config-atom (atom nil)
 -          orig-levels (atom {})
 -          present (time/now)
 -          in-thirty-seconds (coerce/to-long (time/plus present (time/secs 
30)))]
 -      ;; configure the root logger to be debug
 -      (.set_reset_log_level_timeout_epoch root-level in-thirty-seconds)
 -      (.set_target_log_level root-level "DEBUG")
 -      (.set_action root-level LogLevelAction/UPDATE)
 -      (.put_to_named_logger_level mock-config "ROOT" root-level)
 -      (stubbing [worker/set-logger-level nil
 -                 time/now present]
 -          (worker/process-log-config-change mock-config-atom orig-levels 
mock-config)
 -          ;; test that the set-logger-level function was not called
 -          (log-message "Tests " @mock-config-atom)
 -          (verify-call-times-for worker/set-logger-level 1)
 -          (verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] 
"" Level/DEBUG)
 -          (let [root-result (get @mock-config-atom 
LogManager/ROOT_LOGGER_NAME)]
 -            (is (= (:action root-result) LogLevelAction/UPDATE))
 -            (is (= (:target-log-level root-result) Level/DEBUG))
 -            ;; defaults to INFO level when the logger isn't found previously
 -            (is (= (:reset-log-level root-result) Level/INFO))
 -            (is (= (:timeout root-result) in-thirty-seconds)))))))
 -
 -(deftest test-process-root-log-level-to-debug-sets-logger-and-timeout
 -  (with-local-cluster [cluster]
 -    (let [worker (:worker cluster)
 -          mock-config (LogConfig.)
 -          root-level (LogLevel.)
 -          orig-levels (atom {})
 -          present (time/now)
 -          in-thirty-seconds (coerce/to-long (time/plus present (time/secs 
30)))
 -          mock-config-atom (atom {})]
 -      ;; configure the root logger to be debug
 -      (doseq [named {"ROOT" "DEBUG"
 -                     "my_debug_logger" "DEBUG"
 -                     "my_info_logger" "INFO"
 -                     "my_error_logger" "ERROR"}]
 -        (let [level (LogLevel.)]
 -          (.set_action level LogLevelAction/UPDATE)
 -          (.set_reset_log_level_timeout_epoch level in-thirty-seconds)
 -          (.set_target_log_level level (val named))
 -          (.put_to_named_logger_level mock-config (key named) level)))
 -      (log-message "Tests " mock-config)
 -      (stubbing [worker/set-logger-level nil
 -                 time/now present]
 -          (worker/process-log-config-change mock-config-atom orig-levels 
mock-config)
 -          (verify-call-times-for worker/set-logger-level 4)
 -          (verify-nth-call-args-for-indices 0 worker/set-logger-level [1 2] 
"" Level/DEBUG)
 -          (verify-nth-call-args-for-indices 1 worker/set-logger-level [1 2] 
"my_debug_logger" Level/DEBUG)
 -          (verify-nth-call-args-for-indices 2 worker/set-logger-level [1 2] 
"my_error_logger" Level/ERROR)
 -          (verify-nth-call-args-for-indices 3 worker/set-logger-level [1 2] 
"my_info_logger" Level/INFO)))))
--
--(deftest test-worker-is-connection-ready
--  (let [connection (Mockito/mock ConnectionWithStatus)]
--    (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Ready)
-     (is (= true (WorkerState/isConnectionReady connection)))
 -    (is (= true (worker/is-connection-ready connection)))
--
--    (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Connecting)
-     (is (= false (WorkerState/isConnectionReady connection)))
 -    (is (= false (worker/is-connection-ready connection)))
--
--    (. (Mockito/when (.status connection)) thenReturn 
ConnectionWithStatus$Status/Closed)
-     (is (= false (WorkerState/isConnectionReady connection)))
 -    (is (= false (worker/is-connection-ready connection)))
--  ))

http://git-wip-us.apache.org/repos/asf/storm/blob/f7b8cda6/storm-core/test/jvm/org/apache/storm/daemon/worker/WorkerTest.java
----------------------------------------------------------------------
diff --cc storm-core/test/jvm/org/apache/storm/daemon/worker/WorkerTest.java
index 0000000,0000000..1414bb0
new file mode 100644
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/worker/WorkerTest.java
@@@ -1,0 -1,0 +1,39 @@@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ * <p>
++ * http://www.apache.org/licenses/LICENSE-2.0
++ * <p>
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.storm.daemon.worker;
++import org.apache.storm.messaging.ConnectionWithStatus;
++import org.junit.Test;
++import org.mockito.Mockito;
++
++import static org.junit.Assert.*;
++
++/**
++ * Checks that {@code WorkerState.isConnectionReady} reports a connection as
++ * ready only when its status is {@code Ready}.
++ */
++public class WorkerTest {
++    @Test
++    public void testWorkerIsConnectionReady() {
++        ConnectionWithStatus connection = Mockito.mock(ConnectionWithStatus.class);
++
++        assertReadiness(connection, ConnectionWithStatus.Status.Ready, true);
++        assertReadiness(connection, ConnectionWithStatus.Status.Connecting, false);
++        assertReadiness(connection, ConnectionWithStatus.Status.Closed, false);
++    }
++
++    // Stubs the mock to report the given status, then checks the readiness verdict.
++    private static void assertReadiness(ConnectionWithStatus connection,
++                                        ConnectionWithStatus.Status status,
++                                        boolean expectedReady) {
++        Mockito.when(connection.status()).thenReturn(status);
++        assertEquals(expectedReady, WorkerState.isConnectionReady(connection));
++    }
++}

Reply via email to