[ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172891
 ]

ASF GitHub Bot logged work on BEAM-6181:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Dec/18 00:07
            Start Date: 07/Dec/18 00:07
    Worklog Time Spent: 10m 
      Work Description: swegner closed pull request #7202: [BEAM-6181] 
Reporting user counters via MonitoringInfos in Portable Dataflow Runner.
URL: https://github.com/apache/beam/pull/7202
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is supplied below;
GitHub would not otherwise show it once the pull request is merged:

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
index 2d840e3f4356..28f239784bf7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
@@ -263,6 +263,7 @@ private synchronized WorkItemStatus 
createStatusUpdate(boolean isFinal) {
     return status;
   }
 
+  // todo(migryz) this method should return List<CounterUpdate> instead of 
updating member variable
   @VisibleForTesting
   synchronized void populateCounterUpdates(WorkItemStatus status) {
     if (worker == null) {
@@ -270,13 +271,18 @@ synchronized void populateCounterUpdates(WorkItemStatus 
status) {
     }
 
     boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted());
-    ImmutableList.Builder<CounterUpdate> counterUpdatesBuilder = 
ImmutableList.builder();
-    counterUpdatesBuilder.addAll(extractCounters(worker.getOutputCounters()));
-    counterUpdatesBuilder.addAll(extractMetrics(isFinalUpdate));
-    counterUpdatesBuilder.addAll(extractMsecCounters(isFinalUpdate));
-    counterUpdatesBuilder.addAll(worker.extractMetricUpdates());
 
-    ImmutableList<CounterUpdate> counterUpdates = 
counterUpdatesBuilder.build();
+    ImmutableList.Builder<CounterUpdate> counterUpdatesListBuilder = 
ImmutableList.builder();
+    // Output counters
+    
counterUpdatesListBuilder.addAll(extractCounters(worker.getOutputCounters()));
+    // User metrics reported in Worker
+    counterUpdatesListBuilder.addAll(extractMetrics(isFinalUpdate));
+    // MSec counters reported in worker
+    counterUpdatesListBuilder.addAll(extractMsecCounters(isFinalUpdate));
+    // Metrics reported in SDK runner.
+    counterUpdatesListBuilder.addAll(worker.extractMetricUpdates());
+
+    ImmutableList<CounterUpdate> counterUpdates = 
counterUpdatesListBuilder.build();
     status.setCounterUpdates(counterUpdates);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
index bc0fb54d9521..6c1d43f952f9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
@@ -17,20 +17,25 @@
  */
 package org.apache.beam.runners.dataflow.worker.fn.control;
 
+import com.google.api.services.dataflow.model.CounterMetadata;
+import com.google.api.services.dataflow.model.CounterStructuredName;
+import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import io.opencensus.common.Scope;
 import io.opencensus.trace.SpanBuilder;
 import io.opencensus.trace.Tracer;
 import io.opencensus.trace.Tracing;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -39,20 +44,27 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metrics;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
 import org.apache.beam.runners.core.construction.metrics.MetricKey;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.GaugeData;
 import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.runners.core.metrics.MetricsTranslation;
+import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter;
+import 
org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
+import 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import 
org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitRequest;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress;
@@ -62,7 +74,6 @@
 import org.apache.beam.sdk.util.MoreFutures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 /**
  * A {@link WorkExecutor} that processes a list of {@link Operation}s.
  *
@@ -128,7 +139,7 @@ private AutoCloseable 
progressTrackerCloseable(ProgressTracker progressTracker)
 
   @Override
   @Nullable
-  public NativeReader.Progress getWorkerProgress() throws Exception {
+  public Progress getWorkerProgress() throws Exception {
     return progressTracker.getWorkerProgress();
   }
 
@@ -139,30 +150,41 @@ private AutoCloseable 
progressTrackerCloseable(ProgressTracker progressTracker)
    */
   @Override
   public Iterable<CounterUpdate> extractMetricUpdates() {
+    List<CounterUpdate> result = progressTracker.extractCounterUpdates();
+    if ((result != null) && (result.size() > 0)) {
+      return result;
+    }
+
+    // todo(BEAM-6189): Remove this fallback once Metrics is deprecated from 
SDKs.
     MetricUpdates updates = progressTracker.extractMetricUpdates();
-    return Iterables.concat(
-        FluentIterable.from(updates.counterUpdates())
-            .transform(
-                update ->
-                    MetricsToCounterUpdateConverter.fromCounter(
-                        update.getKey(), true, update.getUpdate())),
-        FluentIterable.from(updates.distributionUpdates())
-            .transform(
-                update ->
-                    MetricsToCounterUpdateConverter.fromDistribution(
-                        update.getKey(), true, update.getUpdate())));
+
+    Iterable<CounterUpdate> deprecatedMetrics =
+        Iterables.concat(
+            StreamSupport.stream(updates.counterUpdates().spliterator(), false)
+                .map(
+                    update ->
+                        MetricsToCounterUpdateConverter.fromCounter(
+                            update.getKey(), true, update.getUpdate()))
+                .collect(Collectors.toList()),
+            StreamSupport.stream(updates.distributionUpdates().spliterator(), 
false)
+                .map(
+                    update ->
+                        MetricsToCounterUpdateConverter.fromDistribution(
+                            update.getKey(), true, update.getUpdate()))
+                .collect(Collectors.toList()));
+
+    return deprecatedMetrics;
   }
 
   @Override
   @Nullable
-  public NativeReader.DynamicSplitResult requestCheckpoint() throws Exception {
+  public DynamicSplitResult requestCheckpoint() throws Exception {
     return progressTracker.requestCheckpoint();
   }
 
   @Override
   @Nullable
-  public NativeReader.DynamicSplitResult requestDynamicSplit(
-      NativeReader.DynamicSplitRequest splitRequest) throws Exception {
+  public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest 
splitRequest) throws Exception {
     return progressTracker.requestDynamicSplit(splitRequest);
   }
 
@@ -177,21 +199,24 @@ public ReadOperation getReadOperation() throws Exception {
     throw new IllegalStateException(String.format("ReadOperation not found in 
%s", operations));
   }
 
-  private static interface ProgressTracker {
+  private interface ProgressTracker {
     @Nullable
-    public NativeReader.Progress getWorkerProgress() throws Exception;
+    public Progress getWorkerProgress() throws Exception;
 
     /**
      * Returns an metric updates accumulated since the last call to {@link 
#extractMetricUpdates()}.
      */
+    @Deprecated
     public MetricUpdates extractMetricUpdates();
 
+    public List<CounterUpdate> extractCounterUpdates();
+
     @Nullable
-    public NativeReader.DynamicSplitResult requestCheckpoint() throws 
Exception;
+    public DynamicSplitResult requestCheckpoint() throws Exception;
 
     @Nullable
-    public NativeReader.DynamicSplitResult requestDynamicSplit(
-        NativeReader.DynamicSplitRequest splitRequest) throws Exception;
+    public DynamicSplitResult requestDynamicSplit(DynamicSplitRequest 
splitRequest)
+        throws Exception;
 
     public default void start() {}
 
@@ -210,6 +235,11 @@ public MetricUpdates extractMetricUpdates() {
       return MetricUpdates.EMPTY;
     }
 
+    @Override
+    public List<CounterUpdate> extractCounterUpdates() {
+      return Collections.emptyList();
+    }
+
     @Nullable
     @Override
     public DynamicSplitResult requestCheckpoint() {
@@ -241,6 +271,11 @@ public MetricUpdates extractMetricUpdates() {
       return MetricUpdates.EMPTY;
     }
 
+    @Override
+    public List<CounterUpdate> extractCounterUpdates() {
+      return Collections.emptyList();
+    }
+
     @Nullable
     @Override
     public DynamicSplitResult requestCheckpoint() throws Exception {
@@ -272,9 +307,11 @@ public DynamicSplitResult 
requestDynamicSplit(DynamicSplitRequest splitRequest)
     private ScheduledFuture<?> nextProgressFuture;
     private final Consumer<Integer> grpcWriteOperationElementsProcessed;
 
-    private final Map<MetricKey, MetricUpdates.MetricUpdate<Long>> 
counterUpdates;
-    private final Map<MetricKey, MetricUpdates.MetricUpdate<DistributionData>> 
distributionUpdates;
-    private final Map<MetricKey, MetricUpdates.MetricUpdate<GaugeData>> 
gaugeUpdates;
+    private List<CounterUpdate> counterUpdates = new ArrayList<>();
+
+    private final Map<MetricKey, MetricUpdate<Long>> deprecatedCounterUpdates;
+    private final Map<MetricKey, MetricUpdate<DistributionData>> 
deprecatedDistributionUpdates;
+    private final Map<MetricKey, MetricUpdate<GaugeData>> 
deprecatedGaugeUpdates;
 
     public SingularProcessBundleProgressTracker(
         ReadOperation readOperation,
@@ -291,9 +328,9 @@ protected Progress interpolate(Progress prev, Progress 
next, double fraction) {
               return prev;
             }
           };
-      this.counterUpdates = new HashMap<>();
-      this.distributionUpdates = new HashMap<>();
-      this.gaugeUpdates = new HashMap<>();
+      this.deprecatedCounterUpdates = new HashMap<>();
+      this.deprecatedDistributionUpdates = new HashMap<>();
+      this.deprecatedGaugeUpdates = new HashMap<>();
     }
 
     private void periodicProgressUpdate() {
@@ -307,10 +344,18 @@ void updateProgress() {
           grpcWriteOperation.abortWait();
         }
 
-        BeamFnApi.Metrics metrics = 
MoreFutures.get(bundleProcessOperation.getMetrics());
+        // TODO(BEAM-6189): Replace getProcessBundleProgress with 
getMonitoringInfos when Metrics
+        // is deprecated.
+        ProcessBundleProgressResponse processBundleProgressResponse =
+            MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
+        updateMetrics(processBundleProgressResponse.getMonitoringInfosList());
 
-        updateMetrics(metrics);
+        // Supporting deprecated metrics until all supported runners are 
migrated to using
+        // MonitoringInfos
+        Metrics metrics = processBundleProgressResponse.getMetrics();
+        updateMetricsDeprecated(metrics);
 
+        // todo(migryz): utilize monitoringInfos here.
         double elementsConsumed = 
bundleProcessOperation.getInputElementsConsumed(metrics);
 
         grpcWriteOperationElementsProcessed.accept((int) elementsConsumed);
@@ -351,7 +396,110 @@ void updateProgress() {
       }
     }
 
-    private void updateMetrics(BeamFnApi.Metrics metrics) {
+    // Will extract to separate file and generalize when more counter types 
are added.
+    // todo(migryz): define counter transformer factory
+    // that can provide respective counter transformer for different type of 
counters.
+    // (ie RowCountCounterTranformer, MSecCounterTransformer, 
UserCounterTransformer, etc)
+    private static class MonitoringInfoToCounterUpdateTransformer {
+
+      private final Map<String, DataflowStepContext> transformIdMapping;
+
+      public MonitoringInfoToCounterUpdateTransformer(
+          final Map<String, DataflowStepContext> transformIdMapping) {
+        this.transformIdMapping = transformIdMapping;
+      }
+
+      // todo: search code for "beam:metrics"... and replace them with 
relevant enums from
+      // proto after rebasing above https://github.com/apache/beam/pull/6799 
that
+      // introduces relevant proto entries.
+      final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+      private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo 
monitoringInfo) {
+        long value = 
monitoringInfo.getMetric().getCounterData().getInt64Value();
+        String urn = monitoringInfo.getUrn();
+
+        String type = monitoringInfo.getType();
+
+        // todo(migryz): run MonitoringInfo through Proto validation process.
+        // Requires https://github.com/apache/beam/pull/6799 to be merged.
+        if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+          if (!type.equals("beam:metrics:sum_int_64")) {
+            throw new RuntimeException(
+                "Encountered user-counter MonitoringInfo with unexpected type."
+                    + "Expected: beam:metrics:sum_int_64. Received: "
+                    + monitoringInfo.toString());
+          }
+
+          final String ptransform = 
monitoringInfo.getLabelsMap().get("PTRANSFORM");
+          if (ptransform == null) {
+            throw new RuntimeException(
+                "Encountered user-counter MonitoringInfo with missing 
ptransformId: "
+                    + monitoringInfo.toString());
+          }
+
+          DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+          if (stepContext == null) {
+            throw new RuntimeException(
+                "Encountered user-counter MonitoringInfo with unknown 
ptransformId: "
+                    + monitoringInfo.toString());
+          }
+
+          CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
+
+          String nameWithNamespace =
+              monitoringInfo
+                  .getUrn()
+                  .substring(BEAM_METRICS_USER_PREFIX.length())
+                  .replace("^:", "");
+
+          final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+          String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+          String counterNamespace = nameWithNamespace.substring(0, 
lastColonIndex);
+
+          name.setName(
+                  new CounterStructuredName()
+                      .setOrigin(Origin.USER.toString())
+                      // Workaround for bug in python sdk that missed colon 
after ...metric:user.
+                      .setName(counterName)
+                      
.setOriginalStepName(stepContext.getNameContext().originalName())
+                      
.setExecutionStepName(stepContext.getNameContext().systemName())
+                      .setOriginNamespace(counterNamespace))
+              .setMetadata(new CounterMetadata().setKind("SUM"));
+
+          return new CounterUpdate()
+              .setStructuredNameAndMetadata(name)
+              .setCumulative(false)
+              
.setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+        }
+        return null;
+      }
+    }
+
+    /**
+     * Updates internal metrics state from provided monitoringInfos list.
+     *
+     * @param monitoringInfos Usually received from FnApi.
+     */
+    private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
+      final MonitoringInfoToCounterUpdateTransformer 
monitoringInfoToCounterUpdateTransformer =
+          new MonitoringInfoToCounterUpdateTransformer(
+              bundleProcessOperation.getPtransformIdToUserStepContext());
+
+      counterUpdates =
+          monitoringInfos
+              .stream()
+              
.map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate)
+              .filter(Objects::nonNull)
+              .collect(Collectors.toList());
+    }
+
+    /**
+     * Updates internal metrics from provided (deprecated) Metrics object.
+     *
+     * @param metrics Metrics object received from FnApi.
+     */
+    @Deprecated
+    private void updateMetricsDeprecated(Metrics metrics) {
       metrics
           .getPtransformsMap()
           .entrySet()
@@ -360,19 +508,17 @@ private void updateMetrics(BeamFnApi.Metrics metrics) {
                 MetricUpdates ptransformMetricUpdates =
                     MetricsTranslation.metricUpdatesFromProto(
                         ptransformEntry.getKey(), 
ptransformEntry.getValue().getUserList());
-                for (MetricUpdates.MetricUpdate<Long> update :
-                    ptransformMetricUpdates.counterUpdates()) {
-                  counterUpdates.put(update.getKey(), update);
+                for (MetricUpdate<Long> update : 
ptransformMetricUpdates.counterUpdates()) {
+                  deprecatedCounterUpdates.put(update.getKey(), update);
                 }
 
-                for (MetricUpdates.MetricUpdate<DistributionData> update :
+                for (MetricUpdate<DistributionData> update :
                     ptransformMetricUpdates.distributionUpdates()) {
-                  distributionUpdates.put(update.getKey(), update);
+                  deprecatedDistributionUpdates.put(update.getKey(), update);
                 }
 
-                for (MetricUpdates.MetricUpdate<GaugeData> update :
-                    ptransformMetricUpdates.gaugeUpdates()) {
-                  gaugeUpdates.put(update.getKey(), update);
+                for (MetricUpdate<GaugeData> update : 
ptransformMetricUpdates.gaugeUpdates()) {
+                  deprecatedGaugeUpdates.put(update.getKey(), update);
                 }
               });
     }
@@ -383,12 +529,17 @@ public Progress getWorkerProgress() throws Exception {
       return latestProgress.get();
     }
 
+    @Override
+    public List<CounterUpdate> extractCounterUpdates() {
+      return counterUpdates;
+    }
+
     @Override
     public MetricUpdates extractMetricUpdates() {
-      Map<MetricKey, MetricUpdates.MetricUpdate<Long>> snapshotCounterUpdates 
= counterUpdates;
-      Map<MetricKey, MetricUpdates.MetricUpdate<DistributionData>> 
snapshotDistributionUpdates =
-          distributionUpdates;
-      Map<MetricKey, MetricUpdates.MetricUpdate<GaugeData>> 
snapshotGaugeUpdates = gaugeUpdates;
+      Map<MetricKey, MetricUpdate<Long>> snapshotCounterUpdates = 
deprecatedCounterUpdates;
+      Map<MetricKey, MetricUpdate<DistributionData>> 
snapshotDistributionUpdates =
+          deprecatedDistributionUpdates;
+      Map<MetricKey, MetricUpdate<GaugeData>> snapshotGaugeUpdates = 
deprecatedGaugeUpdates;
       return MetricUpdates.create(
           snapshotCounterUpdates.values(),
           snapshotDistributionUpdates.values(),
@@ -435,11 +586,12 @@ public void stop() {
 
       // Set final metrics to precisely the values in this update. This should 
overwrite, not
       // be combined with, all prior updates.
-      counterUpdates.clear();
-      distributionUpdates.clear();
-      gaugeUpdates.clear();
+      deprecatedCounterUpdates.clear();
+      deprecatedDistributionUpdates.clear();
+      deprecatedGaugeUpdates.clear();
       try {
-        
updateMetrics(MoreFutures.get(bundleProcessOperation.getFinalMetrics()));
+        
updateMetrics(MoreFutures.get(bundleProcessOperation.getFinalMonitoringInfos()));
+        
updateMetricsDeprecated(MoreFutures.get(bundleProcessOperation.getFinalMetrics()));
       } catch (ExecutionException | InterruptedException exn) {
         LOG.info("Failed to get final metrics for bundle", exn);
       }
@@ -569,12 +721,15 @@ private ProgressTracker createProgressTracker() {
     ReadOperation readOperation;
     RemoteGrpcPortWriteOperation grpcWriteOperation;
     RegisterAndProcessBundleOperation bundleProcessOperation;
+
     try {
       readOperation = getReadOperation();
     } catch (Exception exn) {
       readOperation = null;
       LOG.info("Unable to get read operation.", exn);
+      return new NullProgressTracker();
     }
+
     // If there is a exactly one of each of RemoteGrpcPortWriteOperation and
     // RegisterAndProcessBundleOperation we know they have the right topology.
     try {
@@ -591,13 +746,11 @@ private ProgressTracker createProgressTracker() {
       LOG.debug("Does not have exactly one grpcWRite and bundleProcess 
operation.", exn);
     }
 
-    if (readOperation != null && grpcWriteOperation != null && 
bundleProcessOperation != null) {
+    if (grpcWriteOperation != null && bundleProcessOperation != null) {
       return new SingularProcessBundleProgressTracker(
           readOperation, grpcWriteOperation, bundleProcessOperation);
-    } else if (readOperation != null) {
-      return new ReadOperationProgressTracker(readOperation);
     } else {
-      return new NullProgressTracker();
+      return new ReadOperationProgressTracker(readOperation);
     }
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
index c316ea71dfb3..b9800ff4a1da 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperation.java
@@ -26,6 +26,7 @@
 import com.google.common.collect.Table;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
@@ -219,6 +221,8 @@ private static String escapeDot(String s) {
   /**
    * Returns an id for the current bundle being processed.
    *
+   * <p>Generates new id with idGenerator if no id is cached.
+   *
    * <p><b>Note</b>: This operation could be used across multiple bundles, so 
a unique id is
    * generated for every bundle. {@link Operation Operations} accessing the 
bundle id should only
    * call this once per bundle and cache the id in the {@link 
Operation#start()} method and clear it
@@ -291,6 +295,10 @@ public void abort() throws Exception {
     }
   }
 
+  public Map<String, DataflowStepContext> getPtransformIdToUserStepContext() {
+    return ptransformIdToUserStepContext;
+  }
+
   /**
    * Returns the compound metrics recorded, by issuing a request to the SDK 
harness.
    *
@@ -305,13 +313,16 @@ public void abort() throws Exception {
    * @throws InterruptedException
    * @throws ExecutionException
    */
-  public CompletionStage<BeamFnApi.Metrics> getMetrics()
+  public CompletionStage<BeamFnApi.ProcessBundleProgressResponse> 
getProcessBundleProgress()
       throws InterruptedException, ExecutionException {
     // processBundleId may be reset if this bundle finishes asynchronously.
     String processBundleId = this.processBundleId;
+
     if (processBundleId == null) {
-      return 
CompletableFuture.completedFuture(BeamFnApi.Metrics.getDefaultInstance());
+      return CompletableFuture.completedFuture(
+          BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
     }
+
     InstructionRequest processBundleRequest =
         InstructionRequest.newBuilder()
             .setInstructionId(idGenerator.getId())
@@ -326,7 +337,7 @@ public void abort() throws Exception {
               if (!response.getError().isEmpty()) {
                 throw new IllegalStateException(response.getError());
               }
-              return response.getProcessBundleProgress().getMetrics();
+              return response.getProcessBundleProgress();
             });
   }
 
@@ -336,6 +347,11 @@ public void abort() throws Exception {
         .thenApply(response -> response.getMetrics());
   }
 
+  public CompletionStage<List<MonitoringInfo>> getFinalMonitoringInfos() {
+    return getProcessBundleResponse(processBundleResponse)
+        .thenApply(response -> response.getMonitoringInfosList());
+  }
+
   public boolean hasFailed() throws ExecutionException, InterruptedException {
     if (processBundleResponse != null && 
processBundleResponse.toCompletableFuture().isDone()) {
       return 
!processBundleResponse.toCompletableFuture().get().getError().isEmpty();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
index dd07d994cea0..1c52a5a2c727 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
@@ -29,13 +29,21 @@
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import 
org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
 import org.apache.beam.runners.dataflow.worker.util.CounterHamcrestMatchers;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
@@ -44,9 +52,12 @@
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.MoreFutures;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -306,7 +317,7 @@ public void close() {}
   }
 
   @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
-  public void testFinalUserMetrics() throws Exception {
+  public void testFinalUserMetricsDeprecated() throws Exception {
     final String stepName = "fakeStepNameWithUserMetrics";
     final String namespace = "sdk/whatever";
     final String name = "someCounter";
@@ -423,6 +434,196 @@ public void close() {}
         contains(new 
CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
   }
 
+  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
+  public void 
testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent()
+      throws Exception {
+    final String stepName = "fakeStepNameWithUserMetrics";
+    final String namespace = "sdk/whatever";
+    final String name = "someCounter";
+    final int counterValue = 42;
+    final int finalCounterValue = 77;
+    final CountDownLatch progressSentLatch = new CountDownLatch(1);
+    final CountDownLatch processBundleLatch = new CountDownLatch(1);
+
+    final BeamFnApi.Metrics.User.MetricName metricName =
+        BeamFnApi.Metrics.User.MetricName.newBuilder()
+            .setNamespace(namespace)
+            .setName(name)
+            .build();
+
+    final BeamFnApi.Metrics deprecatedMetrics =
+        BeamFnApi.Metrics.newBuilder()
+            .putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
+            .putPtransforms(
+                stepName,
+                BeamFnApi.Metrics.PTransform.newBuilder()
+                    .addUser(
+                        BeamFnApi.Metrics.User.newBuilder()
+                            .setMetricName(metricName)
+                            .setCounterData(
+                                BeamFnApi.Metrics.User.CounterData.newBuilder()
+                                    .setValue(finalCounterValue)))
+                    .build())
+            .build();
+
+    final int expectedCounterValue = 5;
+    final BeamFnApi.MonitoringInfo expectedMonitoringInfo =
+        BeamFnApi.MonitoringInfo.newBuilder()
+            .setUrn("beam:metric:user:ExpectedCounter")
+            .setType("beam:metrics:sum_int_64")
+            .putLabels("PTRANSFORM", "ExpectedPTransform")
+            .setMetric(
+                BeamFnApi.Metric.newBuilder()
+                    .setCounterData(
+                        BeamFnApi.CounterData.newBuilder()
+                            .setInt64Value(expectedCounterValue)
+                            .build())
+                    .build())
+            .build();
+
+    InstructionRequestHandler instructionRequestHandler =
+        new InstructionRequestHandler() {
+          @Override
+          public CompletionStage<InstructionResponse> 
handle(InstructionRequest request) {
+            switch (request.getRequestCase()) {
+              case REGISTER:
+                return 
CompletableFuture.completedFuture(responseFor(request).build());
+              case PROCESS_BUNDLE:
+                return MoreFutures.supplyAsync(
+                    () -> {
+                      processBundleLatch.await();
+                      return responseFor(request)
+                          .setProcessBundle(
+                              BeamFnApi.ProcessBundleResponse.newBuilder()
+                                  .setMetrics(deprecatedMetrics)
+                                  .addMonitoringInfos(expectedMonitoringInfo))
+                          .build();
+                    });
+              case PROCESS_BUNDLE_PROGRESS:
+                progressSentLatch.countDown();
+                return CompletableFuture.completedFuture(
+                    responseFor(request)
+                        .setProcessBundleProgress(
+                            
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
+                                .setMetrics(deprecatedMetrics)
+                                .addMonitoringInfos(expectedMonitoringInfo))
+                        .build());
+              default:
+                throw new RuntimeException("Reached unexpected code path");
+            }
+          }
+
+          @Override
+          public void close() {}
+        };
+
+    Map<String, DataflowStepContext> stepContextMap = new HashMap<>();
+    stepContextMap.put("ExpectedPTransform", 
generateDataflowStepContext("Expected"));
+
+    RegisterAndProcessBundleOperation processOperation =
+        new RegisterAndProcessBundleOperation(
+            IdGenerators.decrementingLongs(),
+            instructionRequestHandler,
+            mockBeamFnStateDelegator,
+            REGISTER_REQUEST,
+            ImmutableMap.of(),
+            stepContextMap,
+            ImmutableMap.of(),
+            ImmutableTable.of(),
+            mockContext);
+
+    BeamFnMapTaskExecutor mapTaskExecutor =
+        BeamFnMapTaskExecutor.forOperations(
+            ImmutableList.of(readOperation, grpcPortWriteOperation, 
processOperation),
+            executionStateTracker);
+
+    // Launch the BeamFnMapTaskExecutor and wait until we are sure there has 
been one
+    // tentative update
+    CompletionStage<Void> doneFuture = 
MoreFutures.runAsync(mapTaskExecutor::execute);
+    progressSentLatch.await();
+
+    Iterable<CounterUpdate> metricsCounterUpdates = Collections.emptyList();
+    while (Iterables.size(metricsCounterUpdates) == 0) {
+      Thread.sleep(ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS);
+      metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
+    }
+
+    // Get the final metrics
+    processBundleLatch.countDown();
+    MoreFutures.get(doneFuture);
+    metricsCounterUpdates = mapTaskExecutor.extractMetricUpdates();
+
+    assertThat(Iterables.size(metricsCounterUpdates), equalTo(1));
+
+    assertThat(
+        metricsCounterUpdates,
+        contains(
+            new 
CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(expectedCounterValue)));
+  }
+
+  /**
+   * Generates bare minimum DataflowStepContext to use for testing.
+   *
+   * @param valuesPrefix prefix for all types of names that are specified in 
DataflowStepContext.
+   * @return new instance of DataflowStepContext
+   */
+  private DataflowStepContext generateDataflowStepContext(String valuesPrefix) 
{
+    NameContext nc =
+        new NameContext() {
+          @Nullable
+          @Override
+          public String stageName() {
+            return valuesPrefix + "Stage";
+          }
+
+          @Nullable
+          @Override
+          public String originalName() {
+            return valuesPrefix + "OriginalName";
+          }
+
+          @Nullable
+          @Override
+          public String systemName() {
+            return valuesPrefix + "SystemName";
+          }
+
+          @Nullable
+          @Override
+          public String userName() {
+            return valuesPrefix + "UserName";
+          }
+        };
+    DataflowStepContext dsc =
+        new DataflowStepContext(nc) {
+          @Nullable
+          @Override
+          public <W extends BoundedWindow> TimerData 
getNextFiredTimer(Coder<W> windowCoder) {
+            return null;
+          }
+
+          @Override
+          public <W extends BoundedWindow> void setStateCleanupTimer(
+              String timerId, W window, Coder<W> windowCoder, Instant 
cleanupTime) {}
+
+          @Override
+          public DataflowStepContext namespacedToUser() {
+            return this;
+          }
+
+          @Override
+          public StateInternals stateInternals() {
+            return null;
+          }
+
+          @Override
+          public TimerInternals timerInternals() {
+            return null;
+          }
+        };
+    return dsc;
+  }
+
   private BeamFnApi.InstructionResponse.Builder 
responseFor(BeamFnApi.InstructionRequest request) {
     return 
BeamFnApi.InstructionResponse.newBuilder().setInstructionId(request.getInstructionId());
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 8e24bf8603cb..303cb51eb128 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -23,6 +23,7 @@
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -51,6 +52,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateAppendRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest;
@@ -322,7 +324,7 @@ public void close() {}
 
     operation.start();
 
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getMetrics());
+    BeamFnApi.Metrics metrics = 
MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
     assertThat(metrics.getPtransformsOrThrow(stepName).getUserCount(), 
equalTo(1));
 
     BeamFnApi.Metrics.User userMetric = 
metrics.getPtransformsOrThrow(stepName).getUser(0);
@@ -426,7 +428,7 @@ public void close() {}
     operation.start();
 
     // Force some intermediate metrics to test crosstalk is not introduced
-    BeamFnApi.Metrics metrics = MoreFutures.get(operation.getMetrics());
+    BeamFnApi.Metrics metrics = 
MoreFutures.get(operation.getProcessBundleProgress()).getMetrics();
     BeamFnApi.Metrics.User userMetric = 
metrics.getPtransformsOrThrow(stepName).getUser(0);
     assertThat(userMetric.getMetricName(), equalTo(metricName));
     assertThat(userMetric.getCounterData().getValue(), 
not(equalTo(finalCounterValue)));
@@ -857,4 +859,58 @@ private void completeFuture(
             .setInstructionId(request.getInstructionId())
             .build());
   }
+
+  @Test
+  public void 
testGetProcessBundleProgressReturnsDefaultInstanceIfNoBundleIdCached()
+      throws Exception {
+    InstructionRequestHandler mockInstructionRequestHandler = 
mock(InstructionRequestHandler.class);
+
+    RegisterAndProcessBundleOperation operation =
+        new RegisterAndProcessBundleOperation(
+            IdGenerators.decrementingLongs(),
+            mockInstructionRequestHandler,
+            mockBeamFnStateDelegator,
+            REGISTER_REQUEST,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableTable.of(),
+            mockContext);
+
+    assertEquals(
+        ProcessBundleProgressResponse.getDefaultInstance(),
+        MoreFutures.get(operation.getProcessBundleProgress()));
+  }
+
+  @Test
+  public void 
testGetProcessBundleProgressFetchesProgressResponseWhenBundleIdCached()
+      throws Exception {
+    InstructionRequestHandler mockInstructionRequestHandler = 
mock(InstructionRequestHandler.class);
+
+    RegisterAndProcessBundleOperation operation =
+        new RegisterAndProcessBundleOperation(
+            IdGenerators.decrementingLongs(),
+            mockInstructionRequestHandler,
+            mockBeamFnStateDelegator,
+            REGISTER_REQUEST,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableTable.of(),
+            mockContext);
+
+    operation.getProcessBundleInstructionId(); // this generates and caches 
bundleId
+
+    ProcessBundleProgressResponse expectedResult =
+        ProcessBundleProgressResponse.newBuilder().build();
+    InstructionResponse instructionResponse =
+        
InstructionResponse.newBuilder().setProcessBundleProgress(expectedResult).build();
+    CompletableFuture resultFuture = 
CompletableFuture.completedFuture(instructionResponse);
+    when(mockInstructionRequestHandler.handle(any())).thenReturn(resultFuture);
+
+    final ProcessBundleProgressResponse result =
+        MoreFutures.get(operation.getProcessBundleProgress());
+
+    assertSame("Return value from mockInstructionRequestHandler", 
expectedResult, result);
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
index 5179268ac31d..b46982994847 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/SingularProcessBundleProgressTrackerTest.java
@@ -78,8 +78,10 @@ public void testProgressInterpolation() throws Exception {
     when(grpcWrite.getElementsSent()).thenReturn(1, 10, 20, 30);
 
     // This test ignores them, directly working on mocked 
getInputElementsConsumed
-    when(process.getMetrics())
-        
.thenReturn(CompletableFuture.completedFuture(BeamFnApi.Metrics.getDefaultInstance()));
+    when(process.getProcessBundleProgress())
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance()));
 
     when(process.getInputElementsConsumed(any(BeamFnApi.Metrics.class)))
         .thenReturn(1.0, 4.0, 10.0)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 172891)
    Time Spent: 6.5h  (was: 6h 20m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-6181
>                 URL: https://issues.apache.org/jira/browse/BEAM-6181
>             Project: Beam
>          Issue Type: Bug
>          Components: java-fn-execution
>            Reporter: Mikhail Gryzykhin
>            Assignee: Mikhail Gryzykhin
>            Priority: Major
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to utilize MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing user metrics reported via MetricInfos through the
> Dataflow Java Runner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to