[FLINK-3121] Emit Final Watermark in Kafka Source Kafka sources that don't read from any partition never emit a watermark, thereby blocking the progress of event-time in downstream operations. This changes the Kafka Source to emit a Long.MAX_VALUE watermark if it knows that it will never receive data.
This also changes the Timestamp Extraction operator to react to a Long.MAX_VALUE watermark by itself emitting a Long.MAX_VALUE watermark. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bd5714d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bd5714d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bd5714d Branch: refs/heads/master Commit: 6bd5714d2a045e581b1a761830d010598f803de7 Parents: 4b64887 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Dec 9 12:13:22 2015 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Dec 11 10:45:34 2015 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer.java | 8 ++- .../streaming/api/operators/StreamSource.java | 16 +++-- .../operators/ExtractTimestampsOperator.java | 15 ++--- .../streaming/timestamp/TimestampITCase.java | 62 ++++++++++++++++++++ 4 files changed, 82 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index c4fd654..b139e95 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; @@ -434,7 +435,12 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T> } } else { - // this source never completes + // this source never completes, so emit a Long.MAX_VALUE watermark + // to not block watermark forwarding + if (getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) { + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + final Object waitLock = new Object(); while (running) { // wait until we are canceled http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 91c846f..996e32c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -60,6 +60,12 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction // This will mostly emit a final +Inf Watermark to make the Watermark logic work // when some sources finish before others do ctx.close(); + + if (executionConfig.areTimestampsEnabled()) 
{ + synchronized (lockingObject) { + output.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + } } public void cancel() { @@ -296,14 +302,6 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction } @Override - public void close() { - // emit one last +Inf watermark to make downstream watermark processing work - // when some sources close early - synchronized (lockingObject) { - if (watermarkMultiplexingEnabled) { - output.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - } - } + public void close() {} } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index 9c27c6d..bfd9c8b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -57,14 +57,6 @@ public class ExtractTimestampsOperator<T> } @Override - public void close() throws Exception { - super.close(); - - // emit a final +Inf watermark, just like the sources - output.emitWatermark(new Watermark(Long.MAX_VALUE)); - } - - @Override public void processElement(StreamRecord<T> element) throws Exception { long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.getTimestamp()); output.collect(element.replace(element.getValue(), newTimestamp)); @@ -90,6 +82,11 @@ public class ExtractTimestampsOperator<T> @Override public void processWatermark(Watermark mark) throws Exception { - // ignore them, since we are basically a watermark source + // if we receive a 
Long.MAX_VALUE watermark we forward it since it is used + // to signal the end of input and to not block watermark progress downstream + if (mark.getTimestamp() == Long.MAX_VALUE && mark.getTimestamp() > currentWatermark) { + currentWatermark = Long.MAX_VALUE; + output.emitWatermark(mark); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 8e7ada4..6c3ef40 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -448,6 +448,68 @@ public class TimestampITCase { } /** + * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks. 
+ */ + @Test + public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception { + final int NUM_ELEMENTS = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); + env.setParallelism(2); + env.getConfig().disableSysoutLogging(); + env.getConfig().enableTimestamps(); + env.getConfig().setAutoWatermarkInterval(1); + + + DataStream<Integer> source1 = env.addSource(new EventTimeSourceFunction<Integer>() { + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + int index = 0; + while (index < NUM_ELEMENTS) { + ctx.collectWithTimestamp(index, index); + ctx.collectWithTimestamp(index - 1, index - 1); + index++; + ctx.emitWatermark(new Watermark(index-2)); + } + + // emit the final Long.MAX_VALUE watermark, do it twice and verify that + // we only see one in the result + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + } + + @Override + public void cancel() { + + } + }); + + source1.assignTimestamps(new TimestampExtractor<Integer>() { + @Override + public long extractTimestamp(Integer element, long currentTimestamp) { + return element; + } + + @Override + public long extractWatermark(Integer element, long currentTimestamp) { + return Long.MIN_VALUE; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + }) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)); + + + env.execute(); + + Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); + Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); + } + + /** * This tests whether the program throws an exception when an event-time source tries * to emit without timestamp. */