I hit this using event-time processing and no idleness detection.  The same
issue happens if I enable idleness detection.

My code matches the code example for per-partition watermarking:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
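
For reference, my setup looks roughly like the following sketch (the topic
name, schema, and props are placeholders; the 5s bound matches my job):

FlinkKafkaConsumer<Record> kafkaSource =
        new FlinkKafkaConsumer<>("my-topic", schema, props);
// Per-partition watermarking: applying the strategy on the consumer itself
// means each Kafka partition gets its own watermark generator.
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)));
DataStream<Record> stream = env.addSource(kafkaSource);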


On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Dan,
>
> I'm quite confused as you already use per-partition watermarking.
>
> What I meant in my reply is:
> - If you don't use per-partition watermarking, # tasks < # partitions can
> cause the problem for backfill jobs.
> - If you don't use per-partition watermarking, # tasks = # partitions is
> going to be okay even for backfill jobs.
> - If you use per-partition watermarking, # tasks < # partitions shouldn't
> cause any problems unless you turn on idleness detection.
>
> Regarding idleness detection, which is based on processing time: what is
> your setting? If you set the value to 10 seconds, for example, you'll face
> the same problem unless the watermark of your backfill job catches up with
> real time within 10 seconds. If you increase the value to 1 minute, your
> backfill job has 1 minute to catch up with real time instead.
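>
> For example, a minimal sketch (the 5-second out-of-orderness bound is from
> your description; the 1-minute idleness value is just illustrative):
>
> WatermarkStrategy
>         .<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5))
>         .withIdleness(Duration.ofMinutes(1));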
>
> Best,
>
> Dongwon
>
>
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Thanks Dongwon!
>>
>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
>> tasks < # kafka partitions.  This should be called out in the docs or the
>> bug should be fixed.
>>
>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Do you use the per-partition watermarking explained in [1]?
>>> I've also experienced a similar problem when running backfill jobs
>>> specifically when # source tasks < # kafka partitions.
>>> - When # source tasks = # kafka partitions, the backfill job works as
>>> expected.
>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>> multiple partitions, which can destroy the per-partition watermarking
>>> pattern as explained in [2].
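>>>
>>> (For illustration, one way to guarantee # source tasks = # kafka
>>> partitions is to pin the source parallelism explicitly; the 12 below is
>>> a hypothetical partition count:)
>>>
>>> env.addSource(kafkaSource).setParallelism(12);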
>>>
>>> Hope this helps.
>>>
>>> p.s. If you plan to use per-partition watermarking, be aware that
>>> idleness detection [3] can cause another problem when you run a backfill
>>> job. Kafka source tasks in a backfill job seem to read a batch of records
>>> from Kafka and then wait for downstream tasks to catch up, and that
>>> waiting time can be counted as idleness.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> I'm following the example from this section:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>>
>>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com>
>>>> wrote:
>>>>
>>>>> Other points:
>>>>> - I'm using the kafka timestamp as the event time.
>>>>> - The same issue happens even if I use an idle watermark.
>>>>>
>>>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> There are 12 Kafka partitions (to keep the structure similar to other
>>>>>> low traffic environments).
>>>>>>
>>>>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi.
>>>>>>>
>>>>>>> I'm running a backfill from a kafka topic with very few records
>>>>>>> spread across a few days.  I'm seeing a case where the records coming
>>>>>>> from a kafka source have a watermark that's more recent (by hours) than
>>>>>>> the event time.  I haven't seen this before when running this job, and
>>>>>>> it violates what I'd expect the kafka source to do.
>>>>>>>
>>>>>>> Example problem:
>>>>>>> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual
>>>>>>> times are separated by a longer time period.
>>>>>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>>>>>    context.timestamp() = 1000
>>>>>>>    context.timerService().currentWatermark() = 500000
>>>>>>>
>>>>>>> Details about how I'm running this:
>>>>>>> - I'm on Flink 1.12.3, running on EKS and using MSK as the source.
>>>>>>> - I'm using FlinkKafkaConsumer.
>>>>>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s) with no
>>>>>>> idleness settings.
>>>>>>> - I'm running similar code in all my environments.  The main difference
>>>>>>> here is low traffic.  I have not been able to reproduce this outside of
>>>>>>> this environment.
>>>>>>>
>>>>>>>
>>>>>>> I put the following process function right after my kafka source.
>>>>>>>
>>>>>>> --------
>>>>>>>
>>>>>>> AfterSource
>>>>>>> ts=1647274892728
>>>>>>> watermark=1647575140007
>>>>>>> record=...
>>>>>>>
>>>>>>>
>>>>>>> import org.apache.flink.streaming.api.functions.ProcessFunction;
>>>>>>> import org.apache.flink.util.Collector;
>>>>>>> import org.slf4j.Logger;
>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>
>>>>>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>>>>>     private static final Logger LOGGER =
>>>>>>>             LoggerFactory.getLogger(TextLog.class);
>>>>>>>     private final String label;
>>>>>>>
>>>>>>>     public TextLog(String label) {
>>>>>>>         this.label = label;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void processElement(Record record, Context context,
>>>>>>>             Collector<Record> collector) throws Exception {
>>>>>>>         // Log the element's event timestamp and the current watermark.
>>>>>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>>>>>                 label, context.timestamp(),
>>>>>>>                 context.timerService().currentWatermark(), record);
>>>>>>>         collector.collect(record);
>>>>>>>     }
>>>>>>> }
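>>>>>>>
>>>>>>> (It's attached right after the source, e.g.
>>>>>>> stream.process(new TextLog("AfterSource")), which is where the
>>>>>>> "AfterSource" label in the output above comes from.)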
>>>>>>>
>>>>>>
