Hi.

I'm running a backfill from a Kafka topic with very few records spread
across a few days.  I'm seeing a case where records coming from the Kafka
source have a watermark that's more recent (by hours) than their event
time.  I haven't seen this before, and it violates what I'd assume the
Kafka source would do.

Example problem:
1. I have Kafka records at ts=1000, 2000, ... 500000 (simplified; the
actual timestamps are separated by much longer intervals).
2.  My first operator after the FlinkKafkaConsumer sees:
   context.timestamp() = 1000
   context.timerService().currentWatermark() = 500000
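
For reference, my understanding of forBoundedOutOfOrderness is that the
watermark trails the max event timestamp seen so far by the configured
delay, so with the timestamps above the watermark should only approach
500000 after the ts=500000 record has been read.  A toy sketch of that
expectation (class and method names are mine, not Flink's):

```java
// Sketch of the arithmetic I believe forBoundedOutOfOrderness uses:
// watermark = (max event timestamp seen so far) - outOfOrderness - 1.
public class WatermarkSketch {
    static long boundedOutOfOrdernessWatermark(long maxTimestampSeen, long delayMs) {
        return maxTimestampSeen - delayMs - 1;
    }

    public static void main(String[] args) {
        long delayMs = 5_000; // 5s bound, as in my job
        // After only the ts=1000 record, the watermark should still trail it:
        System.out.println(boundedOutOfOrdernessWatermark(1_000, delayMs));
        // Only after the ts=500000 record should it get near 500000:
        System.out.println(boundedOutOfOrdernessWatermark(500_000, delayMs)); // 494999
    }
}
```

So a downstream operator seeing ts=1000 together with watermark=500000 is
what surprises me.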

Details about how I'm running this:
- I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
- I'm using FlinkKafkaConsumer
- I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
settings.
- I'm running similar code in all my environments.  The main difference
here is low traffic.  I have not been able to reproduce this outside this
environment.
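
For context on what I'd expect: my understanding is that with
per-partition watermarking the source emits the minimum watermark across
its partitions, so even one slow partition should hold the output
watermark back.  A toy sketch of that expectation (this is my own
illustration, not Flink code):

```java
import java.util.Arrays;

// Toy model of per-partition watermark combining as I understand it:
// the source's output watermark is the minimum across partitions.
public class PartitionWatermarks {
    static long combine(long[] partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Even if one partition has advanced hours ahead, the combined
        // watermark should be held back by the slowest partition:
        System.out.println(combine(new long[]{1647274892728L, 1647575140007L}));
    }
}
```

Given that, I don't see how the watermark could run hours ahead of a
record's event time unless something else is advancing it.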


I put the following process function right after my Kafka source.  Here's
an example of what it logs:

--------

AfterSource
ts=1647274892728
watermark=1647575140007
record=...


public static class TextLog extends ProcessFunction<Record, Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
    private final String label;

    public TextLog(String label) {
        this.label = label;
    }

    @Override
    public void processElement(Record record, Context context,
            Collector<Record> collector) throws Exception {
        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
                label, context.timestamp(),
                context.timerService().currentWatermark(), record);
        collector.collect(record);
    }
}
