Repository: flink
Updated Branches:
  refs/heads/master c4a2d60c3 -> 6bd5714d2


[FLINK-2936] Fix ClassCastException for Event-Time source

Before this fix, emitting watermarks from an event-time source with
timestamp/watermark multiplexing disabled would throw a ClassCastException.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b648870
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b648870
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b648870

Branch: refs/heads/master
Commit: 4b648870b4673c5a9c4d80f185e7de679967098e
Parents: c4a2d60
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Dec 9 16:00:12 2015 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Dec 11 10:45:17 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/operators/StreamSource.java   | 14 ++++--
 .../streaming/timestamp/TimestampITCase.java    | 46 ++++++++++++++++----
 .../streaming/util/SourceFunctionUtil.java      |  2 +-
 3 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index e80654a..91c846f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -46,7 +46,7 @@ public class StreamSource<T> extends 
AbstractUdfStreamOperator<T, SourceFunction
                final ExecutionConfig executionConfig = getExecutionConfig();
                
                if (userFunction instanceof EventTimeSourceFunction) {
-                       ctx = new ManualWatermarkContext<T>(lockingObject, 
collector);
+                       ctx = new ManualWatermarkContext<T>(lockingObject, 
collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled());
                } else if (executionConfig.getAutoWatermarkInterval() > 0) {
                        ctx = new AutomaticWatermarkContext<T>(lockingObject, 
collector, executionConfig);
                } else if (executionConfig.areTimestampsEnabled()) {
@@ -261,11 +261,13 @@ public class StreamSource<T> extends 
AbstractUdfStreamOperator<T, SourceFunction
                private final Object lockingObject;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
+               private final boolean watermarkMultiplexingEnabled;
 
-               public ManualWatermarkContext(Object lockingObject, 
Output<StreamRecord<T>> output) {
+               public ManualWatermarkContext(Object lockingObject, 
Output<StreamRecord<T>> output, boolean watermarkMultiplexingEnabled) {
                        this.lockingObject = lockingObject;
                        this.output = output;
                        this.reuse = new StreamRecord<T>(null);
+                       this.watermarkMultiplexingEnabled = 
watermarkMultiplexingEnabled;
                }
 
                @Override
@@ -283,7 +285,9 @@ public class StreamSource<T> extends 
AbstractUdfStreamOperator<T, SourceFunction
 
                @Override
                public void emitWatermark(Watermark mark) {
-                       output.emitWatermark(mark);
+                       if (watermarkMultiplexingEnabled) {
+                               output.emitWatermark(mark);
+                       }
                }
 
                @Override
@@ -296,7 +300,9 @@ public class StreamSource<T> extends 
AbstractUdfStreamOperator<T, SourceFunction
                        // emit one last +Inf watermark to make downstream 
watermark processing work
                        // when some sources close early
                        synchronized (lockingObject) {
-                               output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                               if (watermarkMultiplexingEnabled) {
+                                       output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 5113b45..8e7ada4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskmanager.MultiShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
@@ -133,7 +134,7 @@ public class TimestampITCase {
                source1.union(source2)
                                .map(new IdentityMap())
                                .connect(source2).map(new IdentityCoMap())
-                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+                               .transform("Custom Operator", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
                                .addSink(new NoOpSink<Integer>());
 
                env.execute();
@@ -293,7 +294,7 @@ public class TimestampITCase {
                                });
 
                extractOp
-                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
                                .transform("Timestamp Check",
                                                BasicTypeInfo.INT_TYPE_INFO,
                                                new 
TimestampCheckingOperator());
@@ -362,7 +363,7 @@ public class TimestampITCase {
                                return Long.MIN_VALUE;
                        }
                })
-                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
                                .transform("Timestamp Check", 
BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
 
@@ -429,7 +430,7 @@ public class TimestampITCase {
                                return Long.MIN_VALUE;
                        }
                })
-                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
                                .transform("Timestamp Check", 
BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
 
@@ -503,23 +504,50 @@ public class TimestampITCase {
                env.execute();
        }
 
+       /**
+        * This verifies that an event time source works when setting stream 
time characteristic to
+        * processing time. In this case, the watermarks should just be 
swallowed.
+        */
+       @Test
+       public void testEventTimeSourceWithProcessingTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
cluster.getLeaderRPCPort());
+               env.setParallelism(2);
+               env.getConfig().disableSysoutLogging();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+               env.getConfig().disableTimestamps();
+
+               DataStream<Integer> source1 = env.addSource(new 
MyTimestampSource(0, 10));
+
+               source1
+                       .map(new IdentityMap())
+                       .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false));
+
+               env.execute();
+
+               // verify that we don't get any watermarks, the source is used 
as watermark source in
+               // other tests, so it normally emits watermarks
+               Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 
0);
+       }
+
        @SuppressWarnings("unchecked")
        public static class CustomOperator extends 
AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, 
Integer> {
 
                List<Watermark> watermarks;
                public static List<Watermark>[] finalWatermarks = new 
List[PARALLELISM];
-               private long oldTimestamp;
+               private final boolean timestampsEnabled;
 
-               public CustomOperator() {
+               public CustomOperator(boolean timestampsEnabled) {
                        setChainingStrategy(ChainingStrategy.ALWAYS);
+                       this.timestampsEnabled = timestampsEnabled;
                }
 
                @Override
                public void processElement(StreamRecord<Integer> element) 
throws Exception {
-                       if (element.getTimestamp() != element.getValue()) {
-                               Assert.fail("Timestamps are not properly 
handled.");
+                       if (timestampsEnabled) {
+                               if (element.getTimestamp() != 
element.getValue()) {
+                                       Assert.fail("Timestamps are not 
properly handled.");
+                               }
                        }
-                       oldTimestamp = element.getTimestamp();
                        output.collect(element);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 2afdc40..8895b6e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -62,7 +62,7 @@ public class SourceFunctionUtil<T> {
                        final Object lockingObject = new Object();
                        SourceFunction.SourceContext<T> ctx;
                        if (sourceFunction instanceof EventTimeSourceFunction) {
-                               ctx = new 
StreamSource.ManualWatermarkContext<T>(lockingObject, collector);
+                               ctx = new 
StreamSource.ManualWatermarkContext<T>(lockingObject, collector, true);
                        } else {
                                ctx = new 
StreamSource.NonWatermarkContext<T>(lockingObject, collector);
                        }

Reply via email to