scwhittle commented on code in PR #23333:
URL: https://github.com/apache/beam/pull/23333#discussion_r988819377


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
     assertThat(server.numGetDataRequests(), greaterThan(0));
   }
 
+  // A class that aggregates LatencyAttribution data from active work refresh requests.
+  private static class ActiveWorkRefreshSink {
+    Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations = new HashMap<>();
+
+    // Accessor for reading out aggregated LatencyAttribution data.
+    Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
+      EnumMap<LatencyAttribution.State, Duration> durations = totalDurations.get(workToken);
+      if (durations == null) {
+        return Duration.ZERO;
+      }
+      Duration d = durations.get(state);

Review Comment:
   Consider `Map.getOrDefault` here instead of the explicit null check.
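
   A minimal sketch of the suggestion, not the PR's code: `LatencyState` and
   `DurationLookup` are hypothetical stand-ins for `LatencyAttribution.State`
   and the test sink, and `java.time.Duration` is used instead of Joda's
   `Duration` only to keep the example self-contained.

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for LatencyAttribution.State.
enum LatencyState { QUEUED, ACTIVE, READING }

// Hypothetical stand-in for the accessor part of ActiveWorkRefreshSink.
class DurationLookup {
  final Map<Long, EnumMap<LatencyState, Duration>> totalDurations = new HashMap<>();

  // Returns the recorded duration, or Duration.ZERO if the work token or state is absent.
  Duration get(long workToken, LatencyState state) {
    EnumMap<LatencyState, Duration> durations = totalDurations.get(workToken);
    if (durations == null) {
      return Duration.ZERO;
    }
    // Replaces the second explicit null check.
    return durations.getOrDefault(state, Duration.ZERO);
  }
}
```

   The outer lookup still needs its own null check (or a `getOrDefault` with
   an empty map), since a missing work token yields no inner map at all.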



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3046,19 +3048,25 @@ public void testHugeCommits() throws Exception {
 
   private static class SlowDoFn extends DoFn<String, String> {
 
+    private final Duration sleep;
+
+    SlowDoFn(Duration sleep) {
+      this.sleep = sleep;
+    }
+
+    SlowDoFn() {
+      this(Duration.millis(1000));
+    }
+
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      Thread.sleep(1000);
+      Uninterruptibles.sleepUninterruptibly(sleep.getMillis(), TimeUnit.MILLISECONDS);

Review Comment:
   It would be better to inject a clock into this, and the same one into
   StreamingDataflowWorker, instead of relying on sleep timing.
   
   Can you take a look at whether that would be not too hard to add? This is
   somewhat pre-existing, but it would be nice to clean up while modifying it.
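
   A rough sketch of what clock injection could look like, under assumptions:
   `FakeClock` and `ElapsedTimer` are hypothetical names, and a plain
   `LongSupplier` of milliseconds stands in for whatever clock type the worker
   would actually accept.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical fake clock: time only moves when the test advances it.
class FakeClock implements LongSupplier {
  private final AtomicLong nowMillis = new AtomicLong();

  @Override
  public long getAsLong() {
    return nowMillis.get();
  }

  void advance(long millis) {
    nowMillis.addAndGet(millis);
  }
}

// Hypothetical stand-in for a component that measures elapsed time.
class ElapsedTimer {
  private final LongSupplier clock;
  private final long startMillis;

  ElapsedTimer(LongSupplier clock) {
    this.clock = clock;
    this.startMillis = clock.getAsLong();
  }

  long elapsedMillis() {
    return clock.getAsLong() - startMillis;
  }
}
```

   With the same fake clock shared by the DoFn and the worker, a test could
   advance time by an exact amount and assert on exact durations, rather than
   sleeping and comparing against a lower bound.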



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1406,7 +1422,13 @@ private void process(
               computationId,
               key,
               workItem.getShardingKey(),
-              workItem.getWorkToken());
+              workItem.getWorkToken(),
+              () -> {

Review Comment:
   How about a single function that returns a Closeable?
   That will help ensure they are matched up.
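
   Something along these lines, as a sketch only: `StateTracker`, `Scope`, and
   the state names recorded here are hypothetical and not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tracker that records state transitions in order.
class StateTracker {
  final List<String> transitions = new ArrayList<>();

  // A closeable scope whose close() throws nothing, so callers need no catch block.
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  // Enters the READING state and returns a scope that restores ACTIVE when closed.
  Scope enterReading() {
    transitions.add("READING");
    return () -> transitions.add("ACTIVE");
  }

  // Example caller: try-with-resources guarantees the exit runs exactly once.
  String readWithTracking() {
    try (Scope ignored = enterReading()) {
      transitions.add("fetch");
      return "data";
    }
  }
}
```

   Because the exit action only exists as the return value of the enter call,
   a caller cannot supply one without the other.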



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -2377,12 +2399,48 @@ public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadli
           ShardedKey shardedKey = entry.getKey();
           for (Work work : entry.getValue()) {
             if (work.getStartTime().isBefore(refreshDeadline)) {
-              result.add(
+              Map<Windmill.LatencyAttribution.State, Duration> durations =

Review Comment:
   It would be simpler to just build this map up in the work item, instead of
   the one keyed by State.
   That saves merging here, since it's already handled there; we just need a
   translation between the enums.
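
   Roughly what I have in mind, as a sketch under assumptions: `WorkState`,
   `ReportedState`, and `WorkItemTimings` are hypothetical stand-ins, the
   mapping in `translate` is made up for illustration, and `java.time.Duration`
   replaces Joda's `Duration` to keep the example self-contained.

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical internal work states.
enum WorkState { QUEUED, PROCESSING, READING, COMMITTING }

// Hypothetical stand-in for Windmill.LatencyAttribution.State.
enum ReportedState { QUEUED, ACTIVE, READING }

// Hypothetical per-work-item accumulator keyed directly by the reported enum.
class WorkItemTimings {
  private final Map<ReportedState, Duration> totals = new EnumMap<>(ReportedState.class);

  // Illustrative translation only; the real mapping may differ.
  static ReportedState translate(WorkState state) {
    switch (state) {
      case QUEUED:
        return ReportedState.QUEUED;
      case READING:
        return ReportedState.READING;
      default:
        return ReportedState.ACTIVE;
    }
  }

  // Adds elapsed time to the bucket of the translated state; merging happens here.
  void record(WorkState state, Duration elapsed) {
    totals.merge(translate(state), elapsed, Duration::plus);
  }

  Map<ReportedState, Duration> totals() {
    return totals;
  }
}
```

   The refresh path would then only read `totals()` and copy it into the
   request, with no merging of its own.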



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
     assertThat(server.numGetDataRequests(), greaterThan(0));
   }
 
+  // A class that aggregates LatencyAttribution data from active work refresh requests.
+  private static class ActiveWorkRefreshSink {
+    Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations = new HashMap<>();
+
+    // Accessor for reading out aggregated LatencyAttribution data.
+    Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
+      EnumMap<LatencyAttribution.State, Duration> durations = totalDurations.get(workToken);
+      if (durations == null) {
+        return Duration.ZERO;
+      }
+      Duration d = durations.get(state);
+      if (d == null) {
+        return Duration.ZERO;
+      }
+      return d;
+    }
+
+    // Handles active work refresh requests when passed to FakeWindmillServer.addDataFnToOffer.
+    GetDataResponse getData(GetDataRequest request) {
+      boolean isActiveWorkRefresh = true;
+      for (ComputationGetDataRequest computationRequest : request.getRequestsList()) {
+        if (!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+          isActiveWorkRefresh = false;
+          continue;
+        }
+        for (KeyedGetDataRequest keyedRequest : computationRequest.getRequestsList()) {
+          if (keyedRequest.getWorkToken() == 0
+              || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+              || keyedRequest.getValuesToFetchCount() != 0
+              || keyedRequest.getBagsToFetchCount() != 0
+              || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+              || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+            isActiveWorkRefresh = false;
+            continue;
+          }
+          for (LatencyAttribution la : keyedRequest.getLatencyAttributionList()) {
+            EnumMap<LatencyAttribution.State, Duration> durations =
+                totalDurations.computeIfAbsent(
+                    keyedRequest.getWorkToken(),
+                    (Long workToken) ->
+                        new EnumMap<LatencyAttribution.State, Duration>(
+                            LatencyAttribution.State.class));
+            Duration old = durations.get(la.getState());
+            Duration cur = Duration.millis(la.getTotalDurationMillis());
+            if (old == null || old.isShorterThan(cur)) {
+              durations.put(la.getState(), cur);
+            }
+          }
+        }
+      }
+      if (!isActiveWorkRefresh) {
+        // The unit test below for state QUEUED relies on this delay.
+        Uninterruptibles.sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
+      }
+      return EMPTY_DATA_RESPONDER.apply(request);
+    }
+  }
+
+  @Test
+  public void testLatencyAttributionToQueuedState() throws Exception {
+    final int workToken = 323232; // A unique id makes it easier to find logs.
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(new SlowDoFn(Duration.millis(2000)), 0, StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+    server.setGetDataSleep(Duration.ZERO);
+    StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
+    options.setActiveWorkRefreshPeriodMillis(100);
+    // A single-threaded worker processes work sequentially, leaving a second work item in state
+    // QUEUED until the first work item is committed.
+    options.setNumberOfWorkerHarnessThreads(1);
+    StreamingDataflowWorker worker = makeWorker(instructions, options, false /* publishCounters */);
+    worker.start();
+
+    ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink();
+    for (int i = 0; i < 1000; ++i) {
+      server.addDataFnToOffer(
+          (GetDataRequest request) -> {
+            return awrSink.getData(request);
+          });
+    }
+    server.addWorkToOffer(makeInput(workToken + 1, TimeUnit.MILLISECONDS.toMicros(100)));
+    server.addWorkToOffer(makeInput(workToken, TimeUnit.MILLISECONDS.toMicros(100)));
+    server.waitForAndGetCommits(2);
+
+    worker.stop();
+
+    assertTrue(

Review Comment:
   Verify that the other ones are less than the queued state?
   Otherwise, if getLatencyAttributionDuration always returned 1 sec, all of
   the tests would pass.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3081,6 +3089,222 @@ public void testActiveWorkRefresh() throws Exception {
     assertThat(server.numGetDataRequests(), greaterThan(0));
   }
 
+  // A class that aggregates LatencyAttribution data from active work refresh requests.
+  private static class ActiveWorkRefreshSink {
+    Map<Long, EnumMap<LatencyAttribution.State, Duration>> totalDurations = new HashMap<>();
+
+    // Accessor for reading out aggregated LatencyAttribution data.
+    Duration getLatencyAttributionDuration(long workToken, LatencyAttribution.State state) {
+      EnumMap<LatencyAttribution.State, Duration> durations = totalDurations.get(workToken);
+      if (durations == null) {
+        return Duration.ZERO;
+      }
+      Duration d = durations.get(state);
+      if (d == null) {
+        return Duration.ZERO;
+      }
+      return d;
+    }
+
+    // Handles active work refresh requests when passed to FakeWindmillServer.addDataFnToOffer.
+    GetDataResponse getData(GetDataRequest request) {
+      boolean isActiveWorkRefresh = true;
+      for (ComputationGetDataRequest computationRequest : request.getRequestsList()) {
+        if (!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
+          isActiveWorkRefresh = false;
+          continue;
+        }
+        for (KeyedGetDataRequest keyedRequest : computationRequest.getRequestsList()) {
+          if (keyedRequest.getWorkToken() == 0
+              || keyedRequest.getShardingKey() != DEFAULT_SHARDING_KEY
+              || keyedRequest.getValuesToFetchCount() != 0
+              || keyedRequest.getBagsToFetchCount() != 0
+              || keyedRequest.getTagValuePrefixesToFetchCount() != 0
+              || keyedRequest.getWatermarkHoldsToFetchCount() != 0) {
+            isActiveWorkRefresh = false;
+            continue;
+          }
+          for (LatencyAttribution la : keyedRequest.getLatencyAttributionList()) {
+            EnumMap<LatencyAttribution.State, Duration> durations =
+                totalDurations.computeIfAbsent(
+                    keyedRequest.getWorkToken(),
+                    (Long workToken) ->
+                        new EnumMap<LatencyAttribution.State, Duration>(
+                            LatencyAttribution.State.class));
+            Duration old = durations.get(la.getState());
+            Duration cur = Duration.millis(la.getTotalDurationMillis());
+            if (old == null || old.isShorterThan(cur)) {
+              durations.put(la.getState(), cur);
+            }
+          }
+        }
+      }
+      if (!isActiveWorkRefresh) {
+        // The unit test below for state QUEUED relies on this delay.
+        Uninterruptibles.sleepUninterruptibly(2000, TimeUnit.MILLISECONDS);
+      }
+      return EMPTY_DATA_RESPONDER.apply(request);
+    }
+  }
+
+  @Test
+  public void testLatencyAttributionToQueuedState() throws Exception {
+    final int workToken = 323232; // A unique id makes it easier to find logs.
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(new SlowDoFn(Duration.millis(2000)), 0, StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+    server.setGetDataSleep(Duration.ZERO);
+    StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
+    options.setActiveWorkRefreshPeriodMillis(100);
+    // A single-threaded worker processes work sequentially, leaving a second work item in state
+    // QUEUED until the first work item is committed.
+    options.setNumberOfWorkerHarnessThreads(1);
+    StreamingDataflowWorker worker = makeWorker(instructions, options, false /* publishCounters */);
+    worker.start();
+
+    ActiveWorkRefreshSink awrSink = new ActiveWorkRefreshSink();
+    for (int i = 0; i < 1000; ++i) {
+      server.addDataFnToOffer(

Review Comment:
   Can you modify the server to allow specifying a default, instead of just
   loading 1000 up?
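
   A sketch of the idea, assuming nothing about how FakeWindmillServer is
   actually structured: `FakeResponder`, `addToOffer`, and `setDefault` are
   hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical fake: serves queued one-shot responders first, then a default.
class FakeResponder<ReqT, RespT> {
  private final Queue<Function<ReqT, RespT>> queued = new ArrayDeque<>();
  private Function<ReqT, RespT> defaultFn;

  FakeResponder(Function<ReqT, RespT> defaultFn) {
    this.defaultFn = defaultFn;
  }

  // Queues a responder that is used for exactly one request.
  synchronized void addToOffer(Function<ReqT, RespT> fn) {
    queued.add(fn);
  }

  // Sets the responder used whenever nothing is queued.
  synchronized void setDefault(Function<ReqT, RespT> fn) {
    defaultFn = fn;
  }

  RespT respond(ReqT request) {
    Function<ReqT, RespT> fn;
    synchronized (this) {
      fn = queued.isEmpty() ? defaultFn : queued.poll();
    }
    // Apply outside the lock so a slow responder does not block other callers.
    return fn.apply(request);
  }
}
```

   The test would then register the sink once as the default, and it would
   not matter how many refresh requests arrive while the work is in flight.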



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -454,6 +469,10 @@ public void startBatchAndBlock() {
     // First, drain work out of the pending lookups into a set. These will be the items we fetch.
     HashSet<StateTag<?>> toFetch = Sets.newHashSet();
     try {
+      if (beforeRead != null) {

Review Comment:
   I was concerned that this might be incorrect if we ever call this from
   threads that are not the main processing thread (i.e. background pagination
   of bags). However, although we attempt to fetch the next bag page ahead of
   time, it isn't issued in the background, just whenever it is needed or
   another state fetch would block.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateReader.java:
##########
@@ -454,6 +469,10 @@ public void startBatchAndBlock() {
     // First, drain work out of the pending lookups into a set. These will be the items we fetch.
     HashSet<StateTag<?>> toFetch = Sets.newHashSet();
     try {
+      if (beforeRead != null) {

Review Comment:
   Maybe we should move this down to only where we call getStateData.
   That will prevent a transition in cases where we don't actually issue a
   read to the server.
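
   A simplified sketch of the ordering I mean; `BatchingReader` is a
   hypothetical stand-in, the event strings replace the real hooks, and the
   private `getStateData` here only records that a fetch happened.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader that records hook invocations and fetches in order.
class BatchingReader {
  final List<String> events = new ArrayList<>();

  // Stand-in for the server call.
  private void getStateData(List<String> toFetch) {
    events.add("fetch:" + toFetch.size());
  }

  void startBatchAndBlock(List<String> pending) {
    // Drain pending lookups first, before any state transition.
    List<String> toFetch = new ArrayList<>(pending);
    if (toFetch.isEmpty()) {
      return; // Nothing to read, so no transition is recorded.
    }
    events.add("beforeRead");
    try {
      getStateData(toFetch);
    } finally {
      events.add("afterRead");
    }
  }
}
```

   With the hook placed right around the server call, an empty batch leaves
   the recorded state untouched.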



