Repository: storm Updated Branches: refs/heads/master c8fab2065 -> a323987bb
STORM-2939 add WorkerMetricsProcessor interface Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1eb355ef Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1eb355ef Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1eb355ef Branch: refs/heads/master Commit: 1eb355ef7825740686e2294238c51f3495cbeb88 Parents: 5f05739 Author: Aaron Gresch <agre...@yahoo-inc.com> Authored: Thu Feb 8 16:09:19 2018 -0600 Committer: Aaron Gresch <agre...@yahoo-inc.com> Committed: Thu Feb 8 16:09:19 2018 -0600 ---------------------------------------------------------------------- conf/defaults.yaml | 1 + docs/storm-metricstore.md | 3 ++ .../java/org/apache/storm/DaemonConfig.java | 8 +++- .../storm/daemon/supervisor/Container.java | 10 +++-- .../daemon/supervisor/ReadClusterState.java | 13 ++++++- .../apache/storm/daemon/supervisor/Slot.java | 13 +++++-- .../storm/metricstore/MetricStoreConfig.java | 18 +++++++++ .../metricstore/NimbusMetricProcessor.java | 41 ++++++++++++++++++++ .../metricstore/WorkerMetricsProcessor.java | 40 +++++++++++++++++++ .../storm/daemon/supervisor/SlotTest.java | 14 +++---- 10 files changed, 144 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index e15a265..fa8af24 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -296,6 +296,7 @@ storm.daemon.metrics.reporter.plugins: - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter" storm.metricstore.class: "org.apache.storm.metricstore.rocksdb.RocksDbStore" +storm.metricprocessor.class: "org.apache.storm.metricstore.NimbusMetricProcessor" storm.metricstore.rocksdb.location: "storm_rocks" storm.metricstore.rocksdb.create_if_missing: true storm.metricstore.rocksdb.metadata_string_cache_capacity: 4000 http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/docs/storm-metricstore.md ---------------------------------------------------------------------- diff --git a/docs/storm-metricstore.md b/docs/storm-metricstore.md index 4111322..cafc2b5 100644 --- a/docs/storm-metricstore.md +++ b/docs/storm-metricstore.md @@ -23,6 +23,7 @@ The following configuation options exist: ```yaml storm.metricstore.class: "org.apache.storm.metricstore.rocksdb.RocksDbStore" +storm.metricprocessor.class: "org.apache.storm.metricstore.NimbusMetricProcessor" storm.metricstore.rocksdb.location: "storm_rocks" storm.metricstore.rocksdb.create_if_missing: true storm.metricstore.rocksdb.metadata_string_cache_capacity: 4000 @@ -31,6 +32,8 @@ storm.metricstore.rocksdb.retention_hours: 240 * storm.metricstore.class is the class that implements the ([`MetricStore`]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java)). +* storm.metricprocessor.class is the class that implements the +([`WorkerMetricsProcessor`]({{page.git-blob-base}}/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java)). * storm.metricstore.rocksdb.location provides to location of the RocksDB database on Nimbus * storm.metricstore.rocksdb.create_if_missing permits creating a RocksDB database if missing * storm.metricstore.rocksdb.metadata_string_cache_capacity controls the number of metadata strings cached in memory. http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index 3230f70..8e88bef 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -33,7 +33,6 @@ import static org.apache.storm.validation.ConfigValidationAnnotations.isNoDuplic import static org.apache.storm.validation.ConfigValidationAnnotations.isMapEntryCustom; import org.apache.storm.container.ResourceIsolationInterface; -import org.apache.storm.metricstore.MetricStore; import org.apache.storm.nimbus.ITopologyActionNotifierPlugin; import org.apache.storm.scheduler.blacklist.reporters.IReporter; import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; @@ -1041,6 +1040,13 @@ public class DaemonConfig implements Validated { public static final String STORM_METRIC_STORE_CLASS = "storm.metricstore.class"; /** + * Class implementing WorkerMetricsProcessor. + */ + @NotNull + @isString + public static final String STORM_METRIC_PROCESSOR_CLASS = "storm.metricprocessor.class"; + + /** * RocksDB file location. This setting is specific to the org.apache.storm.metricstore.rocksdb.RocksDbStore * implementation for the storm.metricstore.class. */ http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java index a06e44c..9958f1b 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java @@ -44,6 +44,8 @@ import org.apache.storm.generated.WorkerMetricPoint; import org.apache.storm.generated.WorkerMetricList; import org.apache.storm.generated.WorkerMetrics; import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.MetricException; +import org.apache.storm.metricstore.WorkerMetricsProcessor; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.NimbusClient; @@ -710,7 +712,7 @@ public abstract class Container implements Killable { /** * Send worker metrics to Nimbus. */ - void processMetrics(OnlyLatestExecutor<Integer> exec) { + void processMetrics(OnlyLatestExecutor<Integer> exec, WorkerMetricsProcessor processor) { try { if (_usedMemory.get(_port) != null) { // Make sure we don't process too frequently. @@ -733,9 +735,9 @@ public abstract class Container implements Killable { WorkerMetrics metrics = new WorkerMetrics(_topologyId, _port, hostname, metricList); exec.execute(_port, () -> { - try (NimbusClient client = NimbusClient.getConfiguredClient(_conf)) { - client.getClient().processWorkerMetrics(metrics); - } catch (Exception e) { + try { + processor.processWorkerMetrics(_conf, metrics); + } catch (MetricException e) { LOG.error("Failed to process metrics", e); } }); http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java index 5d8bb33..23b8b64 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/ReadClusterState.java @@ -42,6 +42,8 @@ import org.apache.storm.generated.NodeInfo; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.WorkerResources; import org.apache.storm.localizer.AsyncLocalizer; +import org.apache.storm.metricstore.MetricStoreConfig; +import org.apache.storm.metricstore.WorkerMetricsProcessor; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.Time; @@ -66,6 +68,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { private final LocalState localState; private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments; private final OnlyLatestExecutor<Integer> metricsExec; + private WorkerMetricsProcessor metricsProcessor; public ReadClusterState(Supervisor supervisor) throws Exception { this.superConf = supervisor.getConf(); @@ -81,6 +84,14 @@ public class ReadClusterState implements Runnable, AutoCloseable { this.metricsExec = new OnlyLatestExecutor<>(supervisor.getHeartbeatExecutor()); this.launcher = ContainerLauncher.make(superConf, assignmentId, supervisor.getSharedContext()); + + this.metricsProcessor = null; + try { + this.metricsProcessor = MetricStoreConfig.configureMetricProcessor(superConf); + } catch (Exception e) { + // the metrics processor is not critical to the operation of the cluster, allow Supervisor to come up + LOG.error("Failed to initialize metric processor", e); + } @SuppressWarnings("unchecked") List<Number> ports = (List<Number>)superConf.get(DaemonConfig.SUPERVISOR_SLOTS_PORTS); @@ -110,7 +121,7 @@ public class ReadClusterState implements Runnable, AutoCloseable { private Slot mkSlot(int port) throws Exception { return new Slot(localizer, superConf, launcher, host, port, - localState, stormClusterState, iSuper, cachedAssignments, metricsExec); + localState, stormClusterState, iSuper, cachedAssignments, metricsExec, metricsProcessor); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index 6700291..5a629ad 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -49,6 +49,7 @@ import org.apache.storm.localizer.BlobChangingCallback; import org.apache.storm.localizer.GoodToGo; import org.apache.storm.localizer.LocallyCachedBlob; import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.metricstore.WorkerMetricsProcessor; import org.apache.storm.scheduler.ISupervisor; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.ObjectReader; @@ -96,13 +97,15 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback public final LocalState localState; public final BlobChangingCallback changingCallback; public final OnlyLatestExecutor<Integer> metricsExec; + public final WorkerMetricsProcessor metricsProcessor; StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, long killSleepMs, long monitorFreqMs, ContainerLauncher containerLauncher, String host, int port, ISupervisor iSupervisor, LocalState localState, BlobChangingCallback changingCallback, - OnlyLatestExecutor<Integer> metricsExec) { + OnlyLatestExecutor<Integer> metricsExec, + WorkerMetricsProcessor metricsProcessor) { this.localizer = localizer; this.hbTimeoutMs = hbTimeoutMs; this.firstHbTimeoutMs = firstHbTimeoutMs; @@ -115,6 +118,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback this.localState = localState; this.changingCallback = changingCallback; this.metricsExec = metricsExec; + this.metricsProcessor = metricsProcessor; } } @@ -940,7 +944,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback dynamicState = dynamicState.withProfileActions(mod, modPending); } - dynamicState.container.processMetrics(staticState.metricsExec); + dynamicState.container.processMetrics(staticState.metricsExec, staticState.metricsProcessor); Time.sleep(staticState.monitorFreqMs); return dynamicState; @@ -982,7 +986,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback IStormClusterState clusterState, ISupervisor iSupervisor, AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments, - OnlyLatestExecutor<Integer> metricsExec) throws Exception { + OnlyLatestExecutor<Integer> metricsExec, + WorkerMetricsProcessor metricsProcessor) throws Exception { super("SLOT_"+port); this.metricsExec = metricsExec; @@ -1031,7 +1036,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback iSupervisor, localState, this, - metricsExec); + metricsExec, metricsProcessor); this.newAssignment.set(dynamicState.newAssignment); if (MachineState.RUNNING == dynamicState.state) { //We are running so we should recover the blobs. http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java index 2f2ad76..96bd9e4 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java @@ -41,5 +41,23 @@ public class MetricStoreConfig { throw new MetricException("Failed to create metric store", t); } } + + /** + * Configures metric processor to use the class specified in the conf. + * @param conf Storm config map + * @return WorkerMetricsProcessor prepared processor + * @throws MetricException on misconfiguration + */ + public static WorkerMetricsProcessor configureMetricProcessor(Map conf) throws MetricException { + + try { + String processorClass = (String)conf.get(DaemonConfig.STORM_METRIC_PROCESSOR_CLASS); + WorkerMetricsProcessor processor = (WorkerMetricsProcessor) (Class.forName(processorClass)).newInstance(); + processor.prepare(conf); + return processor; + } catch (Throwable t) { + throw new MetricException("Failed to create metric processor", t); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java b/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java new file mode 100644 index 0000000..64688c9 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.util.Map; +import org.apache.storm.generated.WorkerMetrics; +import org.apache.storm.utils.NimbusClient; +import org.apache.thrift.TException; + +/** + * Implementation of WorkerMetricsProcessor that sends metric data to Nimbus for processing. + */ +public class NimbusMetricProcessor implements WorkerMetricsProcessor { + @Override + public void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) throws MetricException { + try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) { + client.getClient().processWorkerMetrics(metrics); + } catch (TException e) { + throw new MetricException("Failed to process metrics", e); + } + } + + @Override + public void prepare(Map config) throws MetricException {} +} http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java b/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java new file mode 100644 index 0000000..72e0ac0 --- /dev/null +++ b/storm-server/src/main/java/org/apache/storm/metricstore/WorkerMetricsProcessor.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.storm.metricstore; + +import java.util.Map; +import org.apache.storm.generated.WorkerMetrics; + +public interface WorkerMetricsProcessor { + + /** + * Process insertion of worker metrics. + * @param conf Storm config map + * @param metrics the metrics to process + * @throws MetricException on error + */ + void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) throws MetricException; + + /** + * Prepares the metric processor. + * @param config Storm config map + * @throws MetricException on error + */ + void prepare(Map config) throws MetricException; +} http://git-wip-us.apache.org/repos/asf/storm/blob/1eb355ef/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java index 08a5266..2867e6a 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/SlotTest.java @@ -149,7 +149,7 @@ public class SlotTest { ContainerLauncher containerLauncher = mock(ContainerLauncher.class); ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000, - containerLauncher, "localhost", 8080, iSuper, state, cb, null); + containerLauncher, "localhost", 8080, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(null, null, null); DynamicState nextState = Slot.handleEmpty(dynamicState, staticState); assertEquals(MachineState.EMPTY, nextState.state); @@ -181,7 +181,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(null, null, null) .withNewAssignment(newAssignment); @@ -250,7 +250,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(assignment, container, assignment); DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); @@ -311,7 +311,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment); DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); @@ -392,7 +392,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null); DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); @@ -453,7 +453,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); LocalState state = mock(LocalState.class); StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); Set<TopoProfileAction> profileActions = new HashSet<>(); ProfileRequest request = new ProfileRequest(); request.set_action(ProfileAction.JPROFILE_STOP); @@ -531,7 +531,7 @@ public class SlotTest { ISupervisor iSuper = mock(ISupervisor.class); long heartbeatTimeoutMs = 5000; StaticState staticState = new StaticState(localizer, heartbeatTimeoutMs, 120_000, 1000, 1000, - containerLauncher, "localhost", port, iSuper, state, cb, null); + containerLauncher, "localhost", port, iSuper, state, cb, null, null); Set<Slot.BlobChanging> changing = new HashSet<>(); LocallyCachedBlob stormJar = mock(LocallyCachedBlob.class);