AHeise commented on a change in pull request #16113: URL: https://github.com/apache/flink/pull/16113#discussion_r648090118
##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/Watermark.java
##########
@@ -56,6 +56,9 @@ Licensed to the Apache Software Foundation (ASF) under one
     /** The watermark that signifies end-of-event-time. */
     public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
+    /** The watermark that signifies start-of-event-time. */
+    public static final Watermark MIN_WATERMARK = new Watermark(Long.MIN_VALUE);

Review comment:
   Can we add that to Public?

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source.lib.util;
 import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;

Review comment:
   I don't get this change. Could you expand commit message or add a test case?

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -86,6 +86,7 @@ public void registerNewOutput(String id) {
         checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
         combinedWatermarkStatus.add(outputState);
+        underlyingOutput.emitWatermark(Watermark.MIN_WATERMARK);

Review comment:
   Is this also necessary if there has been a split already?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org