[ 
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=177572&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177572
 ]

ASF GitHub Bot logged work on BEAM-6165:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Dec/18 16:10
            Start Date: 20/Dec/18 16:10
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #7183: [BEAM-6165] send 
metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
index 85425a5c7a9d..c39fee074ecd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DistributionCell.java
@@ -57,6 +57,11 @@ public void update(long n) {
     update(DistributionData.singleton(n));
   }
 
+  @Override
+  public void update(long sum, long count, long min, long max) {
+    update(DistributionData.create(sum, count, min, max));
+  }
+
   void update(DistributionData data) {
     DistributionData original;
     do {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 9802defeb1a4..f766cf0b60b3 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
+import static 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import java.util.List;
@@ -132,7 +134,9 @@ static StreamExecutionEnvironment 
createStreamExecutionEnvironment(
 
     // depending on the master, create the right environment.
     if ("[local]".equals(masterUrl)) {
-      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+      flinkStreamEnv =
+          StreamExecutionEnvironment.createLocalEnvironment(
+              getDefaultLocalParallelism(), flinkConfig);
     } else if ("[auto]".equals(masterUrl)) {
       flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     } else if (masterUrl.matches(".*:\\d*")) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
new file mode 100644
index 000000000000..83a707d41785
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics 
reporter} for writing
+ * metrics to a file specified via the "metrics.reporter.file.path" config key 
(assuming an alias of
+ * "file" for this reporter in the "metrics.reporters" setting).
+ */
+public class FileReporter extends AbstractReporter {
+  @Override
+  public String filterCharacters(String input) {
+    return input;
+  }
+
+  private String path;
+  private PrintStream ps;
+
+  @Override
+  public void open(MetricConfig config) {
+    synchronized (this) {
+      if (path == null) {
+        path = config.getString("path", null);
+        log.info("Opening file: {}", path);
+        if (path == null) {
+          throw new IllegalStateException("FileReporter metrics config needs 
'path' key");
+        }
+        try {
+          FileOutputStream fos = new FileOutputStream(path);
+          ps = new PrintStream(fos);
+        } catch (FileNotFoundException e) {
+          throw new IllegalStateException("FileReporter couldn't open file", 
e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void notifyOfRemovedMetric(Metric metric, String metricName, 
MetricGroup group) {
+    final String name = group.getMetricIdentifier(metricName, this);
+    super.notifyOfRemovedMetric(metric, metricName, group);
+    synchronized (this) {
+      ps.printf("%s: %s%n", name, Metrics.toString(metric));
+    }
+  }
+
+  @Override
+  public void close() {
+    ps.close();
+    log.info("wrote metrics to {}", path);
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index 73fe62e219fc..71e8879eae51 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,14 +17,19 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
+import static 
org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum.USER_COUNTER_URN_PREFIX;
 import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -77,12 +82,72 @@ public FlinkMetricContainer(RuntimeContext runtimeContext) {
     this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator;
   }
 
-  MetricsContainer getMetricsContainer(String stepName) {
+  public MetricsContainer getMetricsContainer(String stepName) {
     return metricsAccumulator != null
         ? metricsAccumulator.getLocalValue().getContainer(stepName)
         : null;
   }
 
+  /**
+   * Parse a {@link MetricName} from a {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}
+   *
+   * <p>Should be consistent with {@code parse_namespace_and_name} in 
monitoring_infos.py
+   *
+   * <p>TODO: not flink-specific; where should it live?
+   */
+  public static MetricName parseUrn(String urn) {
+    if (urn.startsWith(USER_COUNTER_URN_PREFIX.toString())) {
+      urn = urn.substring(USER_COUNTER_URN_PREFIX.toString().length());
+    }
+    // If it is not a user counter, just use the first part of the URN, i.e. 
'beam'
+    String[] pieces = urn.split(":", 2);
+    if (pieces.length != 2) {
+      throw new IllegalArgumentException("Invalid metric URN: " + urn);
+    }
+    return MetricName.named(pieces[0], pieces[1]);
+  }
+
+  public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> 
monitoringInfos) {
+    MetricsContainer metricsContainer = getMetricsContainer(stepName);
+    monitoringInfos.forEach(
+        monitoringInfo -> {
+          if (monitoringInfo.hasMetric()) {
+            String urn = monitoringInfo.getUrn();
+            MetricName metricName = parseUrn(urn);
+            BeamFnApi.Metric metric = monitoringInfo.getMetric();
+            if (metric.hasCounterData()) {
+              BeamFnApi.CounterData counterData = metric.getCounterData();
+              org.apache.beam.sdk.metrics.Counter counter = 
metricsContainer.getCounter(metricName);
+              if (counterData.getValueCase() == 
BeamFnApi.CounterData.ValueCase.INT64_VALUE) {
+                counter.inc(counterData.getInt64Value());
+              } else {
+                throw new IllegalArgumentException("Unsupported CounterData 
type: " + counterData);
+              }
+            } else if (metric.hasDistributionData()) {
+              BeamFnApi.DistributionData distributionData = 
metric.getDistributionData();
+              Distribution distribution = 
metricsContainer.getDistribution(metricName);
+              if (distributionData.hasIntDistributionData()) {
+                BeamFnApi.IntDistributionData intDistributionData =
+                    distributionData.getIntDistributionData();
+                distribution.update(
+                    intDistributionData.getSum(),
+                    intDistributionData.getCount(),
+                    intDistributionData.getMin(),
+                    intDistributionData.getMax());
+              } else {
+                throw new IllegalArgumentException(
+                    "Unsupported DistributionData type: " + distributionData);
+              }
+            } else if (metric.hasExtremaData()) {
+              BeamFnApi.ExtremaData extremaData = metric.getExtremaData();
+              throw new IllegalArgumentException("Extrema metric unsupported: 
" + extremaData);
+            }
+          }
+        });
+    updateMetrics(stepName);
+  }
+
   void updateMetrics(String stepName) {
     MetricResults metricResults = 
asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
     MetricQueryResults metricQueryResults =
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
new file mode 100644
index 000000000000..697b22bdad53
--- /dev/null
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/Metrics.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+
+/** Helper for pretty-printing {@link Metric Flink metrics}. */
+public class Metrics {
+  public static String toString(Metric metric) {
+    if (metric instanceof Counter) {
+      return Long.toString(((Counter) metric).getCount());
+    } else if (metric instanceof Gauge) {
+      return ((Gauge) metric).getValue().toString();
+    } else if (metric instanceof Meter) {
+      return Double.toString(((Meter) metric).getRate());
+    } else if (metric instanceof Histogram) {
+      HistogramStatistics stats = ((Histogram) metric).getStatistics();
+      return String.format(
+          "count=%d, min=%d, max=%d, mean=%f, stddev=%f, p50=%f, p75=%f, 
p95=%f",
+          stats.size(),
+          stats.getMin(),
+          stats.getMax(),
+          stats.getMean(),
+          stats.getStdDev(),
+          stats.getQuantile(0.5),
+          stats.getQuantile(0.75),
+          stats.getQuantile(0.95));
+    } else {
+      throw new IllegalStateException(
+          String.format(
+              "Cannot remove unknown metric type %s. This indicates that the 
reporter "
+                  + "does not support this metric type.",
+              metric.getClass().getName()));
+    }
+  }
+}
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index f311e2a7fd70..91d7d1cd3c5b 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -30,6 +30,8 @@
 import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -43,6 +45,7 @@
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.TimerReference;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
 import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
@@ -97,9 +100,12 @@
   private final Map<String, Integer> outputMap;
   private final FlinkExecutableStageContext.Factory contextFactory;
   private final Coder windowCoder;
+  // Unique name for namespacing metrics; currently just takes the input ID
+  private final String stageName;
 
   // Worker-local fields. These should only be constructed and consumed on 
Flink TaskManagers.
   private transient RuntimeContext runtimeContext;
+  private transient FlinkMetricContainer container;
   private transient StateRequestHandler stateRequestHandler;
   private transient FlinkExecutableStageContext stageContext;
   private transient StageBundleFactory stageBundleFactory;
@@ -121,6 +127,7 @@ public FlinkExecutableStageFunction(
     this.outputMap = outputMap;
     this.contextFactory = contextFactory;
     this.windowCoder = windowCoder;
+    this.stageName = stagePayload.getInput();
   }
 
   @Override
@@ -130,6 +137,7 @@ public void open(Configuration parameters) throws Exception 
{
     FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
     executableStage = ExecutableStage.fromPayload(stagePayload);
     runtimeContext = getRuntimeContext();
+    container = new FlinkMetricContainer(getRuntimeContext());
     // TODO: Wire this into the distributed cache and make it pluggable.
     stageContext = contextFactory.get(jobInfo);
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
@@ -139,7 +147,18 @@ public void open(Configuration parameters) throws 
Exception {
     stateRequestHandler =
         getStateRequestHandler(
             executableStage, stageBundleFactory.getProcessBundleDescriptor(), 
runtimeContext);
-    progressHandler = BundleProgressHandler.ignored();
+    progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse progress) {
+            container.updateMetrics(stageName, 
progress.getMonitoringInfosList());
+          }
+
+          @Override
+          public void onCompleted(ProcessBundleResponse response) {
+            container.updateMetrics(stageName, 
response.getMonitoringInfosList());
+          }
+        };
   }
 
   private StateRequestHandler getStateRequestHandler(
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 1e42d7e6afa8..73e0bed214a9 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -148,7 +148,7 @@
 
   protected transient FlinkStateInternals<?> keyedStateInternals;
 
-  private final String stepName;
+  protected final String stepName;
 
   private final Coder<WindowedValue<InputT>> windowedInputCoder;
 
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 31ad20a98917..5270a511589e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -33,6 +33,8 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -43,6 +45,7 @@
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -104,6 +107,7 @@
   private transient StageBundleFactory stageBundleFactory;
   private transient ExecutableStage executableStage;
   private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+  private transient FlinkMetricContainer flinkMetricContainer;
   private transient long backupWatermarkHold = Long.MIN_VALUE;
 
   public ExecutableStageDoFnOperator(
@@ -157,10 +161,22 @@ public void open() throws Exception {
     // bundle "factory" (manager?) but not the job or Flink bundle factories. 
How do we make
     // ownership of the higher level "factories" explicit? Do we care?
     stageContext = contextFactory.get(jobInfo);
+    flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
 
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
     stateRequestHandler = getStateRequestHandler(executableStage);
-    progressHandler = BundleProgressHandler.ignored();
+    progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse progress) {
+            flinkMetricContainer.updateMetrics(stepName, 
progress.getMonitoringInfosList());
+          }
+
+          @Override
+          public void onCompleted(ProcessBundleResponse response) {
+            flinkMetricContainer.updateMetrics(stepName, 
response.getMonitoringInfosList());
+          }
+        };
 
     // This will call {@code createWrappingDoFnRunner} which needs the above 
dependencies.
     super.open();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
index 12ac19227087..f930aa96e501 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DeltaDistributionCell.java
@@ -52,6 +52,11 @@ void update(DistributionData data) {
     } while (!value.compareAndSet(original, original.combine(data)));
   }
 
+  @Override
+  public void update(long sum, long count, long min, long max) {
+    update(DistributionData.create(sum, count, min, max));
+  }
+
   @Override
   public DirtyState getDirty() {
     throw new UnsupportedOperationException(
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
index 06cbad571b5d..4922a04d4b7a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
@@ -25,4 +25,6 @@
 public interface Distribution extends Metric {
   /** Add an observation to this distribution. */
   void update(long value);
+
+  void update(long sum, long count, long min, long max);
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 36afd750c68f..fd22120b6772 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -157,6 +157,14 @@ public void update(long value) {
       }
     }
 
+    @Override
+    public void update(long sum, long count, long min, long max) {
+      MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+      if (container != null) {
+        container.getDistribution(name).update(sum, count, min, max);
+      }
+    }
+
     @Override
     public MetricName getName() {
       return name;
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py 
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 4ccf89de8d33..e94ad6759c70 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -19,12 +19,16 @@
 
 import argparse
 import logging
-import shutil
 import sys
-import tempfile
 import unittest
+from os import linesep
+from os import path
+from os.path import exists
+from shutil import rmtree
+from tempfile import mkdtemp
 
 import apache_beam as beam
+from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import FlinkOptions
 from apache_beam.options.pipeline_options import PortableOptions
@@ -65,19 +69,58 @@ class 
FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
     _use_grpc = True
     _use_subprocesses = True
 
+    conf_dir = None
+
+    @classmethod
+    def tearDownClass(cls):
+      if cls.conf_dir and exists(cls.conf_dir):
+        logging.info("removing conf dir: %s" % cls.conf_dir)
+        rmtree(cls.conf_dir)
+      super(FlinkRunnerTest, cls).tearDownClass()
+
+    @classmethod
+    def _create_conf_dir(cls):
+      """Create (and save a static reference to) a "conf dir", used to provide
+       metrics configs and verify metrics output
+
+       It gets cleaned up when the suite is done executing"""
+
+      if hasattr(cls, 'conf_dir'):
+        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
+
+        # path for a FileReporter to write metrics to
+        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+        # path to write Flink configuration to
+        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
+        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
+        with open(conf_path, 'w') as f:
+          f.write(linesep.join([
+              'metrics.reporters: file',
+              'metrics.reporter.file.class: %s' % file_reporter,
+              'metrics.reporter.file.path: %s' % cls.test_metrics_path
+          ]))
+
     @classmethod
     def _subprocess_command(cls, port):
-      tmp_dir = tempfile.mkdtemp(prefix='flinktest')
+      # will be cleaned up at the end of this method, and recreated and used by
+      # the job server
+      tmp_dir = mkdtemp(prefix='flinktest')
+
+      cls._create_conf_dir()
+
       try:
         return [
             'java',
             '-jar', flink_job_server_jar,
+            '--flink-master-url', '[local]',
+            '--flink-conf-dir', cls.conf_dir,
             '--artifacts-dir', tmp_dir,
             '--job-port', str(port),
             '--artifact-port', '0',
         ]
       finally:
-        shutil.rmtree(tmp_dir)
+        rmtree(tmp_dir)
 
     @classmethod
     def get_runner(cls):
@@ -120,6 +163,45 @@ def test_error_message_includes_stage(self):
     def test_error_traceback_includes_user_code(self):
       raise unittest.SkipTest("BEAM-6019")
 
+    def test_metrics(self):
+      """Run a simple DoFn that increments a counter, and verify that its
+       expected value is written to a temporary file by the FileReporter"""
+
+      counter_name = 'elem_counter'
+
+      class DoFn(beam.DoFn):
+        def __init__(self):
+          self.counter = Metrics.counter(self.__class__, counter_name)
+          logging.info('counter: %s' % self.counter.metric_name)
+
+        def process(self, v):
+          self.counter.inc()
+
+      p = self.create_pipeline()
+      n = 100
+
+      # pylint: disable=expression-not-assigned
+      p \
+      | beam.Create(list(range(n))) \
+      | beam.ParDo(DoFn())
+
+      result = p.run()
+      result.wait_until_finish()
+
+      with open(self.test_metrics_path, 'r') as f:
+        lines = [line for line in f.readlines() if counter_name in line]
+        self.assertEqual(
+            len(lines), 1,
+            msg='Expected 1 line matching "%s":\n%s' % (
+                counter_name, '\n'.join(lines))
+        )
+        line = lines[0]
+        self.assertTrue(
+            '%s: 100' % counter_name in line,
+            msg='Failed to find expected counter %s in line %s' % (
+                counter_name, line)
+        )
+
     # Inherits all other tests.
 
   # Run the tests.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 177572)
    Time Spent: 6h 20m  (was: 6h 10m)

> Send metrics to Flink in portable Flink runner
> ----------------------------------------------
>
>                 Key: BEAM-6165
>                 URL: https://issues.apache.org/jira/browse/BEAM-6165
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>    Affects Versions: 2.8.0
>            Reporter: Ryan Williams
>            Assignee: Ryan Williams
>            Priority: Major
>              Labels: metrics, portability, portability-flink
>          Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely 
> Java soon), but the portable Flink runner doesn't pass them on to Flink, 
> which it should, so that users can see them in e.g. the Flink UI or via any 
> Flink metrics reporters.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to