Hey Till,

Thanks for your response!

I have created a ticket, https://issues.apache.org/jira/browse/FLINK-24623, and added a proposal to address this issue.

Best regards,
Dario

On 22.10.21 17:14, Till Rohrmann wrote:
Hi Dario,

I think adding a sanity check that detects whether the user uses event-time constructs and prints a warning if WatermarkStrategy.noWatermarks() is set could be a good improvement. It might even be OK to fail, to make this misconfiguration explicit. Hence, I think it is a good idea to open a ticket for this problem. If you have a proposal for how to address this issue, then post it on the ticket.

Cheers,
Till

On Thu, Oct 21, 2021 at 4:40 PM Dario Heinisch wrote:

Hey there,

When one uses

    .window(TumblingEventTimeWindows.of(SOME_TIME))

no window will ever fire if the user also uses

    WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
        .withTimestampAssigner((t, timestamp) -> t.f0)

I was wondering whether Flink should throw an exception at the start of the program and prevent this combination, as no values would ever reach the process function. If so, I would create a ticket and love to work on it.

Here is an example:

    import java.util.Iterator;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class PlaygroundJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

            // Endless source emitting (current time in ms, key in 0..9).
            DataStreamSource<Tuple2<Long, Integer>> source =
                    env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                        @Override
                        public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                            int i = 0;
                            while (true) {
                                Tuple2<Long, Integer> tuple =
                                        Tuple2.of(System.currentTimeMillis(), i++ % 10);
                                sourceContext.collect(tuple);
                            }
                        }

                        @Override
                        public void cancel() {
                        }
                    });

            source.assignTimestampsAndWatermarks(
                            // Switch noWatermarks() to forMonotonousTimestamps()
                            // and values are being printed.
                            WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                    .withTimestampAssigner((t, timestamp) -> t.f0))
                    .keyBy(t -> t.f1)
                    .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                    .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                        @Override
                        public void process(
                                Integer key,
                                Context context,
                                Iterable<Tuple2<Long, Integer>> iterable,
                                Collector<String> out) throws Exception {
                            // Count the elements that landed in this window.
                            int count = 0;
                            Iterator<Tuple2<Long, Integer>> iter = iterable.iterator();
                            while (iter.hasNext()) {
                                count++;
                                iter.next();
                            }
                            out.collect("Key: " + key + " count: " + count);
                        }
                    })
                    .print();

            env.execute();
        }
    }

Best regards,
Dario
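
To illustrate why the example above prints nothing, here is a minimal plain-Java sketch of the mechanism, not Flink code. It assumes the usual event-time rule that a tumbling window fires once the watermark reaches the window's last timestamp (end - 1), and it approximates forMonotonousTimestamps() by advancing the watermark to "largest timestamp seen - 1" on every element (Flink emits watermarks periodically instead). The class name WindowFiringSketch and the method run are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class WindowFiringSketch {

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /**
     * Feeds timestamps through a toy tumbling event-time window and returns
     * the element count of every window that fired, in firing order.
     * With emitWatermarks == false the watermark never advances, which is
     * the assumed behaviour of noWatermarks() on an unbounded stream.
     */
    static List<Integer> run(long[] timestamps, long size, boolean emitWatermarks) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> element count
        List<Integer> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            open.merge(windowStart(ts, size), 1, Integer::sum);

            if (emitWatermarks) {
                // Simplified monotonous-timestamps watermark: max timestamp - 1.
                watermark = Math.max(watermark, ts - 1);
            }

            // Fire every open window whose last timestamp is covered by the watermark.
            while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
                fired.add(open.pollFirstEntry().getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] ts = {0, 200, 999, 1000, 1500, 2100};
        System.out.println("no watermarks:   " + run(ts, 1000, false));
        System.out.println("with watermarks: " + run(ts, 1000, true));
    }
}
```

Without watermarks the elements are still assigned to windows, but the watermark stays at Long.MIN_VALUE and nothing is ever emitted. With watermarks, the first two windows fire with 3 and 2 elements, and the last window stays open until a later element pushes the watermark past its end.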
