Repository: storm
Updated Branches:
  refs/heads/master c8fab2065 -> a323987bb


STORM-2939 add WorkerMetricsProcessor interface


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1eb355ef
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1eb355ef
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1eb355ef

Branch: refs/heads/master
Commit: 1eb355ef7825740686e2294238c51f3495cbeb88
Parents: 5f05739
Author: Aaron Gresch <agre...@yahoo-inc.com>
Authored: Thu Feb 8 16:09:19 2018 -0600
Committer: Aaron Gresch <agre...@yahoo-inc.com>
Committed: Thu Feb 8 16:09:19 2018 -0600

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 docs/storm-metricstore.md                       |  3 ++
 .../java/org/apache/storm/DaemonConfig.java     |  8 +++-
 .../storm/daemon/supervisor/Container.java      | 10 +++--
 .../daemon/supervisor/ReadClusterState.java     | 13 ++++++-
 .../apache/storm/daemon/supervisor/Slot.java    | 13 +++++--
 .../storm/metricstore/MetricStoreConfig.java    | 18 +++++++++
 .../metricstore/NimbusMetricProcessor.java      | 41 ++++++++++++++++++++
 .../metricstore/WorkerMetricsProcessor.java     | 40 +++++++++++++++++++
 .../storm/daemon/supervisor/SlotTest.java       | 14 +++----
 10 files changed, 144 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e15a265..fa8af24 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -296,6 +296,7 @@ storm.daemon.metrics.reporter.plugins:
      - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
 
 storm.metricstore.class: "org.apache.storm.metricstore.rocksdb.RocksDbStore"
