Repository: beam
Updated Branches:
  refs/heads/master db0ec9991 -> 019d3002b


[BEAM-1672] Use Accumulable MetricsContainers in Flink runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c2da9ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c2da9ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c2da9ad

Branch: refs/heads/master
Commit: 8c2da9ad1b8c195757f97feccdbcabcad735c407
Parents: 009cd6e
Author: Aviem Zur <aviem...@gmail.com>
Authored: Fri May 5 23:14:01 2017 +0300
Committer: Aviem Zur <aviem...@gmail.com>
Committed: Sat May 6 08:27:49 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkRunnerResult.java   |   8 +-
 .../metrics/DoFnRunnerWithMetricsUpdate.java    |  12 +-
 .../flink/metrics/FlinkMetricContainer.java     | 273 ++++++-------------
 .../flink/metrics/FlinkMetricResults.java       | 146 ----------
 .../flink/metrics/MetricsAccumulator.java       |  60 ++++
 .../flink/metrics/ReaderInvocationUtil.java     |   7 +-
 .../translation/wrappers/SourceInputFormat.java |   8 +-
 .../streaming/io/BoundedSourceWrapper.java      |   8 +-
 .../streaming/io/UnboundedSourceWrapper.java    |   9 +-
 9 files changed, 174 insertions(+), 357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
index 90dc79b..038895a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.runners.flink;
 
