pnowojski commented on a change in pull request #14908:
URL: https://github.com/apache/flink/pull/14908#discussion_r574515479



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
##########
@@ -86,11 +89,44 @@
  */
 public class SourceStreamTaskTest {
 
+    @Test
+    public void testInputEndedBeforeStopWithSavepointConfirmed() throws 
Exception {
+        CancelTestSource source =
+                new CancelTestSource(
+                        STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()), "src");
+        TestBoundedOneInputStreamOperator chainTail = new 
TestBoundedOneInputStreamOperator("t");
+        StreamTaskMailboxTestHarness<String> harness =
+                new 
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .setupOperatorChain(
+                                new OperatorID(),
+                                new StreamSource<String, 
CancelTestSource>(source))
+                        .chain(
+                                new OperatorID(),
+                                chainTail,
+                                STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()))
+                        .finish()
+                        .build();
+        Future<Boolean> triggerFuture =
+                harness.streamTask.triggerCheckpointAsync(
+                        new CheckpointMetaData(1, 1),
+                        new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                        false);
+        while (!triggerFuture.isDone()) {
+            harness.streamTask.runMailboxStep();
+        }
+        // stopping the input should be treated as "true" end of input
+        // because checkpoint completion notification will not be sent

Review comment:
       ```
   // instead of completing stop with savepoint via `notifyCheckpointCompleted` 
call
   // we simulate that source has finished first. As a result we expect that 
the endOfInput
   // should have been issued
   ```
   ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to