I'm following the example from this section: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
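The linked section attaches the `WatermarkStrategy` to the Kafka consumer itself. Watermarks are then generated per Kafka partition, and the per-partition watermarks are merged the same way watermarks are merged on a shuffle: the source forwards their minimum. As a rough illustration of that merging rule for the partitions read by one source subtask, here is a stdlib-only model. It is not Flink code. `PerPartitionWatermarks` and its methods are hypothetical names. The start value and the `max - bound - 1` formula are assumed to mirror Flink's `BoundedOutOfOrdernessWatermarks`. Idleness handling is left out.

```java
import java.util.Arrays;

/**
 * Hypothetical, stdlib-only model (not Flink code) of per-partition watermarking:
 * each partition runs its own bounded-out-of-orderness generator and the source
 * forwards the minimum of the per-partition watermarks.
 */
class PerPartitionWatermarks {
    private final long boundMillis;
    private final long[] maxTs; // highest timestamp seen so far, per partition

    PerPartitionWatermarks(int partitions, long boundMillis) {
        this.boundMillis = boundMillis;
        this.maxTs = new long[partitions];
        // Start value chosen so a partition with no records reports Long.MIN_VALUE.
        Arrays.fill(maxTs, Long.MIN_VALUE + boundMillis + 1);
    }

    void onRecord(int partition, long timestamp) {
        maxTs[partition] = Math.max(maxTs[partition], timestamp);
    }

    long partitionWatermark(int partition) {
        return maxTs[partition] - boundMillis - 1;
    }

    /** Combined watermark: the minimum over all partitions (no idleness timeout). */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < maxTs.length; p++) {
            min = Math.min(min, partitionWatermark(p));
        }
        return min;
    }

    public static void main(String[] args) {
        // Hypothetical setup: two partitions, 5 s out-of-orderness bound.
        PerPartitionWatermarks source = new PerPartitionWatermarks(2, 5_000L);
        source.onRecord(0, 500_000L);
        System.out.println(source.combinedWatermark()); // partition 1 has no data yet
        source.onRecord(1, 1_000L);
        System.out.println(source.combinedWatermark());
    }
}
```

In this model, a partition that has not produced any record keeps the combined watermark at `Long.MIN_VALUE`. Once that partition reads a record at ts=1000, the combined watermark becomes -4001, even though the other partition is already at 500000. Under per-partition generation, the watermark should therefore trail the slowest partition rather than run ahead of it.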
On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
> Other points:
> - I'm using the Kafka timestamp as event time.
> - The same issue happens even if I use an idle watermark.
>
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> There are 12 Kafka partitions (to keep the structure similar to other
>> low-traffic environments).
>>
>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> I'm running a backfill from a Kafka topic with very few records spread
>>> across a few days. I'm seeing a case where the records coming from a Kafka
>>> source have a watermark that's more recent (by hours) than their event time.
>>> I haven't seen this in earlier runs, and it violates what I'd assume the
>>> Kafka source would do.
>>>
>>> Example problem:
>>> 1. I have Kafka records at ts=1000, 2000, ... 500000. The actual times
>>>    are separated by a longer time period.
>>> 2. My first operator after the FlinkKafkaConsumer sees:
>>>    context.timestamp() = 1000
>>>    context.timerService().currentWatermark() = 500000
>>>
>>> Details about how I'm running this:
>>> - I'm on Flink 1.12.3, running on EKS, with MSK as the source.
>>> - I'm using FlinkKafkaConsumer.
>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s), with no
>>>   idleness settings.
>>> - I'm running similar code in all environments. The main difference is
>>>   low traffic. I have not been able to reproduce this outside that
>>>   environment.
>>>
>>> I put the following process function right after my Kafka source.
>>>
>>> --------
>>>
>>> AfterSource
>>> ts=1647274892728
>>> watermark=1647575140007
>>> record=...
>>>
>>> // Assumes Flink's ProcessFunction and Collector are imported and that the
>>> // enclosing class defines an SLF4J LOGGER.
>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>     private final String label;
>>>
>>>     public TextLog(String label) {
>>>         this.label = label;
>>>     }
>>>
>>>     @Override
>>>     public void processElement(Record record, Context context,
>>>             Collector<Record> collector) throws Exception {
>>>         // Log the record's event timestamp next to the operator's current watermark.
>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>                 label, context.timestamp(),
>>>                 context.timerService().currentWatermark(), record);
>>>         // Forward the record unchanged.
>>>         collector.collect(record);
>>>     }
>>> }
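One mechanism that can produce the pattern in the example problem (a record at ts=1000 observing a much later watermark) is a single bounded-out-of-orderness generator that sees records from several partitions interleaved, for example if watermarks were assigned after the source instead of per partition. The stdlib-only sketch below models that case under stated assumptions. It is not Flink code and not a diagnosis of this thread. The class `MergedStreamWatermarkDemo`, its methods, and the emit-after-every-record behavior are hypothetical (Flink's built-in strategy emits watermarks periodically). The `max - bound - 1` formula is assumed to mirror Flink's `BoundedOutOfOrdernessWatermarks`. `isBehindWatermark` applies the usual lateness test (timestamp at or below the current watermark) and treats a null timestamp, which `Context.timestamp()` can return, as not late.

```java
/**
 * Hypothetical, stdlib-only model (not Flink code) of ONE bounded-out-of-orderness
 * watermark generator fed by records from several partitions in arrival order.
 */
class MergedStreamWatermarkDemo {

    /**
     * Returns, for each record in arrival order, the watermark that was current when
     * that record was processed. Assumes a watermark is emitted after every record.
     */
    static long[] watermarksSeen(long[] arrivalTs, long boundMillis) {
        // Start value chosen so the watermark before any data is Long.MIN_VALUE.
        long maxTs = Long.MIN_VALUE + boundMillis + 1;
        long current = Long.MIN_VALUE;
        long[] seen = new long[arrivalTs.length];
        for (int i = 0; i < arrivalTs.length; i++) {
            seen[i] = current;                     // watermark visible to this record
            maxTs = Math.max(maxTs, arrivalTs[i]);
            current = maxTs - boundMillis - 1;     // watermark emitted after this record
        }
        return seen;
    }

    /** A record is late when its timestamp is at or below the current watermark. */
    static boolean isBehindWatermark(Long timestamp, long currentWatermark) {
        return timestamp != null && timestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        // Hypothetical arrival order: one partition's ts=500000 record is read
        // before another partition's ts=1000 record.
        long[] arrival = {500_000L, 1_000L};
        long[] seen = watermarksSeen(arrival, 5_000L);
        System.out.println("ts=" + arrival[1] + " watermark=" + seen[1]
                + " late=" + isBehindWatermark(arrival[1], seen[1]));

        // Values from the AfterSource log output quoted in this thread.
        long ts = 1647274892728L;
        long wm = 1647575140007L;
        System.out.println("lag=" + (wm - ts) + " ms, late=" + isBehindWatermark(ts, wm));
    }
}
```

Running `main` prints `ts=1000 watermark=494999 late=true`, then a lag of 300247279 ms (about 83 hours) for the logged record. A check like `isBehindWatermark` could also be placed inside a logging `ProcessFunction` to count how many records arrive behind the watermark rather than inspecting log lines by hand.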