+import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
-import org.apache.beam.runners.flink.metrics.FlinkMetricResults;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
 import org.joda.time.Duration;
 
 /**
@@ -72,6 +75,7 @@ public class FlinkRunnerResult implements PipelineResult {
 
   @Override
   public MetricResults metrics() {
-    return new FlinkMetricResults(accumulators);
+    return asAttemptedOnlyMetricResults(
+        (MetricsContainerStepMap) 
accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index dae91fe..40191d2 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -34,6 +34,7 @@ import org.joda.time.Instant;
  */
 public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT> {
 
+  private final String stepName;
   private final FlinkMetricContainer container;
   private final DoFnRunner<InputT, OutputT> delegate;
 
@@ -41,14 +42,15 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> 
implements DoFnRunner<
       String stepName,
       DoFnRunner<InputT, OutputT> delegate,
       RuntimeContext runtimeContext) {
+    this.stepName = stepName;
     this.delegate = delegate;
-    container = new FlinkMetricContainer(stepName, runtimeContext);
+    container = new FlinkMetricContainer(runtimeContext);
   }
 
   @Override
   public void startBundle() {
     try (Closeable ignored =
-             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
       delegate.startBundle();
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -58,7 +60,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> 
implements DoFnRunner<
   @Override
   public void processElement(final WindowedValue<InputT> elem) {
     try (Closeable ignored =
-             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
       delegate.processElement(elem);
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -69,7 +71,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> 
implements DoFnRunner<
   public void onTimer(final String timerId, final BoundedWindow window, final 
Instant timestamp,
                       final TimeDomain timeDomain) {
     try (Closeable ignored =
-             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
       delegate.onTimer(timerId, window, timestamp, timeDomain);
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -79,7 +81,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> 
implements DoFnRunner<
   @Override
   public void finishBundle() {
     try (Closeable ignored =
-             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+             
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
       delegate.finishBundle();
     } catch (IOException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index d020f69..f81205e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,19 +17,24 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
+import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.sdk.metrics.DistributionData;
-import org.apache.beam.sdk.metrics.GaugeData;
-import org.apache.beam.sdk.metrics.MetricKey;
-import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Helper class for holding a {@link MetricsContainer} and forwarding Beam 
metrics to
@@ -37,46 +42,61 @@ import org.apache.flink.metrics.Gauge;
  */
 public class FlinkMetricContainer {
 
+  public static final String ACCUMULATOR_NAME = "__metricscontainers";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMetricContainer.class);
+
   private static final String METRIC_KEY_SEPARATOR = "__";
-  static final String COUNTER_PREFIX = "__counter";
-  static final String DISTRIBUTION_PREFIX = "__distribution";
-  static final String GAUGE_PREFIX = "__gauge";
+  private static final String COUNTER_PREFIX = "__counter";
+  private static final String DISTRIBUTION_PREFIX = "__distribution";
+  private static final String GAUGE_PREFIX = "__gauge";
 
-  private final MetricsContainer metricsContainer;
   private final RuntimeContext runtimeContext;
   private final Map<String, Counter> flinkCounterCache;
   private final Map<String, FlinkDistributionGauge> 
flinkDistributionGaugeCache;
   private final Map<String, FlinkGauge> flinkGaugeCache;
+  private final MetricsAccumulator metricsAccumulator;
 
-  public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) {
-    metricsContainer = new MetricsContainer(stepName);
+  public FlinkMetricContainer(RuntimeContext runtimeContext) {
     this.runtimeContext = runtimeContext;
-    flinkCounterCache = new HashMap<>();
-    flinkDistributionGaugeCache = new HashMap<>();
-    flinkGaugeCache = new HashMap<>();
+    this.flinkCounterCache = new HashMap<>();
+    this.flinkDistributionGaugeCache = new HashMap<>();
+    this.flinkGaugeCache = new HashMap<>();
+
+    Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> 
metricsAccumulator =
+        runtimeContext.getAccumulator(ACCUMULATOR_NAME);
+    if (metricsAccumulator == null) {
+      metricsAccumulator = new MetricsAccumulator();
+      try {
+        runtimeContext.addAccumulator(ACCUMULATOR_NAME, metricsAccumulator);
+      } catch (Exception e) {
+        LOG.error("Failed to create metrics accumulator.", e);
+      }
+    }
+    this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator;
   }
 
-  public MetricsContainer getMetricsContainer() {
-    return metricsContainer;
+  MetricsContainer getMetricsContainer(String stepName) {
+    return metricsAccumulator != null
+        ? metricsAccumulator.getLocalValue().getContainer(stepName)
+        : null;
   }
 
-  public void updateMetrics() {
-    // update metrics
-    MetricUpdates updates = metricsContainer.getUpdates();
-    if (updates != null) {
-      updateCounters(updates.counterUpdates());
-      updateDistributions(updates.distributionUpdates());
-      updateGauge(updates.gaugeUpdates());
-      metricsContainer.commitUpdates();
-    }
+  void updateMetrics() {
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+    updateDistributions(metricQueryResults.distributions());
+    updateGauge(metricQueryResults.gauges());
   }
 
-  private void updateCounters(Iterable<MetricUpdates.MetricUpdate<Long>> 
updates) {
-
-    for (MetricUpdates.MetricUpdate<Long> metricUpdate : updates) {
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    for (MetricResult<Long> metricResult : counters) {
+      String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, 
metricResult);
 
-      String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, 
metricUpdate.getKey());
-      Long update = metricUpdate.getUpdate();
+      Long update = metricResult.attempted();
 
       // update flink metric
       Counter counter = flinkCounterCache.get(flinkMetricName);
@@ -86,26 +106,15 @@ public class FlinkMetricContainer {
       }
       counter.dec(counter.getCount());
       counter.inc(update);
-
-      // update flink accumulator
-      Accumulator<Long, Long> accumulator = 
runtimeContext.getAccumulator(flinkMetricName);
-      if (accumulator == null) {
-        accumulator = new LongCounter(update);
-        runtimeContext.addAccumulator(flinkMetricName, accumulator);
-      } else {
-        accumulator.resetLocal();
-        accumulator.add(update);
-      }
     }
   }
 
-  private void 
updateDistributions(Iterable<MetricUpdates.MetricUpdate<DistributionData>> 
updates) {
-
-    for (MetricUpdates.MetricUpdate<DistributionData> metricUpdate : updates) {
-
+  private void updateDistributions(Iterable<MetricResult<DistributionResult>> 
distributions) {
+    for (MetricResult<DistributionResult> metricResult : distributions) {
       String flinkMetricName =
-          getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey());
-      DistributionData update = metricUpdate.getUpdate();
+          getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricResult);
+
+      DistributionResult update = metricResult.attempted();
 
       // update flink metric
       FlinkDistributionGauge gauge = 
flinkDistributionGaugeCache.get(flinkMetricName);
@@ -116,26 +125,15 @@ public class FlinkMetricContainer {
       } else {
         gauge.update(update);
       }
-
-      // update flink accumulator
-      Accumulator<DistributionData, DistributionData> accumulator =
-          runtimeContext.getAccumulator(flinkMetricName);
-      if (accumulator == null) {
-        accumulator = new FlinkDistributionDataAccumulator(update);
-        runtimeContext.addAccumulator(flinkMetricName, accumulator);
-      } else {
-        accumulator.resetLocal();
-        accumulator.add(update);
-      }
     }
   }
 
-  private void updateGauge(Iterable<MetricUpdates.MetricUpdate<GaugeData>> 
updates) {
-    for (MetricUpdates.MetricUpdate<GaugeData> metricUpdate : updates) {
-
+  private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) {
+    for (MetricResult<GaugeResult> metricResult : gauges) {
       String flinkMetricName =
-          getFlinkMetricNameString(GAUGE_PREFIX, metricUpdate.getKey());
-      GaugeData update = metricUpdate.getUpdate();
+          getFlinkMetricNameString(GAUGE_PREFIX, metricResult);
+
+      GaugeResult update = metricResult.attempted();
 
       // update flink metric
       FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName);
@@ -146,170 +144,55 @@ public class FlinkMetricContainer {
       } else {
         gauge.update(update);
       }
-
-      // update flink accumulator
-      Accumulator<GaugeData, GaugeData> accumulator =
-          runtimeContext.getAccumulator(flinkMetricName);
-      if (accumulator == null) {
-        accumulator = new FlinkGaugeAccumulator(update);
-        runtimeContext.addAccumulator(flinkMetricName, accumulator);
-      }
-      accumulator.resetLocal();
-      accumulator.add(update);
     }
   }
 
-  private static String getFlinkMetricNameString(String prefix, MetricKey key) 
{
+  private static String getFlinkMetricNameString(String prefix, 
MetricResult<?> metricResult) {
     return prefix
-        + METRIC_KEY_SEPARATOR + key.stepName()
-        + METRIC_KEY_SEPARATOR + key.metricName().namespace()
-        + METRIC_KEY_SEPARATOR + key.metricName().name();
-  }
-
-  static MetricKey parseMetricKey(String flinkMetricName) {
-    String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR);
-    return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4]));
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
   }
 
   /**
-   * Flink {@link Gauge} for {@link DistributionData}.
+   * Flink {@link Gauge} for {@link DistributionResult}.
    */
-  public static class FlinkDistributionGauge implements 
Gauge<DistributionData> {
+  public static class FlinkDistributionGauge implements 
Gauge<DistributionResult> {
 
-    DistributionData data;
+    DistributionResult data;
 
-    FlinkDistributionGauge(DistributionData data) {
+    FlinkDistributionGauge(DistributionResult data) {
       this.data = data;
     }
 
-    void update(DistributionData data) {
+    void update(DistributionResult data) {
       this.data = data;
     }
 
     @Override
-    public DistributionData getValue() {
+    public DistributionResult getValue() {
       return data;
     }
   }
 
   /**
-   * Flink {@link Gauge} for {@link GaugeData}.
+   * Flink {@link Gauge} for {@link GaugeResult}.
    */
-  public static class FlinkGauge implements Gauge<GaugeData> {
+  public static class FlinkGauge implements Gauge<GaugeResult> {
 
-    GaugeData data;
+    GaugeResult data;
 
-    FlinkGauge(GaugeData data) {
+    FlinkGauge(GaugeResult data) {
       this.data = data;
     }
 
-    void update(GaugeData update) {
-      this.data = data.combine(update);
+    void update(GaugeResult update) {
+      this.data = update;
     }
 
     @Override
-    public GaugeData getValue() {
+    public GaugeResult getValue() {
       return data;
     }
   }
-
-  /**
-   * Flink {@link Accumulator} for {@link GaugeData}.
-   */
-  public static class FlinkDistributionDataAccumulator implements
-      Accumulator<DistributionData, DistributionData> {
-
-    private static final long serialVersionUID = 1L;
-
-    private DistributionData data;
-
-    public FlinkDistributionDataAccumulator(DistributionData data) {
-      this.data = data;
-    }
-
-    @Override
-    public void add(DistributionData value) {
-      if (data == null) {
-        this.data = value;
-      } else {
-        this.data = this.data.combine(value);
-      }
-    }
-
-    @Override
-    public DistributionData getLocalValue() {
-      return data;
-    }
-
-    @Override
-    public void resetLocal() {
-      data = null;
-    }
-
-    @Override
-    public void merge(Accumulator<DistributionData, DistributionData> other) {
-      data = data.combine(other.getLocalValue());
-    }
-
-    @Override
-    public Accumulator<DistributionData, DistributionData> clone() {
-      try {
-        super.clone();
-      } catch (CloneNotSupportedException e) {
-        throw new RuntimeException(e);
-      }
-
-      return new FlinkDistributionDataAccumulator(
-          DistributionData.create(data.sum(), data.count(), data.min(), 
data.max()));
-    }
-  }
-
-  /**
-   * Flink {@link Accumulator} for {@link GaugeData}.
-   */
-  public static class FlinkGaugeAccumulator implements Accumulator<GaugeData, 
GaugeData> {
-
-    private GaugeData data;
-
-    public FlinkGaugeAccumulator(GaugeData data) {
-      this.data = data;
-    }
-
-    @Override
-    public void add(GaugeData value) {
-      if (data == null) {
-        this.data = value;
-      } else {
-        this.data = this.data.combine(value);
-      }
-    }
-
-    @Override
-    public GaugeData getLocalValue() {
-      return data;
-    }
-
-    @Override
-    public void resetLocal() {
-      this.data = null;
-    }
-
-    @Override
-    public void merge(Accumulator<GaugeData, GaugeData> other) {
-      data = data.combine(other.getLocalValue());
-    }
-
-    @Override
-    public Accumulator<GaugeData, GaugeData> clone() {
-      try {
-        super.clone();
-      } catch (CloneNotSupportedException e) {
-        throw new RuntimeException(e);
-      }
-
-      return new FlinkGaugeAccumulator(
-          GaugeData.create(data.value()));
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
deleted file mode 100644
index 9e1430b..0000000
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.metrics;
-
-
-import static 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX;
-import static 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX;
-import static 
org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.metrics.DistributionData;
-import org.apache.beam.sdk.metrics.DistributionResult;
-import org.apache.beam.sdk.metrics.GaugeData;
-import org.apache.beam.sdk.metrics.GaugeResult;
-import org.apache.beam.sdk.metrics.MetricFiltering;
-import org.apache.beam.sdk.metrics.MetricKey;
-import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricQueryResults;
-import org.apache.beam.sdk.metrics.MetricResult;
-import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsFilter;
-
-/**
- * Implementation of {@link MetricResults} for the Flink Runner.
- */
-public class FlinkMetricResults extends MetricResults {
-
-  private Map<String, Object> accumulators;
-
-  public FlinkMetricResults(Map<String, Object> accumulators) {
-    this.accumulators = accumulators;
-  }
-
-  @Override
-  public MetricQueryResults queryMetrics(MetricsFilter filter) {
-    return new FlinkMetricQueryResults(filter);
-  }
-
-  private class FlinkMetricQueryResults implements MetricQueryResults {
-
-    private MetricsFilter filter;
-
-    FlinkMetricQueryResults(MetricsFilter filter) {
-      this.filter = filter;
-    }
-
-    @Override
-    public Iterable<MetricResult<Long>> counters() {
-      List<MetricResult<Long>> result = new ArrayList<>();
-      for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
-        if (accumulator.getKey().startsWith(COUNTER_PREFIX)) {
-          MetricKey metricKey = 
FlinkMetricContainer.parseMetricKey(accumulator.getKey());
-          if (MetricFiltering.matches(filter, metricKey)) {
-            result.add(new FlinkMetricResult<>(
-                metricKey.metricName(), metricKey.stepName(), (Long) 
accumulator.getValue()));
-          }
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public Iterable<MetricResult<DistributionResult>> distributions() {
-      List<MetricResult<DistributionResult>> result = new ArrayList<>();
-      for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
-        if (accumulator.getKey().startsWith(DISTRIBUTION_PREFIX)) {
-          MetricKey metricKey = 
FlinkMetricContainer.parseMetricKey(accumulator.getKey());
-          DistributionData data = (DistributionData) accumulator.getValue();
-          if (MetricFiltering.matches(filter, metricKey)) {
-            result.add(new FlinkMetricResult<>(
-                metricKey.metricName(), metricKey.stepName(), 
data.extractResult()));
-          }
-        }
-      }
-      return result;
-    }
-
-    @Override
-    public Iterable<MetricResult<GaugeResult>> gauges() {
-      List<MetricResult<GaugeResult>> result = new ArrayList<>();
-      for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) {
-        if (accumulator.getKey().startsWith(GAUGE_PREFIX)) {
-          MetricKey metricKey = 
FlinkMetricContainer.parseMetricKey(accumulator.getKey());
-          GaugeData data = (GaugeData) accumulator.getValue();
-          if (MetricFiltering.matches(filter, metricKey)) {
-            result.add(new FlinkMetricResult<>(
-                metricKey.metricName(), metricKey.stepName(), 
data.extractResult()));
-          }
-        }
-      }
-      return result;
-    }
-
-  }
-
-  private static class FlinkMetricResult<T> implements MetricResult<T> {
-    private final MetricName name;
-    private final String step;
-    private final T result;
-
-    FlinkMetricResult(MetricName name, String step, T result) {
-      this.name = name;
-      this.step = step;
-      this.result = result;
-    }
-
-    @Override
-    public MetricName name() {
-      return name;
-    }
-
-    @Override
-    public String step() {
-      return step;
-    }
-
-    @Override
-    public T committed() {
-      throw new UnsupportedOperationException("Flink runner does not currently 
support committed"
-          + " metrics results. Please use 'attempted' instead.");
-    }
-
-    @Override
-    public T attempted() {
-      return result;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
new file mode 100644
index 0000000..a9dc2ce
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import org.apache.beam.sdk.metrics.MetricsContainerStepMap;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SimpleAccumulator;
+
+/**
+ * Accumulator of {@link MetricsContainerStepMap}.
+ */
+public class MetricsAccumulator implements 
SimpleAccumulator<MetricsContainerStepMap> {
+  private MetricsContainerStepMap metricsContainers = new 
MetricsContainerStepMap();
+
+  @Override
+  public void add(MetricsContainerStepMap value) {
+    metricsContainers.updateAll(value);
+  }
+
+  @Override
+  public MetricsContainerStepMap getLocalValue() {
+    return metricsContainers;
+  }
+
+  @Override
+  public void resetLocal() {
+    this.metricsContainers = new MetricsContainerStepMap();
+  }
+
+  @Override
+  public void merge(Accumulator<MetricsContainerStepMap, 
MetricsContainerStepMap> other) {
+    this.add(other.getLocalValue());
+  }
+
+  @Override
+  public Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> clone() 
{
+    try {
+      super.clone();
+    } catch (CloneNotSupportedException ignored) {
+    }
+    MetricsAccumulator metricsAccumulator = new MetricsAccumulator();
+    metricsAccumulator.getLocalValue().updateAll(this.getLocalValue());
+    return metricsAccumulator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
index 38263d9..64738cc 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java
@@ -32,13 +32,16 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public class ReaderInvocationUtil<OutputT, ReaderT extends 
Source.Reader<OutputT>> {
 
+  private final String stepName;
   private final FlinkMetricContainer container;
   private final Boolean enableMetrics;
 
   public ReaderInvocationUtil(
+      String stepName,
       PipelineOptions options,
       FlinkMetricContainer container) {
     FlinkPipelineOptions flinkPipelineOptions = 
options.as(FlinkPipelineOptions.class);
+    this.stepName = stepName;
     enableMetrics = flinkPipelineOptions.getEnableMetrics();
     this.container = container;
   }
@@ -46,7 +49,7 @@ public class ReaderInvocationUtil<OutputT, ReaderT extends 
Source.Reader<OutputT
   public boolean invokeStart(ReaderT reader) throws IOException {
     if (enableMetrics) {
       try (Closeable ignored =
-               
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+               
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
         boolean result = reader.start();
         container.updateMetrics();
         return result;
@@ -59,7 +62,7 @@ public class ReaderInvocationUtil<OutputT, ReaderT extends 
Source.Reader<OutputT
   public boolean invokeAdvance(ReaderT reader) throws IOException {
     if (enableMetrics) {
       try (Closeable ignored =
-               
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) {
+               
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
         boolean result = reader.advance();
         container.updateMetrics();
         return result;

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index f2b81fc..27e6912 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -71,9 +71,13 @@ public class SourceInputFormat<T>
 
   @Override
   public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
-    FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, 
getRuntimeContext());
+    FlinkMetricContainer metricContainer = new 
FlinkMetricContainer(getRuntimeContext());
+
     readerInvoker =
-        new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), 
metricContainer);
+        new ReaderInvocationUtil<>(
+            stepName,
+            serializedOptions.getPipelineOptions(),
+            metricContainer);
 
     reader = ((BoundedSource<T>) 
sourceInputSplit.getSource()).createReader(options);
     inputAvailable = readerInvoker.invokeStart(reader);

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index a142685..6d75688 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -104,9 +104,13 @@ public class BoundedSourceWrapper<OutputT>
         numSubtasks,
         localSources);
 
-    FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, 
getRuntimeContext());
+    FlinkMetricContainer metricContainer = new 
FlinkMetricContainer(getRuntimeContext());
+
     ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> 
readerInvoker =
-        new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), 
metricContainer);
+        new ReaderInvocationUtil<>(
+            stepName,
+            serializedOptions.getPipelineOptions(),
+            metricContainer);
 
     readers = new ArrayList<>();
     // initialize readers from scratch

http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index ee20fd5..b9c431d 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -214,10 +214,13 @@ public class UnboundedSourceWrapper<
 
     context = ctx;
 
-    FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, 
getRuntimeContext());
-    ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> 
readerInvoker =
-        new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), 
metricContainer);
+    FlinkMetricContainer metricContainer = new 
FlinkMetricContainer(getRuntimeContext());
 
+    ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> 
readerInvoker =
+        new ReaderInvocationUtil<>(
+            stepName,
+            serializedOptions.getPipelineOptions(),
+            metricContainer);
 
     if (localReaders.size() == 0) {
       // do nothing, but still look busy ...

Reply via email to