arunpandianp commented on code in PR #38454:
URL: https://github.com/apache/beam/pull/38454#discussion_r3316702128


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java:
##########
@@ -63,6 +63,12 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
   <KeyT extends @Nullable Object> void onWindowExpiration(
       BoundedWindow window, Instant timestamp, KeyT key);
 
+  /**
+   * Performs per-key cleanup or processing after all elements and timers for a key have been

Review Comment:
   Expanded the comment a bit more, and noted that the method is currently used only by Dataflow.
   
   > I'm wondering if it is possible for a runner driving this to process a 
batch of input for a key, then a different key, and then back to the original 
key.
   
   I think a runner that wants to do that must also switch the underlying key context with each batch. Currently only Dataflow streaming uses this, and it does not interleave keys.
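
To illustrate the point about switching key context, here is a minimal sketch, not Beam code. `KeyContextSketch`, `setKeyContext`, and `processBatch` are hypothetical names invented for this example; the sketch only assumes that per-key state is read and written through whatever key is currently set.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key counter used only for illustration; not part of Beam. */
class KeyContextSketch {
  private final Map<String, Integer> countsByKey = new HashMap<>();

  // The "key context": all state reads and writes go through this key.
  private String currentKey;

  /** A runner that interleaves keys would call this before every batch. */
  void setKeyContext(String key) {
    currentKey = key;
  }

  /** Adds a batch of elements to the state of the current key. */
  void processBatch(int elementCount) {
    countsByKey.merge(currentKey, elementCount, Integer::sum);
  }

  /** Returns what has been accumulated under the current key context. */
  int finishKey() {
    return countsByKey.getOrDefault(currentKey, 0);
  }
}
```

If a hypothetical runner processed key `a`, then `b`, then `a` again, the per-key results stay correct only because `setKeyContext` is called before each batch and before `finishKey`.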



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -363,6 +363,13 @@ public void processTimers() throws Exception {
     processTimers(TimerType.SYSTEM, stepContext, windowCoder);
   }
 
+  @Override
+  public void finishKey() throws Exception {
+    if (fnRunner != null) {

Review Comment:
   `fnRunner` is initialized in `processBundle` or `processTimers`. If a bundle has no messages or timers (I'm not sure that is possible), `fnRunner` will be null in `finishKey`. The `finishBundle` logic is also guarded by `if (fnRunner != null)`, so we should keep the check.
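
The reasoning for keeping the null check can be sketched as follows. This is a simplified stand-in, not the actual `SimpleParDoFn`: `LazyRunnerSketch`, `FakeRunner`, and `processElement` are hypothetical names, and the sketch assumes only that the runner is created lazily on first input.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a ParDoFn whose runner is created lazily. */
class LazyRunnerSketch {
  /** Hypothetical stand-in for a DoFnRunner; it only records calls. */
  static class FakeRunner {
    final List<String> calls = new ArrayList<>();

    void finishKey() {
      calls.add("finishKey");
    }

    void finishBundle() {
      calls.add("finishBundle");
    }
  }

  // Stays null until the first element (or timer) arrives.
  FakeRunner fnRunner;

  void processElement(String element) {
    if (fnRunner == null) {
      fnRunner = new FakeRunner();
    }
    fnRunner.calls.add("process:" + element);
  }

  void finishKey() {
    // Guard: a bundle with no input never created the runner.
    if (fnRunner != null) {
      fnRunner.finishKey();
    }
  }

  void finishBundle() {
    // Same guard as finishKey, for the same reason.
    if (fnRunner != null) {
      fnRunner.finishBundle();
    }
  }
}
```

With the guard in place, calling `finishKey()` on an instance that never saw input is a no-op rather than a `NullPointerException`.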



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowsParDoFn.java:
##########
@@ -142,6 +142,13 @@ public void processTimers() throws Exception {
     // it here to build a KeyedWorkItem
   }
 
+  @Override
+  public void finishKey() throws Exception {
+    if (fnRunner != null) {

Review Comment:
   Yes, changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
