scwhittle commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3428447432


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -248,46 +246,39 @@ private void processWork(
   }
 
   private void processWork(
-      ComputationState computationState, Work work, 
BoundedQueueExecutorWorkHandle unusedHandle) {
+      ComputationState computationState, Work work, 
BoundedQueueExecutorWorkHandle handle) {
     Windmill.WorkItem workItem = work.getWorkItem();
     String computationId = computationState.getComputationId();
-    ByteString key = workItem.getKey();
     work.setProcessingThreadName(Thread.currentThread().getName());
     work.setState(Work.State.PROCESSING);
     setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
     LOG.debug("Starting processing for {}:\n{}", computationId, work);
 
     if (workItem.getSourceState().getOnlyFinalize()) {
-      Windmill.WorkItemCommitRequest.Builder outputBuilder = 
initializeOutputBuilder(key, workItem);
-      
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
-      work.setState(Work.State.COMMIT_QUEUED);
-      work.queueCommit(outputBuilder.build(), computationState);
+      handleOnlyFinalize(computationState, work, workItem);
       return;
     }
 
     long processingStartTimeNanos = System.nanoTime();
-    MapTask mapTask = computationState.getMapTask();
-    StageInfo stageInfo =
-        stageInfoMap.computeIfAbsent(
-            mapTask.getStageName(), s -> StageInfo.create(s, 
mapTask.getSystemName()));
+    StageInfo stageInfo = getStageInfo(computationState);
 
+    List<Work> workBatch = null;

Review Comment:
   mark Nullable? I don't see a suppression, any idea why it doesn't complain?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -243,50 +291,120 @@ public byte[] getCurrentRecordOffset() {
     return checkStateNotNull(activeReader).getCurrentRecordOffset();
   }
 
+  public void clear() {

Review Comment:
   is clear preparing the context for reuse or getting rid of it?
   I've seen reset used for the former and discard used in fnharness for the latter; 
those might be better choices here too
   
   a comment could also help



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -243,50 +291,120 @@ public byte[] getCurrentRecordOffset() {
     return checkStateNotNull(activeReader).getCurrentRecordOffset();
   }
 
+  public void clear() {
+    for (Work w : executedWorks) {
+      w.setOnFailureListener(null);

Review Comment:
   is this cleanup needed for local retries of work? if so add comment
   
   if not, what about just making a new atomicbool to avoid this loop?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -384,89 +381,143 @@ private ExecuteWorkResult executeWork(
 
     try {
       WindmillStateReader stateReader = work.createWindmillStateReader();
-      SideInputStateFetcher localSideInputStateFetcher =
-          
sideInputStateFetcherFactory.createSideInputStateFetcher(work::fetchSideInput);
-
-      // If the read output KVs, then we can decode Windmill's byte key into 
userland
-      // key object and provide it to the execution context for use with 
per-key state.
-      // Otherwise, we pass null.
-      //
-      // The coder type that will be present is:
-      //     WindowedValueCoder(TimerOrElementCoder(KvCoder))
-      Optional<Coder<?>> keyCoder = computationWorkExecutor.keyCoder();
-      @SuppressWarnings("deprecation")
-      @Nullable
-      final Object executionKey =
-          !keyCoder.isPresent() ? null : keyCoder.get().decode(key.newInput(), 
Coder.Context.OUTER);
-
-      if (workItem.hasHotKeyInfo()) {
-        Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
-        Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 
1000);
-
-        String stepName = 
getShuffleTaskStepName(computationState.getMapTask());
-        if (executionKey != null
-            && (options.isHotKeyLoggingEnabled()
-                || hasExperiment(options, "enable_hot_key_logging"))
-            && keyCoder.isPresent()) {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
-        } else {
-          hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
-        }
-      }
+
+      KeyTransitionListener keyTransitionListener = 
createKeyTransitionListener();
 
       // Blocks while executing work.
       computationWorkExecutor.executeWork(
-          executionKey, work, stateReader, localSideInputStateFetcher, 
outputBuilder);
+          work, stateReader, workExecutor, handle, keyTransitionListener);
+
+      List<Work> workBatch;
+      List<Windmill.WorkItemCommitRequest> workItemCommits;
+      Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks;
+      long stateBytesRead;
+      {
+        StreamingModeExecutionContext context = 
computationWorkExecutor.context();

Review Comment:
   should we return this from executeWork instead of separate call?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java:
##########
@@ -129,73 +126,86 @@ public static <K, T> WindowingWindmillReader<K, T> create(
     return new WindowingWindmillReader<>(coder, context, 
skipUndecodableElements);
   }
 
+  private KeyedWorkItem<K, T> createKeyedWorkItem() {
+    @SuppressWarnings("unchecked")
+    @Nullable
+    K key = (K) context.getKey();
+    return new WindmillKeyedWorkItem<>(
+        key,
+        context.getWorkItem(),
+        windowCoder,
+        windowsCoder,
+        valueCoder,
+        context.getWindmillTagEncoding(),
+        context.getDrainMode(),
+        skipUndecodableElements.isAccessible()
+            && Boolean.TRUE.equals(skipUndecodableElements.get()));
+  }
+
+  private boolean isEmpty(KeyedWorkItem<K, T> keyedWorkItem) {
+    return Iterables.isEmpty(keyedWorkItem.timersIterable())
+        && Iterables.isEmpty(keyedWorkItem.elementsIterable());
+  }
+
   @Override
   public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() 
throws IOException {
-    final K key =
-        keyCoder.decode(
-            checkStateNotNull(context.getSerializedKey()).newInput(), 
Coder.Context.OUTER);
-    final WorkItem workItem = context.getWorkItem();
-    KeyedWorkItem<K, T> keyedWorkItem =
-        new WindmillKeyedWorkItem<>(
-            key,
-            workItem,
-            windowCoder,
-            windowsCoder,
-            valueCoder,
-            context.getWindmillTagEncoding(),
-            context.getDrainMode(),
-            skipUndecodableElements.isAccessible()
-                && Boolean.TRUE.equals(skipUndecodableElements.get()));
-    final boolean isEmptyWorkItem =
-        (Iterables.isEmpty(keyedWorkItem.timersIterable())
-            && Iterables.isEmpty(keyedWorkItem.elementsIterable()));
-    final WindowedValue<KeyedWorkItem<K, T>> value = new 
ValueInEmptyWindows<>(keyedWorkItem);
-
-    // Return a noop iterator when current workitem is an empty workitem.
-    if (isEmptyWorkItem) {
-      return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
-        @Override
-        public boolean start() throws IOException {
-          context.finishKey();
-          return false;
+    final KeyedWorkItem<K, T> firstKeyedWorkItem = createKeyedWorkItem();
+    final boolean firstKeyIsEmpty = isEmpty(firstKeyedWorkItem);
+    final WindowedValue<KeyedWorkItem<K, T>> firstValue =
+        new ValueInEmptyWindows<>(firstKeyedWorkItem);
+
+    return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
+      private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
+      private boolean started = false;
+
+      @Override
+      public boolean start() throws IOException {
+        if (context.workIsFailed()) {
+          throw new WorkItemCancelledException(
+              checkStateNotNull(context.getWorkItem()).getShardingKey());
         }
-
-        @Override
-        public boolean advance() throws IOException {
+        if (started) {
           return false;
         }
-
-        @Override
-        public WindowedValue<KeyedWorkItem<K, T>> getCurrent() {
-          throw new NoSuchElementException();
+        started = true;
+        if (firstKeyIsEmpty) {
+          return advance(); // Try to transition immediately if the first key 
is empty!
         }
-      };
-    } else {
-      return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
-        private @Nullable WindowedValue<KeyedWorkItem<K, T>> current = null;
-
-        @Override
-        public boolean start() throws IOException {
-          current = value;
-          return true;
+        current = firstValue;

Review Comment:
   should we pass in first value to constructor and initialize current to it, 
instead of binding it? seems like that would allow it to be gc'd



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -187,21 +184,22 @@ private static Windmill.WorkItemCommitRequest 
buildWorkItemTruncationRequest(
 
   /** Sets the stage name and workId of the Thread executing the {@link Work} 
for logging. */
   private static void setUpWorkLoggingContext(String workLatencyTrackingId, 
String computationId) {
-    DataflowWorkerLoggingMDC.setWorkId(workLatencyTrackingId);
+    setLoggingContextWorkId(workLatencyTrackingId);
+    setLoggingContextComputation(computationId);
+  }
+
+  private static void setLoggingContextComputation(@Nullable String 
computationId) {
     DataflowWorkerLoggingMDC.setStageName(computationId);
   }
 
-  private static String getShuffleTaskStepName(MapTask mapTask) {
-    // The MapTask instruction is ordered by dependencies, such that the first 
element is
-    // always going to be the shuffle task.
-    return mapTask.getInstructions().get(0).getName();
+  private static void setLoggingContextWorkId(@Nullable String 
workLatencyTrackingId) {
+    DataflowWorkerLoggingMDC.setWorkId(workLatencyTrackingId);
   }
 
   /** Resets logging context of the Thread executing the {@link Work} for 
logging. */
-  private void resetWorkLoggingContext(String workLatencyTrackingId) {
-    sampler.resetForWorkId(workLatencyTrackingId);
-    DataflowWorkerLoggingMDC.setWorkId(null);
-    DataflowWorkerLoggingMDC.setStageName(null);
+  private void resetWorkLoggingContext() {

Review Comment:
   what about just inlining this instead of a separate method? then it can 
remain next to the sampler reset



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to