+storm.metricprocessor.class: 
"org.apache.storm.metricstore.NimbusMetricProcessor"
 storm.metricstore.rocksdb.location: "storm_rocks"
 storm.metricstore.rocksdb.create_if_missing: true
 storm.metricstore.rocksdb.metadata_string_cache_capacity: 4000

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/docs/storm-metricstore.md
----------------------------------------------------------------------
diff --git a/docs/storm-metricstore.md b/docs/storm-metricstore.md
index 4111322..cafc2b5 100644
--- a/docs/storm-metricstore.md
+++ b/docs/storm-metricstore.md
@@ -23,6 +23,7 @@ The following configuration options exist:
 
 ```yaml
 storm.metricstore.class: "org.apache.storm.metricstore.rocksdb.RocksDbStore"
+storm.metricprocessor.class: 
"org.apache.storm.metricstore.NimbusMetricProcessor"
 storm.metricstore.rocksdb.location: "storm_rocks"
 storm.metricstore.rocksdb.create_if_missing: true
 storm.metricstore.rocksdb.metadata_string_cache_capacity: 4000
@@ -31,6 +32,8 @@ storm.metricstore.rocksdb.retention_hours: 240
 
 * storm.metricstore.class is the class that implements the 

[`MetricStore`]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java) interface.
+* storm.metricprocessor.class is the class that implements the 
+[`WorkerMetricsProcessor`]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java) interface.
 * storm.metricstore.rocksdb.location provides the location of the RocksDB 
database on Nimbus
 * storm.metricstore.rocksdb.create_if_missing permits creating a RocksDB 
database if missing
 * storm.metricstore.rocksdb.metadata_string_cache_capacity controls the number 
of metadata strings cached in memory.

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index 3230f70..8e88bef 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -33,7 +33,6 @@ import static 
org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplic
 import static 
org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom;
 
 import org.apache.storm.container.ResourceIsolationInterface;
-import org.apache.storm.metricstore.MetricStore;
 import org.apache.storm.nimbus.ITopologyActionNotifierPlugin;
 import org.apache.storm.scheduler.blacklist.reporters.IReporter;
 import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
@@ -1041,6 +1040,13 @@ public class DaemonConfig implements Validated {
     public static final String STORM_METRIC_STORE_CLASS = 
"storm.metricstore.class";
 
     /**
+     * Class implementing WorkerMetricsProcessor.
+     */
+    @NotNull
+    @isString
+    public static final String STORM_METRIC_PROCESSOR_CLASS = 
"storm.metricprocessor.class";
+
+    /**
      * RocksDB file location. This setting is specific to the 
org.apache.storm.metricstore.rocksdb.RocksDbStore
      * implementation for the storm.metricstore.class.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index a06e44c..9958f1b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -44,6 +44,8 @@ import org.apache.storm.generated.WorkerMetricPoint;
 import org.apache.storm.generated.WorkerMetricList;
 import org.apache.storm.generated.WorkerMetrics;
 import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.NimbusClient;
@@ -710,7 +712,7 @@ public abstract class Container implements Killable {
     /**
      * Send worker metrics to the configured WorkerMetricsProcessor (Nimbus by default).
      */
-    void processMetrics(OnlyLatestExecutor<Integer> exec) {
+    void processMetrics(OnlyLatestExecutor<Integer> exec, 
WorkerMetricsProcessor processor) {
         try {
             if (_usedMemory.get(_port) != null) {
                 // Make sure we don't process too frequently.
@@ -733,9 +735,9 @@ public abstract class Container implements Killable {
                 WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, 
hostname, metricList);
 
                 exec.execute(_port, () -> {
-                    try (NimbusClient client = 
NimbusClient.getConfiguredClient(_conf)) {
-                        client.getClient().processWorkerMetrics(metrics);
-                    } catch (Exception e) {
+                    try {
+                        processor.processWorkerMetrics(_conf, metrics);
+                    } catch (MetricException e) {
                         LOG.error("Failed to process metrics", e);
                     }
                 });

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
index 5d8bb33..23b8b64 100644
--- 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
+++ 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java
@@ -42,6 +42,8 @@ import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.WorkerResources;
 import org.apache.storm.localizer.AsyncLocalizer;
+import org.apache.storm.metricstore.MetricStoreConfig;
+import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.Time;
@@ -66,6 +68,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
     private final LocalState localState;
     private final AtomicReference<Map<Long, LocalAssignment>> 
cachedAssignments;
     private final OnlyLatestExecutor<Integer> metricsExec;
+    private WorkerMetricsProcessor metricsProcessor;
 
     public ReadClusterState(Supervisor supervisor) throws Exception {
         this.superConf = supervisor.getConf();
@@ -81,6 +84,14 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
         this.metricsExec = new 
OnlyLatestExecutor<>(supervisor.getHeartbeatExecutor());
         
         this.launcher = ContainerLauncher.make(superConf, assignmentId, 
supervisor.getSharedContext());
+
+        this.metricsProcessor = null;
+        try {
+            this.metricsProcessor = 
MetricStoreConfig.configureMetricProcessor(superConf);
+        } catch (Exception e) {
+            // the metrics processor is not critical to the operation of the 
cluster, allow Supervisor to come up
+            LOG.error("Failed to initialize metric processor", e);
+        }
         
         @SuppressWarnings("unchecked")
         List<Number> ports = 
(List<Number>)superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS);
@@ -110,7 +121,7 @@ public class ReadClusterState implements Runnable, 
AutoCloseable {
 
     private Slot mkSlot(int port) throws Exception {
         return new Slot(localizer, superConf, launcher, host, port,
-                localState, stormClusterState, iSuper, cachedAssignments, 
metricsExec);
+                localState, stormClusterState, iSuper, cachedAssignments, 
metricsExec, metricsProcessor);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 6700291..5a629ad 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -49,6 +49,7 @@ import org.apache.storm.localizer.BlobChangingCallback;
 import org.apache.storm.localizer.GoodToGo;
 import org.apache.storm.localizer.LocallyCachedBlob;
 import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.scheduler.ISupervisor;
 import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.ObjectReader;
@@ -96,13 +97,15 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         public final LocalState localState;
         public final BlobChangingCallback changingCallback;
         public final OnlyLatestExecutor<Integer> metricsExec;
+        public final WorkerMetricsProcessor metricsProcessor;
 
         StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
                     long killSleepMs, long monitorFreqMs,
                     ContainerLauncher containerLauncher, String host, int port,
                     ISupervisor iSupervisor, LocalState localState,
                     BlobChangingCallback changingCallback,
-                    OnlyLatestExecutor<Integer> metricsExec) {
+                    OnlyLatestExecutor<Integer> metricsExec,
+                    WorkerMetricsProcessor metricsProcessor) {
             this.localizer = localizer;
             this.hbTimeoutMs = hbTimeoutMs;
             this.firstHbTimeoutMs = firstHbTimeoutMs;
@@ -115,6 +118,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
             this.localState = localState;
             this.changingCallback = changingCallback;
             this.metricsExec = metricsExec;
+            this.metricsProcessor = metricsProcessor;
         }
     }
 
@@ -940,7 +944,7 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
             dynamicState = dynamicState.withProfileActions(mod, modPending);
         }
 
-        dynamicState.container.processMetrics(staticState.metricsExec);
+        dynamicState.container.processMetrics(staticState.metricsExec, 
staticState.metricsProcessor);
 
         Time.sleep(staticState.monitorFreqMs);
         return dynamicState;
@@ -982,7 +986,8 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
                 IStormClusterState clusterState,
                 ISupervisor iSupervisor,
                 AtomicReference<Map<Long, LocalAssignment>> 
cachedCurrentAssignments,
-                OnlyLatestExecutor<Integer> metricsExec) throws Exception {
+                OnlyLatestExecutor<Integer> metricsExec,
+                WorkerMetricsProcessor metricsProcessor) throws Exception {
         super("SLOT_"+port);
         this.metricsExec = metricsExec;
 
@@ -1031,7 +1036,7 @@ public class Slot extends Thread implements 
AutoCloseable, BlobChangingCallback
             iSupervisor,
             localState,
             this,
-            metricsExec);
+            metricsExec, metricsProcessor);
         this.newAssignment.set(dynamicState.newAssignment);
         if (MachineState.RUNNING == dynamicState.state) {
             //We are running so we should recover the blobs.

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
index 2f2ad76..96bd9e4 100644
--- 
a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
@@ -41,5 +41,23 @@ public class MetricStoreConfig {
             throw new MetricException("Failed to create metric store", t);
         }
     }
+
+    /**
+     * Configures metric processor to use the class specified in the conf.
+     * @param conf Storm config map
+     * @return WorkerMetricsProcessor prepared processor
+     * @throws MetricException  on misconfiguration
+     */
+    public static WorkerMetricsProcessor configureMetricProcessor(Map conf) 
throws MetricException {
+
+        try {
+            String processorClass = 
(String)conf.get(DaemonConfig.STORM_METRIC_PROCESSOR_CLASS);
+            WorkerMetricsProcessor processor = (WorkerMetricsProcessor) 
(Class.forName(processorClass)).newInstance();
+            processor.prepare(conf);
+            return processor;
+        } catch (Throwable t) {
+            throw new MetricException("Failed to create metric processor", t);
+        }
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java
new file mode 100644
index 0000000..64688c9
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+import org.apache.storm.generated.WorkerMetrics;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.thrift.TException;
+
+/**
+ * Implementation of WorkerMetricsProcessor that sends metric data to Nimbus 
for processing.
+ */
+public class NimbusMetricProcessor implements  WorkerMetricsProcessor {
+    @Override
+    public void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics 
metrics) throws MetricException {
+        try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
+            client.getClient().processWorkerMetrics(metrics);
+        } catch (TException e) {
+            throw new MetricException("Failed to process metrics", e);
+        }
+    }
+
+    @Override
+    public void prepare(Map config) throws MetricException {}
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java
 
b/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java
new file mode 100644
index 0000000..72e0ac0
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+import org.apache.storm.generated.WorkerMetrics;
+
+public interface WorkerMetricsProcessor {
+
+    /**
+     * Process insertion of worker metrics.
+     * @param conf Storm config map
+     * @param metrics  the metrics to process
+     * @throws MetricException  on error
+     */
+    void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) 
throws MetricException;
+
+    /**
+     * Prepares the metric processor.
+     * @param config Storm config map
+     * @throws MetricException  on error
+     */
+    void prepare(Map config) throws MetricException;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
index 08a5266..2867e6a 100644
--- 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -149,7 +149,7 @@ public class SlotTest {
             ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 1000, 1000, 
1000, 1000,
-                    containerLauncher, "localhost", 8080, iSuper, state, cb, 
null);
+                    containerLauncher, "localhost", 8080, iSuper, state, cb, 
null, null);
             DynamicState dynamicState = new DynamicState(null, null, null);
             DynamicState nextState = Slot.handleEmpty(dynamicState, 
staticState);
             assertEquals(MachineState.EMPTY, nextState.state);
@@ -181,7 +181,7 @@ public class SlotTest {
             
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null);
+                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
             DynamicState dynamicState = new DynamicState(null, null, null)
                     .withNewAssignment(newAssignment);
 
@@ -250,7 +250,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null);
+                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
             DynamicState dynamicState = new DynamicState(assignment, 
container, assignment);
             
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
@@ -311,7 +311,7 @@ public class SlotTest {
             
             ISupervisor iSuper = mock(ISupervisor.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null);
+                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
             DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, nAssignment);
             
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
@@ -392,7 +392,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null);
+                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
             DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, null);
             
             DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState);
@@ -453,7 +453,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             LocalState state = mock(LocalState.class);
             StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
-                    containerLauncher, "localhost", port, iSuper, state, cb, 
null);
+                    containerLauncher, "localhost", port, iSuper, state, cb, 
null, null);
             Set<TopoProfileAction> profileActions = new HashSet<>();
             ProfileRequest request = new ProfileRequest();
             request.set_action(ProfileAction.JPROFILE_STOP);
@@ -531,7 +531,7 @@ public class SlotTest {
             ISupervisor iSuper = mock(ISupervisor.class);
             long heartbeatTimeoutMs = 5000;
             StaticState staticState = new StaticState(localizer, 
heartbeatTimeoutMs, 120_000, 1000, 1000,
-                containerLauncher, "localhost", port, iSuper, state, cb, null);
+                containerLauncher, "localhost", port, iSuper, state, cb, null, 
null);
 
             Set<Slot.BlobChanging> changing = new HashSet<>();
             LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class);

Reply via email to