Hi Dario, I think adding a sanity check that checks whether the user uses some event time constructs and prints a warning if WatermarkStrategy.noWatermarks() is set, could be a good improvement. It might even be ok to fail to make this misconfiguration explicit. Hence, I think it is a good idea to open a ticket for this problem. If you have a proposal for how to address this issue, then post it on the ticket.
Cheers, Till On Thu, Oct 21, 2021 at 4:40 PM Dario Heinisch <dario.heini...@gmail.com> wrote: > Hey there, > > When one uses .window(TumblingEventTimeWindows.of(SOME_TIME)) it will > never window any values if the user uses > > WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks() > .withTimestampAssigner((t, timestamp) -> t.f0) > ) > > I was wondering whether Flink should throw an Exception at the start of > the programming and prevent the use of it as > no values would ever reach the process function. > > If so I would create a ticket and love to work on it. > > Here is an example: > > public class PlaygroundJob { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new > Configuration()); > > DataStreamSource<Tuple2<Long, Integer>> source = > env.addSource(new SourceFunction<Tuple2<Long, Integer>>() { > @Override > public void run(SourceContext<Tuple2<Long, Integer>> > sourceContext) throws Exception { > int i = 0; > while (true) { > Tuple2<Long, Integer> tuple = > Tuple2.of(System.currentTimeMillis(), i++ % 10); > sourceContext.collect(tuple); > } > } > > @Override > public void cancel() { > } > > }); > > source.assignTimestampsAndWatermarks( > // Switch noWatermarks() to forMonotonousTimestamps() > // and values are being printed. > WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks() > .withTimestampAssigner((t, timestamp) -> t.f0) > ).keyBy(t -> t.f1) > .window(TumblingEventTimeWindows.of(Time.seconds(1))) > .process(new ProcessWindowFunction<Tuple2<Long, > Integer>, String, Integer, TimeWindow>() { > @Override > public void process(Integer key, Context context, > Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws > Exception { > int count = 0; > Iterator<Tuple2<Long, Integer>> iter = > iterable.iterator(); > while (iter.hasNext()) { > count++; > iter.next(); > } > > out.collect("Key: " + key + " count: " + count); > > } > }).print(); > > env.execute(); > } > } > > Best regards, > > Dario > >