Hey Till,

Thanks for your response!

I have created a ticket https://issues.apache.org/jira/browse/FLINK-24623 and added
a proposal to address this issue!

Best regards,

Dario

On 22.10.21 17:14, Till Rohrmann wrote:
Hi Dario,

I think adding a sanity check that detects whether the user uses event
time constructs and prints a warning if WatermarkStrategy.noWatermarks() is
set could be a good improvement. It might even be OK to fail outright in
order to make this misconfiguration explicit. Hence, I think it is a good
idea to open a ticket for this problem. If you have a proposal for how to
address this issue, then please post it on the ticket.

Cheers,
Till

On Thu, Oct 21, 2021 at 4:40 PM Dario Heinisch <dario.heini...@gmail.com>
wrote:

Hey there,

When one uses .window(TumblingEventTimeWindows.of(SOME_TIME)), no window
will ever fire if the user also uses

WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
        .withTimestampAssigner((t, timestamp) -> t.f0)

because without watermarks event time never advances.

I was wondering whether Flink should throw an exception at the start of
the program to prevent this combination, as no values would ever reach
the process function.

If so, I would create a ticket and would love to work on it.

Here is an example:

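import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class PlaygroundJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Emits (current time in ms, key in 0..9) until the source is cancelled.
        DataStreamSource<Tuple2<Long, Integer>> source =
                env.addSource(new SourceFunction<Tuple2<Long, Integer>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple2<Long, Integer>> sourceContext) throws Exception {
                        int i = 0;
                        while (running) {
                            Tuple2<Long, Integer> tuple =
                                    Tuple2.of(System.currentTimeMillis(), i++ % 10);
                            sourceContext.collect(tuple);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        source.assignTimestampsAndWatermarks(
                        // Switch noWatermarks() to forMonotonousTimestamps()
                        // and values are being printed.
                        WatermarkStrategy.<Tuple2<Long, Integer>>noWatermarks()
                                .withTimestampAssigner((t, timestamp) -> t.f0))
                .keyBy(t -> t.f1)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new ProcessWindowFunction<Tuple2<Long, Integer>, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context,
                            Iterable<Tuple2<Long, Integer>> iterable,
                            Collector<String> out) throws Exception {
                        // Count the elements that were assigned to this window.
                        int count = 0;
                        for (Tuple2<Long, Integer> ignored : iterable) {
                            count++;
                        }

                        out.collect("Key: " + key + " count: " + count);
                    }
                }).print();

        env.execute();
    }
}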
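
To illustrate why the job above prints nothing, here is a minimal sketch in
plain Java. It has no Flink dependency and is not Flink's actual
implementation; the class WindowFiringSketch and its methods are hypothetical
names made up for this illustration. It assumes that an event-time window
fires once the watermark reaches the window's last timestamp (end - 1), and
that a monotonous-timestamps strategy emits a watermark of roughly the highest
timestamp seen minus 1. Without watermarks the watermark stays at
Long.MIN_VALUE, so no window ever fires:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified model of event-time window firing (not Flink code).
class WindowFiringSketch {

    // Assumption: the watermark stays at this value when none is ever emitted.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Start of the tumbling window containing the timestamp
    // (non-negative timestamps assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Returns the start timestamps of all windows that fire while the
    // given (ascending) timestamps are processed.
    static List<Long> firedWindowStarts(long[] timestamps, long size, boolean emitWatermarks) {
        TreeSet<Long> openWindows = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        long watermark = NO_WATERMARK;

        for (long ts : timestamps) {
            openWindows.add(windowStart(ts, size));

            if (emitWatermarks) {
                // Assumed behavior of a monotonous-timestamps strategy.
                watermark = Math.max(watermark, ts - 1);
            }

            // A window fires once the watermark reaches its last timestamp.
            while (!openWindows.isEmpty() && openWindows.first() + size - 1 <= watermark) {
                fired.add(openWindows.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 500L, 1000L, 1500L, 2000L};
        System.out.println(firedWindowStarts(timestamps, 1000L, true));
        System.out.println(firedWindowStarts(timestamps, 1000L, false));
    }
}
```

With watermarks the sketch prints [0, 1000]; without them it prints [],
which matches what the job above shows with noWatermarks().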

Best regards,

Dario

