Repository: beam Updated Branches: refs/heads/jstorm-runner 914889925 -> 7a28bf1af
[BEAM-2824] support gauge and PipelineResults.metrics() in local mode. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cda4e629 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cda4e629 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cda4e629 Branch: refs/heads/jstorm-runner Commit: cda4e6293a13d387cee5c2920335b6bd053574d7 Parents: 9148899 Author: Pei He <p...@apache.org> Authored: Wed Aug 30 15:16:44 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 13:52:47 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunnerResult.java | 3 +- .../jstorm/translation/JStormMetricResults.java | 105 +++++++++++++++++++ .../jstorm/translation/MetricsReporter.java | 30 +++++- 3 files changed, 136 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index b6b5281..98d967f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -23,6 +23,7 @@ import backtype.storm.Config; import backtype.storm.LocalCluster; import com.alibaba.jstorm.utils.JStormUtils; import java.io.IOException; +import org.apache.beam.runners.jstorm.translation.JStormMetricResults; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; import org.joda.time.Duration; @@ -102,7 +103,7 @@ public abstract class JStormRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - throw new UnsupportedOperationException("This method is not yet supported."); + return new JStormMetricResults(); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java new file mode 100644 index 0000000..dbaa28e --- /dev/null +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation; + +import com.alibaba.jstorm.common.metric.AsmCounter; +import com.alibaba.jstorm.common.metric.AsmGauge; +import com.alibaba.jstorm.metric.AsmMetricRegistry; +import com.alibaba.jstorm.metric.AsmWindow; +import com.alibaba.jstorm.metric.JStormMetrics; +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.core.metrics.MetricFiltering; +import org.apache.beam.runners.core.metrics.MetricKey; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.joda.time.Instant; + +/** + * Implementation of {@link MetricResults} for the JStorm Runner. + */ +public class JStormMetricResults extends MetricResults { + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics(); + + List<MetricResult<Long>> counters = new ArrayList<>(); + for (Map.Entry<String, AsmCounter> entry : metricRegistry.getCounters().entrySet()) { + MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey()); + if (!MetricFiltering.matches(filter, metricKey)) { + continue; + } + counters.add( + JStormMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + (Long) entry.getValue().getValue(AsmWindow.M10_WINDOW))); + } + + List<MetricResult<GaugeResult>> gauges = new ArrayList<>(); + for (Map.Entry<String, AsmGauge> entry : metricRegistry.getGauges().entrySet()) { + MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey()); + if (!MetricFiltering.matches(filter, metricKey)) { + continue; + } + gauges.add( + JStormMetricResult.create( + metricKey.metricName(), + metricKey.stepName(), + GaugeResult.create( + ((Double) entry.getValue().getValue(AsmWindow.M10_WINDOW)).longValue(), + new Instant(0)))); + } + + return JStormMetricQueryResults.create(counters, gauges); + } + + @AutoValue + abstract static class JStormMetricQueryResults implements MetricQueryResults { + + public abstract @Nullable Iterable<MetricResult<DistributionResult>> distributions(); + + public static MetricQueryResults create( + Iterable<MetricResult<Long>> counters, + Iterable<MetricResult<GaugeResult>> gauges) { + return new AutoValue_JStormMetricResults_JStormMetricQueryResults(counters, gauges, null); + } + } + + @AutoValue + abstract static class JStormMetricResult<T> implements MetricResult<T> { + // need to define these here so they appear in the correct order + // and the generated constructor is usable and consistent + public abstract MetricName name(); + public abstract String step(); + public abstract @Nullable T committed(); + public abstract T attempted(); + + public static <T> MetricResult<T> create(MetricName name, String step, T attempted) { + return new AutoValue_JStormMetricResults_JStormMetricResult<>(name, step, null, attempted); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java index 0315a59..cc8c1f8 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java @@ -22,9 +22,13 @@ import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAtt import com.alibaba.jstorm.common.metric.AsmCounter; import com.alibaba.jstorm.metric.MetricClient; +import com.alibaba.jstorm.metrics.Gauge; import com.google.common.collect.Maps; import java.util.Map; +import org.apache.beam.runners.core.metrics.MetricKey; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; @@ -37,7 +41,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter; class MetricsReporter { private static final String METRIC_KEY_SEPARATOR = "__"; - private static final String COUNTER_PREFIX = "__counter"; + private static final String COUNTER_PREFIX = "__metrics"; private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); private final Map<String, Long> reportedCounters = Maps.newHashMap(); @@ -47,6 +51,18 @@ class MetricsReporter { return new MetricsReporter(metricClient); } + /** + * Converts JStorm metric name to {@link MetricKey}. + */ + public static MetricKey toMetricKey(String jstormMetricName) { + String[] nameSplits = jstormMetricName.split(METRIC_KEY_SEPARATOR); + int length = nameSplits.length; + String stepName = length > 2 ? nameSplits[length - 3] : ""; + String namespace = length > 1 ? nameSplits[length - 2] : ""; + String counterName = length > 0 ? nameSplits[length - 1] : ""; + return MetricKey.create(stepName, MetricName.named(namespace, counterName)); + } + private MetricsReporter(MetricClient metricClient) { this.metricClient = checkNotNull(metricClient, "metricClient"); } @@ -60,6 +76,7 @@ class MetricsReporter { MetricQueryResults metricQueryResults = metricResults.queryMetrics(MetricsFilter.builder().build()); updateCounters(metricQueryResults.counters()); + updateGauges(metricQueryResults.gauges()); } private void updateCounters(Iterable<MetricResult<Long>> counters) { @@ -77,6 +94,17 @@ class MetricsReporter { } } + private void updateGauges(Iterable<MetricResult<GaugeResult>> gauges) { + for (final MetricResult<GaugeResult> gaugeResult : gauges) { + String metricName = getMetricNameString(COUNTER_PREFIX, gaugeResult); + metricClient.registerGauge(metricName, new Gauge<Double>() { + @Override + public Double getValue() { + return (double) gaugeResult.attempted().value(); + }}); + } + } + private String getMetricNameString(String prefix, MetricResult<?> metricResult) { return prefix + METRIC_KEY_SEPARATOR + metricResult.step()