[ https://issues.apache.org/jira/browse/FLINK-24623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479905#comment-17479905 ]
Alexander Fedulov commented on FLINK-24623:
-------------------------------------------

[~Dario] Hi Dario, could you propose your approach in the form of a PR?

> Prevent usage of EventTimeWindows when EventTime is disabled
> ------------------------------------------------------------
>
>                 Key: FLINK-24623
>                 URL: https://issues.apache.org/jira/browse/FLINK-24623
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Dario Heinisch
>            Priority: Not a Priority
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The following stream never processes values after the windowing step, because
> event-time processing has been disabled via the watermark strategy:
> {code:java}
> import java.util.Iterator;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class PlaygroundJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
>
>         DataStreamSource<Tuple2<Long, Integer>> source =
>                 env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
>                     @Override
>                     public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
>                         int i = 0;
>                         while (true) {
>                             // f0 = event timestamp, f1 = key in [0, 9]
>                             Tuple2<Long, Integer> tuple =
>                                     Tuple2.of(System.currentTimeMillis(), i++ % 10);
>                             sourceContext.collect(tuple);
>                         }
>                     }
>
>                     @Override
>                     public void cancel() {
>                     }
>                 });
>
>         source.assignTimestampsAndWatermarks(
>                         // Switch noWatermarks() to forMonotonousTimestamps()
>                         // and values are being printed.
>                         WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
>                                 .withTimestampAssigner((t, timestamp) -> t.f0)
>                 ).keyBy(t -> t.f1)
>                 .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>                 .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
>                     @Override
>                     public void process(Integer key, Context context,
>                             Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws Exception {
>                         // Count the elements that ended up in this window.
>                         int count = 0;
>                         Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
>                         while (iter.hasNext()) {
>                             count++;
>                             iter.next();
>                         }
>
>                         out.collect("Key: " + key + " count: " + count);
>                     }
>                 }).print();
>
>         env.execute();
>     }
> }{code}
>
> The issue is that the stream uses _noWatermarks()_, which emits no watermarks
> and therefore effectively disables event-time windowing.
> As this pipeline can never emit window results, it is faulty, and Flink should
> throw an exception at startup.
>
> --------------------
> Proposed change:
> We extend the interface
> [WatermarkStrategy|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategy.java#L55]
> with the method _boolean isEventTime()_.
> We create a new class named _EventTimeWindowPreconditions_ and add the
> following method to it, which makes use of _isEventTime()_:
>
> {code:java}
> public static void hasPrecedingEventTimeGenerator(final List<Transformation<?>> predecessors) {
>     // Walk backwards so that the most recent predecessor is inspected first.
>     for (int i = predecessors.size() - 1; i >= 0; i--) {
>         final Transformation<?> pre = predecessors.get(i);
>         if (pre instanceof TimestampsAndWatermarksTransformation) {
>             TimestampsAndWatermarksTransformation<?> timestampsAndWatermarksTransformation =
>                     (TimestampsAndWatermarksTransformation<?>) pre;
>             final WatermarkStrategy<?> waStrat =
>                     timestampsAndWatermarksTransformation.getWatermarkStrategy();
>             // Assert that it generates watermarks, or throw an exception.
>             if (!waStrat.isEventTime()) {
>                 // TODO: Custom exception
>                 throw new IllegalArgumentException(
>                         "Cannot use an EventTime window with a preceding watermark generator which"
>                                 + " does not ingest event times. Did you use noWatermarks() as the WatermarkStrategy"
>                                 + " together with EventTime windows such as"
>                                 + " TumblingEventTimeWindows/SlidingEventTimeWindows?"
>                                 + " These windows will never fire, as your stream does not support event time."
>                 );
>             }
>             // We have to terminate the check now: we have found the most recent
>             // timestamp assigner for this window and ensured that it actually
>             // adds event timestamps. If a watermark strategy such as noWatermarks()
>             // appears earlier in the chain, we can safely ignore it, because another
>             // valid event-time watermark assigner exists between it and our
>             // current event-time window.
>             break;
>         }
>     }
> }
> {code}
>
> Then we can update the constructors of
> [AllWindowedStream|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java#L112]
> and
> [WindowedStream|https://github.com/apache/flink/blob/2cb477343de5dce70978c0add5ec58edbaec157c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L79]
> to:
> {code:java}
> if (windowAssigner.isEventTime()) {
>     EventTimeWindowPreconditions.hasPrecedingEventTimeGenerator(input.getTransformation().getInputs());
> }
> {code}
> This is the approach I currently have in mind, but I am not sure whether it is
> the best one.
> Best regards,
> Dario
>
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
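For illustration only, the "nearest assigner wins" rule from the proposed check can be sketched without any Flink dependency. Everything in this sketch is a hypothetical stand-in and not Flink API: `Strategy` plays the role of a WatermarkStrategy that already has the proposed `isEventTime()` method, `Node` plays the role of a Transformation, and the predecessor list is assumed to be ordered from earliest to latest operator, as the backwards loop in the proposal implies.

```java
import java.util.Arrays;
import java.util.List;

public class EventTimeCheckSketch {

    /** Hypothetical stand-in for a WatermarkStrategy extended with isEventTime(). */
    public interface Strategy {
        boolean isEventTime();
    }

    /** Hypothetical stand-in for a Transformation; strategy is null for non-watermark operators. */
    public static final class Node {
        final String name;
        final Strategy strategy;

        public Node(String name, Strategy strategy) {
            this.name = name;
            this.strategy = strategy;
        }
    }

    // Assumed behaviour of the proposed method for two built-in strategies.
    public static final Strategy NO_WATERMARKS = () -> false;
    public static final Strategy MONOTONOUS_TIMESTAMPS = () -> true;

    /**
     * Returns false only if the closest preceding watermark assigner does not
     * produce event time. Earlier assigners are ignored, and a chain without
     * any assigner is not rejected, mirroring the proposal, which throws only
     * when an assigner is found and it is not event-time based.
     */
    public static boolean eventTimeWindowAllowed(List<Node> predecessors) {
        for (int i = predecessors.size() - 1; i >= 0; i--) {
            Strategy strategy = predecessors.get(i).strategy;
            if (strategy != null) {
                return strategy.isEventTime(); // stop at the nearest assigner
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Node> broken = Arrays.asList(
                new Node("source", null),
                new Node("assigner", NO_WATERMARKS));
        List<Node> overridden = Arrays.asList(
                new Node("source", null),
                new Node("assigner-1", NO_WATERMARKS),
                new Node("map", null),
                new Node("assigner-2", MONOTONOUS_TIMESTAMPS));

        System.out.println("broken allowed: " + eventTimeWindowAllowed(broken));
        System.out.println("overridden allowed: " + eventTimeWindowAllowed(overridden));
    }
}
```

The sketch returns a boolean instead of throwing so that the rule is easy to exercise in isolation. Inside Flink, the proposed method would throw from the WindowedStream and AllWindowedStream constructors instead.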