I believe your job with per-partition watermarking should work okay even in a backfill scenario.
BTW, is the problem still observed even with # source tasks = # partitions?

For committers: is there a way to confirm from the TM logs that per-partition watermarking is in use?

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <[email protected]> wrote:

> I hit this using event-time processing and no idleness detection. The same
> issue happens if I enable idleness.
>
> My code matches the code example for per-partition watermarking
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>.
>
> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <[email protected]> wrote:
>
>> Hi Dan,
>>
>> I'm quite confused, as you already use per-partition watermarking.
>>
>> What I meant in my reply is:
>> - If you don't use per-partition watermarking, # tasks < # partitions can
>> cause the problem for backfill jobs.
>> - If you don't use per-partition watermarking, # tasks = # partitions is
>> going to be okay even for backfill jobs.
>> - If you use per-partition watermarking, # tasks < # partitions shouldn't
>> cause any problems unless you turn on idleness detection.
>>
>> Regarding the idleness detection, which is based on processing time: what
>> is your setting? If you set the value to 10 seconds, for example, you'll
>> face the same problem unless the watermark of your backfill job catches
>> up to real time within 10 seconds. If you increase the value to 1 minute,
>> your backfill job should catch up to real time within 1 minute.
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <[email protected]> wrote:
>>
>>> Thanks Dongwon!
>>>
>>> Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source
>>> tasks < # kafka partitions. This should be called out in the docs or the
>>> bug should be fixed.
>>>
>>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <[email protected]> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> Do you use the per-partition watermarking explained in [1]?
>>>> I've also experienced a similar problem when running backfill jobs,
>>>> specifically when # source tasks < # kafka partitions:
>>>> - When # source tasks = # kafka partitions, the backfill job works as
>>>> expected.
>>>> - When # source tasks < # kafka partitions, a single Kafka consumer
>>>> consumes multiple partitions, which can destroy the per-partition
>>>> patterns as explained in [2].
>>>>
>>>> Hope this helps.
>>>>
>>>> p.s. If you plan to use per-partition watermarking, be aware that
>>>> idleness detection [3] can cause another problem when you run a backfill
>>>> job. Kafka source tasks in a backfill job seem to read a batch of records
>>>> from Kafka and then wait for downstream tasks to catch up, which can be
>>>> counted as idleness.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
>>>> [2]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>> [3]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <[email protected]> wrote:
>>>>
>>>>> I'm following the example from this section:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>>
>>>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <[email protected]> wrote:
>>>>>
>>>>>> Other points:
>>>>>> - I'm using the kafka timestamp as event time.
>>>>>> - The same issue happens even if I use an idle watermark.
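[Editor's note] The p.s. above about idleness tripping during a backfill can be made concrete with a toy model. This is a hypothetical sketch, not Flink's actual combination logic (which lives in the watermark multiplexing inside the source operator): it only models the one rule that matters here, namely that an input marked idle is excluded from the min-combination of per-partition watermarks. The class name, timestamps, and partition layout are illustrative assumptions, not taken from the job in this thread.

```java
// Toy model of combining per-partition watermarks under idleness.
// Hypothetical sketch -- Flink's real logic only behaves *like* this.
public class IdlenessSketch {

    // Combined watermark = min over all partitions that are NOT idle.
    // If every partition is idle, no watermark can be derived (Long.MIN_VALUE).
    static long combinedWatermark(long[] partitionWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, partitionWatermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Partition 1 lags partition 0 by roughly 3.5 days (backfill skew).
        long[] wms = {1647575140007L, 1647274892728L};

        // While both partitions are active, the lagging partition holds the
        // combined watermark back -- the desired backfill behavior.
        long bothActive = combinedWatermark(wms, new boolean[] {false, false});
        System.out.println("both active:    " + bothActive);

        // If the lagging partition is marked idle (e.g. the source task
        // stalls on backpressure long enough to trip the idle timeout),
        // the combined watermark jumps ahead, and partition 1's records
        // will later look hours or days late.
        long oneIdle = combinedWatermark(wms, new boolean[] {false, true});
        System.out.println("p1 marked idle: " + oneIdle);
    }
}
```

This is why a short idle timeout plus a backfill that cannot keep up reproduces the symptom even with correct per-partition watermarking, matching Dongwon's suggestion to raise the timeout until the job catches up to real time.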
>>>>>>
>>>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <[email protected]> wrote:
>>>>>>
>>>>>>> There are 12 Kafka partitions (to keep the structure similar to
>>>>>>> other low-traffic environments).
>>>>>>>
>>>>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi.
>>>>>>>>
>>>>>>>> I'm running a backfill from a kafka topic with very few records
>>>>>>>> spread across a few days. I'm seeing a case where the records coming
>>>>>>>> from a kafka source have a watermark that's more recent (by hours)
>>>>>>>> than the event time. I haven't seen this before when running this
>>>>>>>> job. This violates what I'd assume the kafka source would do.
>>>>>>>>
>>>>>>>> Example problem:
>>>>>>>> 1. I have kafka records at ts=1000, 2000, ... 500000. The actual
>>>>>>>> times are separated by a longer time period.
>>>>>>>> 2. My first operator after the FlinkKafkaConsumer sees:
>>>>>>>> context.timestamp() = 1000
>>>>>>>> context.timerService().currentWatermark() = 500000
>>>>>>>>
>>>>>>>> Details about how I'm running this:
>>>>>>>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>>>>>>>> - I'm using FlinkKafkaConsumer.
>>>>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No
>>>>>>>> idleness settings.
>>>>>>>> - I'm running similar code in all the environments. The main
>>>>>>>> difference is low traffic. I have not been able to reproduce this
>>>>>>>> outside of this environment.
>>>>>>>>
>>>>>>>> I put the following process function right after my kafka source.
>>>>>>>>
>>>>>>>> --------
>>>>>>>>
>>>>>>>> AfterSource
>>>>>>>> ts=1647274892728
>>>>>>>> watermark=1647575140007
>>>>>>>> record=...
>>>>>>>>
>>>>>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>>>>>     private final String label;
>>>>>>>>
>>>>>>>>     public TextLog(String label) {
>>>>>>>>         this.label = label;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void processElement(Record record, Context context,
>>>>>>>>             Collector<Record> collector) throws Exception {
>>>>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>>>>                 label, context.timestamp(),
>>>>>>>>                 context.timerService().currentWatermark(), record);
>>>>>>>>         collector.collect(record);
>>>>>>>>     }
>>>>>>>> }
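[Editor's note] The ts/watermark pair in the AfterSource log above (a timestamp days behind the current watermark) is the shape you get when one source subtask feeds records from multiple partitions through a single bounded-out-of-orderness generator. The following is a hypothetical, self-contained sketch, not Flink's `BoundedOutOfOrdernessWatermarks` class: it only mimics the rule that the watermark trails the maximum timestamp seen so far by the configured bound. Class names and the two sample timestamps are illustrative assumptions.

```java
// Toy reproduction of the symptom: one consumer reading two partitions
// through a single shared watermark generator (hypothetical sketch).
public class MergedPartitionsSketch {
    static final long BOUND_MS = 5_000; // like forBoundedOutOfOrderness(5s)

    long maxTs = Long.MIN_VALUE;

    // Bounded out-of-orderness rule: the watermark trails the maximum
    // timestamp seen so far by BOUND_MS; it never moves backwards.
    long onEvent(long ts) {
        maxTs = Math.max(maxTs, ts);
        return maxTs - BOUND_MS;
    }

    public static void main(String[] args) {
        MergedPartitionsSketch shared = new MergedPartitionsSketch();

        // Partition 0 is days ahead of partition 1, and the consumer
        // happens to read a partition-0 record first.
        shared.onEvent(1647575140007L);

        // The next record comes from the lagging partition 1. The shared
        // generator's watermark does not go back down, so this record's
        // timestamp is days behind the current watermark -- the same
        // shape as the AfterSource log above.
        long wm = shared.onEvent(1647274892728L);
        System.out.println("ts=1647274892728 watermark=" + wm);

        // With true per-partition watermarking, each partition keeps its
        // own maxTs and the combined watermark is the minimum, so the
        // lagging partition's records are not ahead of the watermark.
        MergedPartitionsSketch p0 = new MergedPartitionsSketch();
        MergedPartitionsSketch p1 = new MergedPartitionsSketch();
        long combined = Math.min(p0.onEvent(1647575140007L),
                                 p1.onEvent(1647274892728L));
        System.out.println("per-partition watermark=" + combined);
    }
}
```

Under this model, attaching the watermark strategy so that it is evaluated per Kafka partition (as in the Kafka connector section of the docs linked above) keeps the combined watermark pinned to the slowest partition, which is what a backfill needs.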
