Izeren commented on code in PR #27634:
URL: https://github.com/apache/flink/pull/27634#discussion_r2823258320


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java:
##########
@@ -1162,9 +1163,22 @@ public InputStatus pollNext(ReaderOutput<Integer> out) {
                             out.collect(3, 3);
                             out.collect(4, 4);
                             emitted = true;
-                            return InputStatus.END_OF_INPUT;
                         }
-                        return InputStatus.END_OF_INPUT;
+
+                        // We're using SingleSplitEnumerator below which DOES 
send operator events
+                        // to the source tasks. Therefore, if we finish this 
task prematurely (by
+                        // returning END_OF_INPUT) such event delivery might 
fail, causing job and
+                        // test failure. Usually, this doesn't happen because 
usually the task

Review Comment:
   !nit, double use of usually for the same



##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java:
##########
@@ -1162,9 +1163,22 @@ public InputStatus pollNext(ReaderOutput<Integer> out) {
                             out.collect(3, 3);
                             out.collect(4, 4);
                             emitted = true;
-                            return InputStatus.END_OF_INPUT;
                         }
-                        return InputStatus.END_OF_INPUT;
+
+                        // We're using SingleSplitEnumerator below which DOES 
send operator events
+                        // to the source tasks. Therefore, if we finish this 
task prematurely (by
+                        // returning END_OF_INPUT) such event delivery might 
fail, causing job and
+                        // test failure. Usually, this doesn't happen because 
usually the task
+                        // finishes before the RPC starts.
+                        // To avoid flakiness, we intentinoally wait for 
NO_MORE_SPLITS event on TM.
+                        return noMoreSplits
+                                ? InputStatus.END_OF_INPUT
+                                : InputStatus.NOTHING_AVAILABLE;
+                    }
+
+                    @Override
+                    public void notifyNoMoreSplits() {
+                        this.noMoreSplits = true;

Review Comment:
   Is this is used in a single thread? Can there be visibility problem?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to