[ 
https://issues.apache.org/jira/browse/BEAM-6597?focusedWorklogId=425980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-425980
 ]

ASF GitHub Bot logged work on BEAM-6597:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Apr/20 01:38
            Start Date: 22/Apr/20 01:38
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #11487:
URL: https://github.com/apache/beam/pull/11487#discussion_r412605654



##########
File path: 
runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
##########
@@ -630,72 +644,100 @@ public void process(ProcessContext ctxt) {
               (Coder<WindowedValue<?>>) remoteOutputCoder.getValue(), 
outputContents::add));
     }
 
-    Iterable<String> sideInputData = Arrays.asList("A", "B", "C");
+    final String testPTransformId = "create/ParMultiDo(Metrics)";
+    BundleProgressHandler progressHandler =
+        new BundleProgressHandler() {
+          @Override
+          public void onProgress(ProcessBundleProgressResponse response) {
+            MetricsDoFn.ALLOW_COMPLETION.get(metricsDoFn.uuid).countDown();
+            List<Matcher<MonitoringInfo>> matchers = new ArrayList<>();
 
-    StateRequestHandler stateRequestHandler =
-        StateRequestHandlers.forSideInputHandlerFactory(
-            descriptor.getSideInputSpecs(),
-            new SideInputHandlerFactory() {
-              @Override
-              public <V, W extends BoundedWindow>
-                  IterableSideInputHandler<V, W> forIterableSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      Coder<V> elementCoder,
-                      Coder<W> windowCoder) {
-                throw new UnsupportedOperationException();
-              }
+            // We expect all user counters except for the ones in @FinishBundle
+            // Since non-user metrics are registered at bundle creation time, 
they will still report
+            // values most of which will be 0.
 
-              @Override
-              public <K, V, W extends BoundedWindow>
-                  MultimapSideInputHandler<K, V, W> forMultimapSideInput(
-                      String pTransformId,
-                      String sideInputId,
-                      KvCoder<K, V> elementCoder,
-                      Coder<W> windowCoder) {
-                return new MultimapSideInputHandler<K, V, W>() {
-                  @Override
-                  public Iterable<V> get(BoundedWindow window) {
-                    return null;
-                  }
+            SimpleMonitoringInfoBuilder builder = new 
SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, 
MetricsDoFn.PROCESS_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
+            builder.setInt64SumValue(1);
+            
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<K> keyCoder() {
-                    return elementCoder.getKeyCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(MonitoringInfoConstants.Labels.NAME, 
MetricsDoFn.START_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
+            builder.setInt64SumValue(10);
+            
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-                  @Override
-                  public Coder<V> valueCoder() {
-                    return elementCoder.getValueCoder();
-                  }
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, 
MetricsDoFn.FINISH_USER_COUNTER_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
+            
matchers.add(not(MonitoringInfoMatchers.matchSetFields(builder.build())));
 
-                  @Override
-                  public Iterable<V> get(K key, W window) {
-                    return (Iterable) sideInputData;
-                  }
-                };
-              }
-            });
+            // User Distributions.
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME,
+                    MetricsDoFn.PROCESS_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(1, 1, 1, 
1));
+            
matchers.add(MonitoringInfoMatchers.matchSetFields(builder.build()));
 
-    String testPTransformId = "create/ParMultiDo(Anonymous)";
-    BundleProgressHandler progressHandler =
-        new BundleProgressHandler() {
-          @Override
-          public void onProgress(ProcessBundleProgressResponse progress) {}
+            builder = new SimpleMonitoringInfoBuilder();
+            builder
+                .setUrn(MonitoringInfoConstants.Urns.USER_DISTRIBUTION_INT64)
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAMESPACE, 
RemoteExecutionTest.class.getName())
+                .setLabel(
+                    MonitoringInfoConstants.Labels.NAME, 
MetricsDoFn.START_USER_DISTRIBUTION_NAME);
+            builder.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, 
testPTransformId);
+            builder.setInt64DistributionValue(DistributionData.create(10, 1, 
10, 10));

Review comment:
       The values should only change if the test changes, since each one is 
expected to be emitted exactly once. Only the msec counters are really 
non-deterministic.
   
   Also, the MonitoringInfo matcher only compares set fields and ignores fields 
that aren't set.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 425980)
    Time Spent: 40m  (was: 0.5h)

> Put MonitoringInfos/metrics in the Java SDK ProcessBundleProgressResponse
> -------------------------------------------------------------------------
>
>                 Key: BEAM-6597
>                 URL: https://issues.apache.org/jira/browse/BEAM-6597
>             Project: Beam
>          Issue Type: New Feature
>          Components: java-fn-execution
>            Reporter: Alex Amato
>            Assignee: Luke Cwik
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> I think this is the correct approach, as I don't believe there is any hook in 
> the Java SDK yet for ProcessBundleProgressResponses.
> (1) Implement ProcessBundleProgressResponse
> See FnHarness.main to add a handler for RequestCase.PROGRESS_BUNDLE.
> (2) Refactor ProgressBundleHandler so that the metrics can be extracted from 
> the MetricContainerStep map and SimpleExecutionStates for the instructionId 
> when the call comes in. (Right now all these objects only live in the local 
> function, they may need to live in an object instead which can be accessed by 
> both process bundle and progress bundle responses). Be careful to not 
> introduce thread contention. Ideally we need a way to read the values without 
> blocking new ones from being written.
> (Test 1) Also be sure to simplify RemoteExecutionTest.testMetrics().
> By inspecting the metric progress, we can remove the sleeps from this code. 
> Currently there are sleeps in start, process and finish to ensure execution 
> time metrics are added. Instead, once progress bundle responses are 
> introduced, the metrics can be examined here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to