Hi.
I'm running a backfill from a Kafka topic with very few records spread
across a few days. I'm seeing a case where records coming from the Kafka
source carry a watermark that's more recent (by hours) than the event time.
I haven't seen this before when running this job, and it violates what I'd
expect the Kafka source to do.
Example problem:
1. I have Kafka records at ts=1000, 2000, ..., 500000. (The actual
timestamps are separated by much longer intervals.)
2. My first operator after the FlinkKafkaConsumer sees:
   context.timestamp() = 1000
   context.timerService().currentWatermark() = 500000
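For comparison, here's a minimal plain-Java sketch (no Flink dependency, names my own) of the arithmetic I'd expect forBoundedOutOfOrderness to follow, mirroring Flink's BoundedOutOfOrdernessWatermarks: the watermark should trail the maximum seen event timestamp by the out-of-orderness bound plus 1 ms, never lead the newest event time.

```java
import java.time.Duration;

public class ExpectedWatermark {
    // Mirrors BoundedOutOfOrdernessWatermarks: watermark trails the max
    // seen event timestamp by the bound plus 1 ms.
    public static long watermarkFor(long maxSeenTimestamp, Duration outOfOrderness) {
        return maxSeenTimestamp - outOfOrderness.toMillis() - 1;
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofSeconds(5);
        long maxSeen = Long.MIN_VALUE;
        // Records as in the example above: ts = 1000, 2000, ..., 500000
        for (long ts = 1000; ts <= 500000; ts += 1000) {
            maxSeen = Math.max(maxSeen, ts);
        }
        // After the last record, the watermark should still trail the
        // newest event time, not lead it by hours.
        System.out.println(watermarkFor(maxSeen, bound)); // 494999
    }
}
```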
Details about how I'm running this:
- I'm on Flink 1.12.3, running on EKS and using MSK as the source.
- I'm using FlinkKafkaConsumer.
- I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s), with no
idleness settings.
- I'm running similar code in all my environments; the main difference
here is low traffic. I have not been able to reproduce this outside this
environment.
I put the following process function right after my Kafka source. Example
output:
--------
AfterSource
ts=1647274892728
watermark=1647575140007
record=...
--------
public static class TextLog extends ProcessFunction<Record, Record> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);

    private final String label;

    public TextLog(String label) {
        this.label = label;
    }

    @Override
    public void processElement(Record record, Context context,
                               Collector<Record> collector) throws Exception {
        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
                label, context.timestamp(),
                context.timerService().currentWatermark(), record);
        collector.collect(record);
    }
}