I hit this using event-time processing and no idleness detection. The same issue happens if I enable idleness detection.
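For context on the watermark values discussed below: Flink's WatermarkStrategy.forBoundedOutOfOrderness emits a watermark that trails the maximum event timestamp seen so far by the configured bound. A minimal plain-Java sketch of that arithmetic (the class and field names here are illustrative, not Flink's internals):

```java
// Sketch of the bounded-out-of-orderness watermark arithmetic, mirroring
// the observable behavior of WatermarkStrategy.forBoundedOutOfOrderness.
// Illustrative only; this is not Flink's actual implementation.
public class BoundedOutOfOrderness {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp = Long.MIN_VALUE + 1;

    public BoundedOutOfOrderness(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    public void onEvent(long eventTimestamp) {
        // The watermark only ever moves forward with the max timestamp.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    public long currentWatermark() {
        // Flink subtracts 1 ms so an event exactly at the bound is not late.
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }
}
```

The key property for this thread: once any record with a recent timestamp has been seen, a later-read old record cannot pull the watermark back, so it arrives already behind the watermark.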
My code matches the code example for per-partition watermarking:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Dan,
>
> I'm quite confused, as you already use per-partition watermarking.
>
> What I meant in my reply is:
> - If you don't use per-partition watermarking, # tasks < # partitions can
> cause the problem for backfill jobs.
> - If you don't use per-partition watermarking, # tasks = # partitions is
> going to be okay even for backfill jobs.
> - If you use per-partition watermarking, # tasks < # partitions shouldn't
> cause any problems unless you turn on idleness detection.
>
> Regarding idleness detection, which is based on processing time, what is
> your setting? If you set the value to 10 seconds, for example, you'll
> face the same problem unless the watermark of your backfill job catches
> up with real time within 10 seconds. If you increase the value to 1 minute,
> your backfill job should catch up with real time within 1 minute.
>
> Best,
>
> Dongwon
>
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Thanks Dongwon!
>>
>> Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source
>> tasks < # kafka partitions. This should be called out in the docs, or
>> the bug should be fixed.
>>
>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Do you use the per-partition watermarking explained in [1]?
>>> I've also experienced a similar problem when running backfill jobs,
>>> specifically when # source tasks < # kafka partitions.
>>> - When # source tasks = # kafka partitions, the backfill job works as
>>> expected.
>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>> multiple partitions.
>>> This can destroy the per-partition watermarking pattern, as
>>> explained in [2].
>>>
>>> Hope this helps.
>>>
>>> p.s. If you plan to use per-partition watermarking, be aware that
>>> idleness detection [3] can cause another problem when you run a
>>> backfill job. Kafka source tasks in a backfill job seem to read a
>>> batch of records from Kafka and then wait for downstream tasks to
>>> catch up, which can be counted as idleness.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> I'm following the example from this section:
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>
>>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com>
>>>> wrote:
>>>>
>>>>> Other points:
>>>>> - I'm using the kafka timestamp as event time.
>>>>> - The same issue happens even if I use an idle watermark.
>>>>>
>>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> There are 12 Kafka partitions (to keep the structure similar to
>>>>>> other low-traffic environments).
>>>>>>
>>>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi.
>>>>>>>
>>>>>>> I'm running a backfill from a kafka topic with very few records
>>>>>>> spread across a few days.
>>>>>>> I'm seeing a case where the records coming from a kafka source
>>>>>>> have a watermark that's more recent (by hours) than the event
>>>>>>> time. I haven't seen this before when running this job. This
>>>>>>> violates what I'd assume the kafka source would do.
>>>>>>>
>>>>>>> Example problem:
>>>>>>> 1. I have kafka records at ts=1000, 2000, ..., 500000. The actual
>>>>>>> times are separated by a longer time period.
>>>>>>> 2. My first operator after the FlinkKafkaConsumer sees:
>>>>>>> context.timestamp() = 1000
>>>>>>> context.timerService().currentWatermark() = 500000
>>>>>>>
>>>>>>> Details about how I'm running this:
>>>>>>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>>>>>>> - I'm using FlinkKafkaConsumer.
>>>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No
>>>>>>> idleness settings.
>>>>>>> - I'm running similar code in all the environments. The main
>>>>>>> difference is low traffic. I have not been able to reproduce this
>>>>>>> outside of that environment.
>>>>>>>
>>>>>>> I put the following process function right after my kafka source.
>>>>>>>
>>>>>>> --------
>>>>>>>
>>>>>>> AfterSource
>>>>>>> ts=1647274892728
>>>>>>> watermark=1647575140007
>>>>>>> record=...
>>>>>>>
>>>>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>>>>     private final String label;
>>>>>>>
>>>>>>>     public TextLog(String label) {
>>>>>>>         this.label = label;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void processElement(Record record, Context context,
>>>>>>>             Collector<Record> collector) throws Exception {
>>>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>>>                 label, context.timestamp(),
>>>>>>>                 context.timerService().currentWatermark(), record);
>>>>>>>         collector.collect(record);
>>>>>>>     }
>>>>>>> }
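The failure mode described in this thread can be sketched without Flink. Assume one source task consumes two partitions and watermarks are effectively generated on the merged stream rather than per partition (the situation Dongwon describes for # tasks < # partitions); a record from a caught-up partition then drives the watermark hours past the backfill partition's records. The timestamps below match the log excerpt above; the class and method names are illustrative:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of why one source task reading multiple Kafka partitions can put
// backfill records behind the watermark when the watermark is computed on
// the merged stream instead of per partition. Names are illustrative.
public class MergedPartitionWatermarkDemo {

    // Watermark in effect after processing the merged stream, using a
    // bounded-out-of-orderness bound (max timestamp minus bound minus 1 ms).
    public static long watermarkAfter(List<Long> mergedTimestamps, long boundMillis) {
        long maxTs = Long.MIN_VALUE + 1;
        for (long ts : mergedTimestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        return maxTs - boundMillis - 1;
    }

    public static void main(String[] args) {
        // Partition A is near real time; partition B holds old backfill
        // records. One consumer reads both, A's record first.
        long recentTs = 1647575145007L;  // partition A
        long backfillTs = 1647274892728L; // partition B, days behind
        List<Long> merged = Arrays.asList(recentTs, backfillTs);

        long wm = watermarkAfter(merged, 5000);
        // The backfill record's timestamp sits far below the watermark, so
        // downstream operators treat it as hours late.
        System.out.println(wm > backfillTs);
    }
}
```

With true per-partition watermarking, the source would instead take the minimum of the per-partition watermarks, holding the overall watermark back to partition B's progress.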