Repository: beam
Updated Branches:
  refs/heads/jstorm-runner 914889925 -> 7a28bf1af


[BEAM-2824] Support gauges and PipelineResult.metrics() in local mode.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cda4e629
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cda4e629
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cda4e629

Branch: refs/heads/jstorm-runner
Commit: cda4e6293a13d387cee5c2920335b6bd053574d7
Parents: 9148899
Author: Pei He <p...@apache.org>
Authored: Wed Aug 30 15:16:44 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 13:52:47 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunnerResult.java |   3 +-
 .../jstorm/translation/JStormMetricResults.java | 105 +++++++++++++++++++
 .../jstorm/translation/MetricsReporter.java     |  30 +++++-
 3 files changed, 136 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index b6b5281..98d967f 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -23,6 +23,7 @@ import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import com.alibaba.jstorm.utils.JStormUtils;
 import java.io.IOException;
+import org.apache.beam.runners.jstorm.translation.JStormMetricResults;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.joda.time.Duration;
@@ -102,7 +103,7 @@ public abstract class JStormRunnerResult implements 
PipelineResult {
 
     @Override
     public MetricResults metrics() {
-      throw new UnsupportedOperationException("This method is not yet 
supported.");
+      return new JStormMetricResults();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
new file mode 100644
index 0000000..dbaa28e
--- /dev/null
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMetricResults.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.common.metric.AsmCounter;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.metric.AsmMetricRegistry;
+import com.alibaba.jstorm.metric.AsmWindow;
+import com.alibaba.jstorm.metric.JStormMetrics;
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.metrics.MetricFiltering;
+import org.apache.beam.runners.core.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.joda.time.Instant;
+
+/**
+ * Implementation of {@link MetricResults} for the JStorm Runner.
+ */
+public class JStormMetricResults extends MetricResults {
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    AsmMetricRegistry metricRegistry = JStormMetrics.getTaskMetrics();
+
+    List<MetricResult<Long>> counters = new ArrayList<>();
+    for (Map.Entry<String, AsmCounter> entry : 
metricRegistry.getCounters().entrySet()) {
+      MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey());
+      if (!MetricFiltering.matches(filter, metricKey)) {
+        continue;
+      }
+      counters.add(
+          JStormMetricResult.create(
+              metricKey.metricName(),
+              metricKey.stepName(),
+              (Long) entry.getValue().getValue(AsmWindow.M10_WINDOW)));
+    }
+
+    List<MetricResult<GaugeResult>> gauges = new ArrayList<>();
+    for (Map.Entry<String, AsmGauge> entry : 
metricRegistry.getGauges().entrySet()) {
+      MetricKey metricKey = MetricsReporter.toMetricKey(entry.getKey());
+      if (!MetricFiltering.matches(filter, metricKey)) {
+        continue;
+      }
+      gauges.add(
+          JStormMetricResult.create(
+              metricKey.metricName(),
+              metricKey.stepName(),
+              GaugeResult.create(
+                  ((Double) 
entry.getValue().getValue(AsmWindow.M10_WINDOW)).longValue(),
+                  new Instant(0))));
+    }
+
+    return JStormMetricQueryResults.create(counters, gauges);
+  }
+
+  @AutoValue
+  abstract static class JStormMetricQueryResults implements MetricQueryResults 
{
+
+    public abstract @Nullable Iterable<MetricResult<DistributionResult>> 
distributions();
+
+    public static MetricQueryResults create(
+        Iterable<MetricResult<Long>> counters,
+        Iterable<MetricResult<GaugeResult>> gauges) {
+      return new 
AutoValue_JStormMetricResults_JStormMetricQueryResults(counters, gauges, null);
+    }
+  }
+
+  @AutoValue
+  abstract static class JStormMetricResult<T> implements MetricResult<T> {
+    // need to define these here so they appear in the correct order
+    // and the generated constructor is usable and consistent
+    public abstract MetricName name();
+    public abstract String step();
+    public abstract @Nullable T committed();
+    public abstract T attempted();
+
+    public static <T> MetricResult<T> create(MetricName name, String step, T 
attempted) {
+      return new AutoValue_JStormMetricResults_JStormMetricResult<>(name, 
step, null, attempted);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cda4e629/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
index 0315a59..cc8c1f8 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -22,9 +22,13 @@ import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAtt
 
 import com.alibaba.jstorm.common.metric.AsmCounter;
 import com.alibaba.jstorm.metric.MetricClient;
+import com.alibaba.jstorm.metrics.Gauge;
 import com.google.common.collect.Maps;
 import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -37,7 +41,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
 class MetricsReporter {
 
   private static final String METRIC_KEY_SEPARATOR = "__";
-  private static final String COUNTER_PREFIX = "__counter";
+  private static final String COUNTER_PREFIX = "__metrics";
 
   private final MetricsContainerStepMap metricsContainers = new 
MetricsContainerStepMap();
   private final Map<String, Long> reportedCounters = Maps.newHashMap();
@@ -47,6 +51,18 @@ class MetricsReporter {
     return new MetricsReporter(metricClient);
   }
 
+  /**
+   * Converts JStorm metric name to {@link MetricKey}.
+   */
+  public static MetricKey toMetricKey(String jstormMetricName) {
+    String[] nameSplits = jstormMetricName.split(METRIC_KEY_SEPARATOR);
+    int length = nameSplits.length;
+    String stepName = length > 2 ? nameSplits[length - 3] : "";
+    String namespace = length > 1 ? nameSplits[length - 2] : "";
+    String counterName = length > 0 ? nameSplits[length - 1] : "";
+    return MetricKey.create(stepName, MetricName.named(namespace, 
counterName));
+  }
+
   private MetricsReporter(MetricClient metricClient) {
     this.metricClient = checkNotNull(metricClient, "metricClient");
   }
@@ -60,6 +76,7 @@ class MetricsReporter {
     MetricQueryResults metricQueryResults =
         metricResults.queryMetrics(MetricsFilter.builder().build());
     updateCounters(metricQueryResults.counters());
+    updateGauges(metricQueryResults.gauges());
   }
 
   private void updateCounters(Iterable<MetricResult<Long>> counters) {
@@ -77,6 +94,17 @@ class MetricsReporter {
     }
   }
 
+  private void updateGauges(Iterable<MetricResult<GaugeResult>> gauges) {
+    for (final MetricResult<GaugeResult> gaugeResult : gauges) {
+      String metricName = getMetricNameString(COUNTER_PREFIX, gaugeResult);
+      metricClient.registerGauge(metricName, new Gauge<Double>() {
+        @Override
+        public Double getValue() {
+          return (double) gaugeResult.attempted().value();
+        }});
+    }
+  }
+
   private String getMetricNameString(String prefix, MetricResult<?> 
metricResult) {
     return prefix
         + METRIC_KEY_SEPARATOR + metricResult.step()

Reply via email to