Repository: flink Updated Branches: refs/heads/master c4a2d60c3 -> 6bd5714d2
[FLINK-2936] Fix ClassCastException for Event-Time source Before, emitting watermarks from an Event-Time source with timestamp/watermark multiplexing disabled would throw a ClassCastException. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b648870 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b648870 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b648870 Branch: refs/heads/master Commit: 4b648870b4673c5a9c4d80f185e7de679967098e Parents: c4a2d60 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Dec 9 16:00:12 2015 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Dec 11 10:45:17 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/operators/StreamSource.java | 14 ++++-- .../streaming/timestamp/TimestampITCase.java | 46 ++++++++++++++++---- .../streaming/util/SourceFunctionUtil.java | 2 +- 3 files changed, 48 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index e80654a..91c846f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -46,7 +46,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction final ExecutionConfig executionConfig = getExecutionConfig(); if (userFunction instanceof EventTimeSourceFunction) { - ctx = new 
ManualWatermarkContext<T>(lockingObject, collector); + ctx = new ManualWatermarkContext<T>(lockingObject, collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled()); } else if (executionConfig.getAutoWatermarkInterval() > 0) { ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig); } else if (executionConfig.areTimestampsEnabled()) { @@ -261,11 +261,13 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction private final Object lockingObject; private final Output<StreamRecord<T>> output; private final StreamRecord<T> reuse; + private final boolean watermarkMultiplexingEnabled; - public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) { + public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output, boolean watermarkMultiplexingEnabled) { this.lockingObject = lockingObject; this.output = output; this.reuse = new StreamRecord<T>(null); + this.watermarkMultiplexingEnabled = watermarkMultiplexingEnabled; } @Override @@ -283,7 +285,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction @Override public void emitWatermark(Watermark mark) { - output.emitWatermark(mark); + if (watermarkMultiplexingEnabled) { + output.emitWatermark(mark); + } } @Override @@ -296,7 +300,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction // emit one last +Inf watermark to make downstream watermark processing work // when some sources close early synchronized (lockingObject) { - output.emitWatermark(new Watermark(Long.MAX_VALUE)); + if (watermarkMultiplexingEnabled) { + output.emitWatermark(new Watermark(Long.MAX_VALUE)); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 5113b45..8e7ada4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.taskmanager.MultiShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor; @@ -133,7 +134,7 @@ public class TimestampITCase { source1.union(source2) .map(new IdentityMap()) .connect(source2).map(new IdentityCoMap()) - .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator()) + .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) .addSink(new NoOpSink<Integer>()); env.execute(); @@ -293,7 +294,7 @@ public class TimestampITCase { }); extractOp - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator()) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); @@ -362,7 +363,7 @@ public class TimestampITCase { return Long.MIN_VALUE; } }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator()) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); @@ -429,7 +430,7 @@ 
public class TimestampITCase { return Long.MIN_VALUE; } }) - .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator()) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true)) .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); @@ -503,23 +504,50 @@ public class TimestampITCase { env.execute(); } + /** + * This verifies that an event time source works when setting stream time characteristic to + * processing time. In this case, the watermarks should just be swallowed. + */ + @Test + public void testEventTimeSourceWithProcessingTime() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); + env.setParallelism(2); + env.getConfig().disableSysoutLogging(); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + env.getConfig().disableTimestamps(); + + DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10)); + + source1 + .map(new IdentityMap()) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false)); + + env.execute(); + + // verify that we don't get any watermarks, the source is used as watermark source in + // other tests, so it normally emits watermarks + Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0); + } + @SuppressWarnings("unchecked") public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { List<Watermark> watermarks; public static List<Watermark>[] finalWatermarks = new List[PARALLELISM]; - private long oldTimestamp; + private final boolean timestampsEnabled; - public CustomOperator() { + public CustomOperator(boolean timestampsEnabled) { setChainingStrategy(ChainingStrategy.ALWAYS); + this.timestampsEnabled = timestampsEnabled; } @Override public void processElement(StreamRecord<Integer> element) throws Exception { - if 
(element.getTimestamp() != element.getValue()) { - Assert.fail("Timestamps are not properly handled."); + if (timestampsEnabled) { + if (element.getTimestamp() != element.getValue()) { + Assert.fail("Timestamps are not properly handled."); + } } - oldTimestamp = element.getTimestamp(); output.collect(element); } http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index 2afdc40..8895b6e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -62,7 +62,7 @@ public class SourceFunctionUtil<T> { final Object lockingObject = new Object(); SourceFunction.SourceContext<T> ctx; if (sourceFunction instanceof EventTimeSourceFunction) { - ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector); + ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector, true); } else { ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector); }