pnowojski commented on a change in pull request #14908: URL: https://github.com/apache/flink/pull/14908#discussion_r574515479
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java ########## @@ -86,11 +89,44 @@ */ public class SourceStreamTaskTest { + @Test + public void testInputEndedBeforeStopWithSavepointConfirmed() throws Exception { + CancelTestSource source = + new CancelTestSource( + STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), "src"); + TestBoundedOneInputStreamOperator chainTail = new TestBoundedOneInputStreamOperator("t"); + StreamTaskMailboxTestHarness<String> harness = + new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO) + .setupOperatorChain( + new OperatorID(), + new StreamSource<String, CancelTestSource>(source)) + .chain( + new OperatorID(), + chainTail, + STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) + .finish() + .build(); + Future<Boolean> triggerFuture = + harness.streamTask.triggerCheckpointAsync( + new CheckpointMetaData(1, 1), + new CheckpointOptions(SYNC_SAVEPOINT, getDefault()), + false); + while (!triggerFuture.isDone()) { + harness.streamTask.runMailboxStep(); + } + // stopping the input should be treated as "true" end of input + // because checkpoint completion notification will not be sent Review comment: ``` // instead of completing stop with savepoint via `notifyCheckpointCompleted` call // we simulate that source has finished first. As a result we expect that the endOfInput // should have been issued ``` ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org