Hi Dario,

I think adding a sanity check that checks whether the user uses some event
time constructs and prints a warning if WatermarkStrategy.noWatermarks() is
set, could be a good improvement. It might even be ok to fail to make this
misconfiguration explicit. Hence, I think it is a good idea to open a
ticket for this problem. If you have a proposal for how to address this
issue, then post it on the ticket.

Cheers,
Till

On Thu, Oct 21, 2021 at 4:40 PM Dario Heinisch <dario.heini...@gmail.com>
wrote:

> Hey there,
>
> When one uses .window(TumblingEventTimeWindows.of(SOME_TIME)) it will
> never window any values if the user uses
>
> WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
>                          .withTimestampAssigner((t, timestamp) -> t.f0)
> )
>
> I was wondering whether Flink should throw an Exception at the start of
> the programming and prevent the use of it as
> no values would ever reach the process function.
>
> If so I would create a ticket and love to work on it.
>
> Here is an example:
>
> public class PlaygroundJob {
>      public static void main(String[] args) throws Exception {
>          StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new
> Configuration());
>
>          DataStreamSource<Tuple2<Long, Integer>> source =
> env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
>              @Override
>              public void run(SourceContext<Tuple2<Long, Integer>>
> sourceContext) throws Exception {
>                  int i = 0;
>                  while (true) {
>                      Tuple2<Long, Integer> tuple =
> Tuple2.of(System.currentTimeMillis(), i++ % 10);
>                      sourceContext.collect(tuple);
>                  }
>              }
>
>              @Override
>              public void cancel() {
>              }
>
>          });
>
>          source.assignTimestampsAndWatermarks(
>                  // Switch noWatermarks() to forMonotonousTimestamps()
>                  // and values are being printed.
>                  WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
>                          .withTimestampAssigner((t, timestamp) -> t.f0)
>                  ).keyBy(t -> t.f1)
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>                  .process(new ProcessWindowFunction<Tuple2<Long,
> Integer>, String, Integer, TimeWindow>() {
>                      @Override
>                      public void process(Integer key, Context context,
> Iterable<Tuple2<Long, Integer>> iterable, Collector<String> out) throws
> Exception {
>                          int count = 0;
>                          Iterator<Tuple2<Long, Integer>> iter =
> iterable.iterator();
>                          while (iter.hasNext()) {
>                              count++;
>                              iter.next();
>                          }
>
>                          out.collect("Key: " + key + " count: " + count);
>
>                      }
>                  }).print();
>
>          env.execute();
>      }
> }
>
> Best regards,
>
> Dario
>
>

Reply via email to