[FLINK-3121] Emit Final Watermark in Kafka Source

Kafka sources that don't read from any partition never emit a watermark,
thereby blocking the progress of event-time in downstream operations.
This changes the Kafka Source to emit a Long.MAX_VALUE watermark if it
knows that it will never receive data.

This also changes the Timestamp Extraction operator to react to a
Long.MAX_VALUE watermark by emitting a Long.MAX_VALUE watermark itself.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bd5714d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bd5714d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bd5714d

Branch: refs/heads/master
Commit: 6bd5714d2a045e581b1a761830d010598f803de7
Parents: 4b64887
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Dec 9 12:13:22 2015 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Dec 11 10:45:34 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer.java    |  8 ++-
 .../streaming/api/operators/StreamSource.java   | 16 +++--
 .../operators/ExtractTimestampsOperator.java    | 15 ++---
 .../streaming/timestamp/TimestampITCase.java    | 62 ++++++++++++++++++++
 4 files changed, 82 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index c4fd654..b139e95 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
@@ -434,7 +435,12 @@ public class FlinkKafkaConsumer<T> extends 
RichParallelSourceFunction<T>
                        }
                }
                else {
-                       // this source never completes
+                       // this source never completes, so emit a 
Long.MAX_VALUE watermark
+                       // to not block watermark forwarding
+                       if 
(getRuntimeContext().getExecutionConfig().areTimestampsEnabled()) {
+                               sourceContext.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                       }
+
                        final Object waitLock = new Object();
                        while (running) {
                                // wait until we are canceled

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 91c846f..996e32c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -60,6 +60,12 @@ public class StreamSource<T> extends 
AbstractUdfStreamOperator<T, SourceFunction
                // This will mostly emit a final +Inf Watermark to make the 
Watermark logic work
                // when some sources finish before others do
                ctx.close();
+
+               if (executionConfig.areTimestampsEnabled()) {
+                       synchronized (lockingObject) {
+                               output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                       }
+               }
        }
 
        public void cancel() {
@@ -296,14 +302,6 @@ public class StreamSource<T> extends 
AbstractUdfStreamOperator<T, SourceFunction
                }
 
                @Override
-               public void close() {
-                       // emit one last +Inf watermark to make downstream 
watermark processing work
-                       // when some sources close early
-                       synchronized (lockingObject) {
-                               if (watermarkMultiplexingEnabled) {
-                                       output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
-                               }
-                       }
-               }
+               public void close() {}
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 9c27c6d..bfd9c8b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -57,14 +57,6 @@ public class ExtractTimestampsOperator<T>
        }
 
        @Override
-       public void close() throws Exception {
-               super.close();
-
-               // emit a final +Inf watermark, just like the sources
-               output.emitWatermark(new Watermark(Long.MAX_VALUE));
-       }
-
-       @Override
        public void processElement(StreamRecord<T> element) throws Exception {
                long newTimestamp = 
userFunction.extractTimestamp(element.getValue(), element.getTimestamp());
                output.collect(element.replace(element.getValue(), 
newTimestamp));
@@ -90,6 +82,11 @@ public class ExtractTimestampsOperator<T>
 
        @Override
        public void processWatermark(Watermark mark) throws Exception {
-               // ignore them, since we are basically a watermark source
+               // if we receive a Long.MAX_VALUE watermark we forward it since 
it is used
+               // to signal the end of input and to not block watermark 
progress downstream
+               if (mark.getTimestamp() == Long.MAX_VALUE && 
mark.getTimestamp() > currentWatermark) {
+                       currentWatermark = Long.MAX_VALUE;
+                       output.emitWatermark(mark);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bd5714d/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 8e7ada4..6c3ef40 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -448,6 +448,68 @@ public class TimestampITCase {
        }
 
        /**
+        * This test verifies that the timestamp extractor forwards 
Long.MAX_VALUE watermarks.
+        */
+       @Test
+       public void testTimestampExtractorWithLongMaxWatermarkFromSource() 
throws Exception {
+               final int NUM_ELEMENTS = 10;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
cluster.getLeaderRPCPort());
+               env.setParallelism(2);
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().enableTimestamps();
+               env.getConfig().setAutoWatermarkInterval(1);
+
+
+               DataStream<Integer> source1 = env.addSource(new 
EventTimeSourceFunction<Integer>() {
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                               int index = 0;
+                               while (index < NUM_ELEMENTS) {
+                                       ctx.collectWithTimestamp(index, index);
+                                       ctx.collectWithTimestamp(index - 1, 
index - 1);
+                                       index++;
+                                       ctx.emitWatermark(new 
Watermark(index-2));
+                               }
+
+                               // emit the final Long.MAX_VALUE watermark, do 
it twice and verify that
+                               // we only see one in the result
+                               ctx.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                               ctx.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                       }
+
+                       @Override
+                       public void cancel() {
+
+                       }
+               });
+
+               source1.assignTimestamps(new TimestampExtractor<Integer>() {
+                       @Override
+                       public long extractTimestamp(Integer element, long 
currentTimestamp) {
+                               return element;
+                       }
+
+                       @Override
+                       public long extractWatermark(Integer element, long 
currentTimestamp) {
+                               return Long.MIN_VALUE;
+                       }
+
+                       @Override
+                       public long getCurrentWatermark() {
+                               return Long.MIN_VALUE;
+                       }
+               })
+                       .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
+
+
+               env.execute();
+
+               Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 
1);
+               
Assert.assertTrue(CustomOperator.finalWatermarks[0].get(0).getTimestamp() == 
Long.MAX_VALUE);
+       }
+
+       /**
         * This tests whether the program throws an exception when an 
event-time source tries
         * to emit without timestamp.
         */

Reply via email to