Re: Weird Flink Kafka source watermark behavior

2022-04-13 Thread Jin Yi
> [] - LogRequestFilter ts: 1649284172067 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172171 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,316 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172174 watermark: 1649284187140
> >>> 2022-04-08 05:47:06,317 WARN
> ai.promoted.metrics.logprocessor.common.functions.filter.LogRequestFilter
> [] - LogRequestFilter ts: 1649284172666 watermark: 1649284187140
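(For reference: the warnings above come from a custom check placed right after the Kafka source; its implementation isn't shown in the thread. A minimal sketch of an equivalent operator, with class and logger names assumed, could look like this.)

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical stand-in for LogRequestFilter: warn whenever the current
// watermark is already ahead of the record's event timestamp, which is the
// symptom reported in this thread.
public class WatermarkVsTimestampLogger<T> extends ProcessFunction<T, T> {
  private static final Logger LOG =
      LoggerFactory.getLogger(WatermarkVsTimestampLogger.class);

  @Override
  public void processElement(T value, Context ctx, Collector<T> out) {
    Long ts = ctx.timestamp(); // event timestamp (the Kafka timestamp here); may be null
    long watermark = ctx.timerService().currentWatermark();
    if (ts != null && watermark > ts) {
      LOG.warn("ts: {} watermark: {}", ts, watermark);
    }
    out.collect(value);
  }
}
```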
> >>>
> >>>
> >>>
> >>> On Sat, Mar 19, 2022 at 10:51 AM Dan Hill 
> wrote:
> >>> I dove deeper.  I wasn't actually using per-partition watermarks.
> Thank you for the help!
> >>>
> >>> On Fri, Mar 18, 2022 at 12:11 PM Dan Hill 
> wrote:
> >>> Thanks, Thias and Dongwon.
> >>>
> >>> I'll keep debugging this with the idle watermark turned off.
> >>>
> >>> Next TODOs:
> >>> - Verify that we’re using per-partition watermarks.  Our code matches
> the example but maybe something is disabling it.
> >>> - Enable logging of partition-consumer assignment, to see if that is
> the cause of the problem.
> >>> - Look at adding flags to set the source parallelism to see if that
> fixes the issue.
> >>>
> >>> Yes, I've seen Flink talks on creating our own watermarks through
> Kafka.  Sounds like a good idea.

Re: Weird Flink Kafka source watermark behavior

2022-04-13 Thread Qingsheng Ren

Re: Weird Flink Kafka source watermark behavior

2022-04-13 Thread Qingsheng Ren

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Martijn Visser
Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Jin Yi

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Jin Yi

Re: Weird Flink Kafka source watermark behavior

2022-04-08 Thread Qingsheng Ren

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dongwon Kim
I totally agree with Schwalbe that per-partition watermarking allows #
source tasks < # kafka partitions.

Otherwise, Dan, you should suspect other possibilities like what Schwalbe
said.

Best,

Dongwon

On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi San, Dongwon,
>
>
>
> I share the opinion that when per-partition watermarking is enabled, you
> should observe correct behavior … would be interesting to see why it does
> not work for you.
>
>
>
> I’d like to clear one tiny misconception here when you write:
>
>
>
> >> - The same issue happens even if I use an idle watermark.
>
>
>
> You would expect to see glitches with watermarking when you enable
> idleness.
>
> Idleness sort of trades watermark correctness for reduces latency when
> processing timers (much simplified).
>
> With idleness enabled you have no guaranties whatsoever as to the quality
> of watermarks (which might be ok in some cases).
>
> BTW we dominantly use a mix of fast and slow sources (that only update
> once a day) which hand-pimped watermarking and late event processing, and
> enabling idleness would break everything.
>
>
>
> Oversight put aside things should work the way you implemented it.
>
>
>
> One thing I could imagine to be a cause is
>
>- that over time the kafka partitions get reassigned  to different
>consumer subtasks which would probably stress correct recalculation of
>watermarks. Hence #partition == number subtask might reduce the problem
>- can you enable logging of partition-consumer assignment, to see if
>that is the cause of the problem
>- also involuntary restarts of the job can cause havoc as this resets
>watermarking
>
>
>
> I’ll be off next week, unable to take part in the active discussion …
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Dan Hill 
> *Sent:* Freitag, 18. März 2022 08:23
> *To:* Dongwon Kim 
> *Cc:* user 
> *Subject:* Re: Weird Flink Kafka source watermark behavior
>
>
>
>
>
>
> I'll try forcing # source tasks = # partitions tomorrow.
>
>
>
> Thank you, Dongwon, for all of your help!
>
>
>
> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim 
> wrote:
>
> I believe your job with per-partition watermarking should be working okay
> even in a backfill scenario.
>
>
>
> BTW, is the problem still observed even with # sour tasks = # partitions?
>
>
>
> For committers:
>
> Is there a way to confirm that per-partition watermarking is used in TM
> log?
>
>
>
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill  wrote:
>
> I hit this using event processing and no idleness detection.  The same
> issue happens if I enable idleness.
>
>
>
> My code matches the code example for per-partition watermarking
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>
> .
>
>
>
> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim 
> wrote:
>
> Hi Dan,
>
>
>
> I'm quite confused as you already use per-partition watermarking.
>
>
>
> What I meant in the reply is
>
> - If you don't use per-partition watermarking, # tasks < # partitions can
> cause the problem for backfill jobs.
>
> - If you don't use per-partition watermarking, # tasks = # partitions is
> going to be okay even for backfill jobs.
>
> - If you use per-partition watermarking, # tasks < # partitions shouldn't
> cause any problems unless you turn on the idleness detection.
>
>
>
> Regarding the idleness detection which is based on processing time, what
> is your setting? If you set the value to 10 seconds for example, you'll
> face the same problem unless the watermark of your backfill job catches
> up real-time within 10 seconds. If you increase the value to 1 minute, your
> backfill job should catch up real-time within 1 minute.
>
>
>
> Best,
>
>
>
> Dongwon
>
>
>
>
>
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill  wrote:
>
> Thanks Dongwon!
>
>
>
> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
>
>
>
> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim 
> wrote:
>
> Hi Dan,
>
>
>
> Do you use the per-partition watermarking explained in [1]?
>
> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.

RE: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Schwalbe Matthias
Oops mistyped your name, Dan

From: Schwalbe Matthias
Sent: Freitag, 18. März 2022 09:02
To: 'Dan Hill' ; Dongwon Kim 
Cc: user 
Subject: RE: Weird Flink Kafka source watermark behavior

Hi San, Dongwon,

I share the opinion that when per-partition watermarking is enabled, you should 
observe correct behavior … would be interesting to see why it does not work for 
you.

I’d like to clear one tiny misconception here when you write:

>> - The same issue happens even if I use an idle watermark.

You would expect to see glitches with watermarking when you enable idleness.
Idleness sort of trades watermark correctness for reduced latency when 
processing timers (much simplified).
With idleness enabled you have no guarantees whatsoever as to the quality of 
watermarks (which might be ok in some cases).
BTW we dominantly use a mix of fast and slow sources (that only update once a 
day) with hand-pimped watermarking and late event processing, and enabling 
idleness would break everything.

Oversights put aside, things should work the way you implemented it.

One thing I could imagine to be a cause is

  *   that over time the kafka partitions get reassigned  to different consumer 
subtasks which would probably stress correct recalculation of watermarks. Hence 
#partition == number subtask might reduce the problem
  *   can you enable logging of partition-consumer assignment, to see if that 
is the cause of the problem
  *   also involuntary restarts of the job can cause havoc as this resets 
watermarking

I’ll be off next week, unable to take part in the active discussion …

Sincere greetings

Thias




From: Dan Hill <quietgol...@gmail.com>
Sent: Freitag, 18. März 2022 08:23
To: Dongwon Kim <eastcirc...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Weird Flink Kafka source watermark behavior



I'll try forcing # source tasks = # partitions tomorrow.

Thank you, Dongwon, for all of your help!

On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
I believe your job with per-partition watermarking should be working okay even 
in a backfill scenario.

BTW, is the problem still observed even with # source tasks = # partitions?

For committers:
Is there a way to confirm that per-partition watermarking is used in TM log?

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
I hit this using event processing and no idleness detection.  The same issue 
happens if I enable idleness.

My code matches the code example for per-partition 
watermarking<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>.

On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
Hi Dan,

I'm quite confused as you already use per-partition watermarking.

What I meant in the reply is
- If you don't use per-partition watermarking, # tasks < # partitions can cause 
the problem for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions is going 
to be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't cause 
any problems unless you turn on the idleness detection.

Regarding the idleness detection which is based on processing time, what is 
your setting? If you set the value to 10 seconds for example, you'll face the 
same problem unless the watermark of your backfill job catches up real-time 
within 10 seconds. If you increase the value to 1 minute, your backfill job 
should catch up real-time within 1 minute.

Best,

Dongwon


On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
Thanks Dongwon!

Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks < 
# kafka partitions.  This should be called out in the docs or the bug should be 
fixed.

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
Hi Dan,

Do you use the per-partition watermarking explained in [1]?
I've also experienced a similar problem when running backfill jobs specifically 
when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as expected.
- When # source tasks < # kafka partitions, a Kafka consumer consumes multiple 
partitions. This case can destroy the per-partition patterns as explained in 
[2].

Hope this helps.

p.s. If you plan to use the per-partition watermarking, be aware that idleness 
detection [3] can cause another problem when you run a backfill job. Kafka 
source tasks in a backfill job seem to read a batch of records from Kafka and 
then wait for downstream tasks to catch up the progress, which can be counted 
as idleness.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datast

RE: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Schwalbe Matthias
Hi San, Dongwon,

I share the opinion that when per-partition watermarking is enabled, you should 
observe correct behavior … would be interesting to see why it does not work for 
you.

I’d like to clear one tiny misconception here when you write:

>> - The same issue happens even if I use an idle watermark.

You would expect to see glitches with watermarking when you enable idleness.
Idleness sort of trades watermark correctness for reduced latency when 
processing timers (much simplified).
With idleness enabled you have no guarantees whatsoever as to the quality of 
watermarks (which might be ok in some cases).
BTW we dominantly use a mix of fast and slow sources (that only update once a 
day) with hand-pimped watermarking and late event processing, and enabling 
idleness would break everything.

Oversights put aside, things should work the way you implemented it.

One thing I could imagine to be a cause is

  *   that over time the kafka partitions get reassigned  to different consumer 
subtasks which would probably stress correct recalculation of watermarks. Hence 
#partition == number subtask might reduce the problem
  *   can you enable logging of partition-consumer assignment, to see if that 
is the cause of the problem
  *   also involuntary restarts of the job can cause havoc as this resets 
watermarking

I’ll be off next week, unable to take part in the active discussion …

Sincere greetings

Thias




From: Dan Hill 
Sent: Freitag, 18. März 2022 08:23
To: Dongwon Kim 
Cc: user 
Subject: Re: Weird Flink Kafka source watermark behavior



I'll try forcing # source tasks = # partitions tomorrow.

Thank you, Dongwon, for all of your help!

On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
I believe your job with per-partition watermarking should be working okay even 
in a backfill scenario.

BTW, is the problem still observed even with # source tasks = # partitions?

For committers:
Is there a way to confirm that per-partition watermarking is used in TM log?

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
I hit this using event processing and no idleness detection.  The same issue 
happens if I enable idleness.

My code matches the code example for per-partition 
watermarking<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>.

On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
Hi Dan,

I'm quite confused as you already use per-partition watermarking.

What I meant in the reply is
- If you don't use per-partition watermarking, # tasks < # partitions can cause 
the problem for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions is going 
to be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't cause 
any problems unless you turn on the idleness detection.

Regarding the idleness detection which is based on processing time, what is 
your setting? If you set the value to 10 seconds for example, you'll face the 
same problem unless the watermark of your backfill job catches up real-time 
within 10 seconds. If you increase the value to 1 minute, your backfill job 
should catch up real-time within 1 minute.

Best,

Dongwon


On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
Thanks Dongwon!

Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source tasks < 
# kafka partitions.  This should be called out in the docs or the bug should be 
fixed.

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
Hi Dan,

Do you use the per-partition watermarking explained in [1]?
I've also experienced a similar problem when running backfill jobs specifically 
when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as expected.
- When # source tasks < # kafka partitions, a Kafka consumer consumes multiple 
partitions. This case can destroy the per-partition patterns as explained in 
[2].

Hope this helps.

p.s. If you plan to use the per-partition watermarking, be aware that idleness 
detection [3] can cause another problem when you run a backfill job. Kafka 
source tasks in a backfill job seem to read a batch of records from Kafka and 
then wait for downstream tasks to catch up the progress, which can be counted 
as idleness.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
[3] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-ti

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dan Hill
I'll try forcing # source tasks = # partitions tomorrow.

Thank you, Dongwon, for all of your help!
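(For reference, forcing # source tasks = # partitions comes down to pinning the source operator's parallelism. A sketch assuming the 12 partitions mentioned earlier in the thread; the topic name and schema are placeholders.)

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceParallelismSketch {
  // Pin the Kafka source's parallelism to the partition count so each source
  // subtask reads exactly one partition.
  static DataStream<String> kafkaSource(StreamExecutionEnvironment env, Properties kafkaProps) {
    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), kafkaProps);
    return env.addSource(consumer)
        .name("kafka-source")
        .setParallelism(12); // == number of Kafka partitions in this setup
  }
}
```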

On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim  wrote:

> I believe your job with per-partition watermarking should be working okay
> even in a backfill scenario.
>
> BTW, is the problem still observed even with # sour tasks = # partitions?
>
> For committers:
> Is there a way to confirm that per-partition watermarking is used in TM
> log?
>
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill  wrote:
>
>> I hit this using event processing and no idleness detection.  The same
>> issue happens if I enable idleness.
>>
>> My code matches the code example for per-partition watermarking
>> 
>> .
>>
>>
>> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I'm quite confused as you already use per-partition watermarking.
>>>
>>> What I meant in the reply is
>>> - If you don't use per-partition watermarking, # tasks < # partitions
>>> can cause the problem for backfill jobs.
>>> - If you don't use per-partition watermarking, # tasks = # partitions is
>>> going to be okay even for backfill jobs.
>>> - If you use per-partition watermarking, # tasks < # partitions
>>> shouldn't cause any problems unless you turn on the idleness detection.
>>>
>>> Regarding the idleness detection which is based on processing time, what
>>> is your setting? If you set the value to 10 seconds for example, you'll
>>> face the same problem unless the watermark of your backfill job catches
>>> up real-time within 10 seconds. If you increase the value to 1 minute, your
>>> backfill job should catch up real-time within 1 minute.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>>
>>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill  wrote:
>>>
 Thanks Dongwon!

 Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
 tasks < # kafka partitions.  This should be called out in the docs or the
 bug should be fixed.

 On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim 
 wrote:

> Hi Dan,
>
> Do you use the per-partition watermarking explained in [1]?
> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroying the per-partition patterns 
> as
> explained in [2].
>
> Hope this helps.
>
> p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up the progress,
> which can be counted as idleness.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
> Best,
>
> Dongwon
>
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill 
> wrote:
>
>> I'm following the example from this section:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>
>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill 
>> wrote:
>>
>>> Other points
>>> - I'm using the kafka timestamp as event time.
>>> - The same issue happens even if I use an idle watermark.
>>>
>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill 
>>> wrote:
>>>
 There are 12 Kafka partitions (to keep the structure similar to
 other low traffic environments).

 On Thu, Mar 17, 2022 at 10:13 PM Dan Hill 
 wrote:

> Hi.
>
> I'm running a backfill from a kafka topic with very few records
> spread across a few days.  I'm seeing a case where the records coming 
> from
> a kafka source have a watermark that's more recent (by hours) than the
> event time.  I haven't seen this before when running this.  This 
> violates
> what I'd assume the kafka source would do.
>
> Example problem:
> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual
> times are separated by a longer time period.
> 2.  My first operator after the 

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dongwon Kim
I believe your job with per-partition watermarking should be working okay
even in a backfill scenario.

BTW, is the problem still observed even with # source tasks = # partitions?

For committers:
Is there a way to confirm that per-partition watermarking is used in TM log?
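(One way to surface that in the TaskManager log, sketched as an assumption about the connector internals rather than a confirmed approach: wrap the strategy so each watermark-generator instantiation is logged. If the strategy really is applied per partition inside the Kafka source, a subtask owning N partitions should log N creations; otherwise only one per subtask.)

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper, not part of Flink: delegates to an existing strategy
// and logs every generator creation.
public class LoggingWatermarkStrategy {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingWatermarkStrategy.class);

  public static <T> WatermarkStrategy<T> wrap(WatermarkStrategy<T> inner) {
    return new WatermarkStrategy<T>() {
      @Override
      public WatermarkGenerator<T> createWatermarkGenerator(
          WatermarkGeneratorSupplier.Context context) {
        LOG.info("Creating a watermark generator");
        return inner.createWatermarkGenerator(context);
      }

      @Override
      public TimestampAssigner<T> createTimestampAssigner(
          TimestampAssignerSupplier.Context context) {
        return inner.createTimestampAssigner(context);
      }
    };
  }

  // Example: the bounded-out-of-orderness strategy discussed in this thread.
  public static WatermarkStrategy<String> example() {
    return wrap(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));
  }
}
```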

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill  wrote:

> I hit this using event processing and no idleness detection.  The same
> issue happens if I enable idleness.
>
> My code matches the code example for per-partition watermarking
> 
> .
>
>
> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim 
> wrote:
>
>> Hi Dan,
>>
>> I'm quite confused as you already use per-partition watermarking.
>>
>> What I meant in the reply is
>> - If you don't use per-partition watermarking, # tasks < # partitions can
>> cause the problem for backfill jobs.
>> - If you don't use per-partition watermarking, # tasks = # partitions is
>> going to be okay even for backfill jobs.
>> - If you use per-partition watermarking, # tasks < # partitions shouldn't
>> cause any problems unless you turn on the idleness detection.
>>
>> Regarding the idleness detection which is based on processing time, what
>> is your setting? If you set the value to 10 seconds for example, you'll
>> face the same problem unless the watermark of your backfill job catches
>> up real-time within 10 seconds. If you increase the value to 1 minute, your
>> backfill job should catch up real-time within 1 minute.
>>
>> Best,
>>
>> Dongwon
>>
>>
>> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill  wrote:
>>
>>> Thanks Dongwon!
>>>
>>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
>>> tasks < # kafka partitions.  This should be called out in the docs or the
>>> bug should be fixed.
>>>
>>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim 
>>> wrote:
>>>
 Hi Dan,

 Do you use the per-partition watermarking explained in [1]?
 I've also experienced a similar problem when running backfill jobs
 specifically when # source tasks < # kafka partitions.
 - When # source tasks = # kafka partitions, the backfill job works as
 expected.
 - When # source tasks < # kafka partitions, a Kafka consumer consumes
 multiple partitions. This case can destroying the per-partition patterns as
 explained in [2].

 Hope this helps.

 p.s. If you plan to use the per-partition watermarking, be aware that
 idleness detection [3] can cause another problem when you run a backfill
 job. Kafka source tasks in a backfill job seem to read a batch of records
 from Kafka and then wait for downstream tasks to catch up the progress,
 which can be counted as idleness.

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
 [2]
 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
 [3]
 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

 Best,

 Dongwon

 On Fri, Mar 18, 2022 at 2:35 PM Dan Hill  wrote:

> I'm following the example from this section:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill 
> wrote:
>
>> Other points
>> - I'm using the kafka timestamp as event time.
>> - The same issue happens even if I use an idle watermark.
>>
>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill 
>> wrote:
>>
>>> There are 12 Kafka partitions (to keep the structure similar to
>>> other low traffic environments).
>>>
>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill 
>>> wrote:
>>>
 Hi.

 I'm running a backfill from a kafka topic with very few records
 spread across a few days.  I'm seeing a case where the records coming 
 from
 a kafka source have a watermark that's more recent (by hours) than the
 event time.  I haven't seen this before when running this.  This 
 violates
 what I'd assume the kafka source would do.

 Example problem:
 1. I have kafka records at ts=1000, 2000, ... 50.  The actual
 times are separated by a longer time period.
 2.  My first operator after the FlinkKafkaConsumer sees:
context.timestamp() = 1000
context.timerService().currentWatermark() = 50

 Details about how I'm running this:
 - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
 source.
 - I'm using 

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dan Hill
I hit this using event processing and no idleness detection.  The same
issue happens if I enable idleness.

My code matches the code example for per-partition watermarking.
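(The documented pattern referenced above attaches the watermark strategy to the Kafka consumer itself rather than to the resulting DataStream, which is what enables per-partition watermark generation inside the source. A minimal sketch with the settings described in this thread, 5s bounded out-of-orderness and Kafka timestamps as event time; broker, group id and topic are placeholders.)

```java
import java.time.Duration;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
    props.setProperty("group.id", "backfill-debug");       // placeholder

    FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

    // The strategy goes on the consumer, not on the DataStream returned by addSource:
    // this is what turns on per-partition watermarking inside the Kafka source.
    consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)));

    env.addSource(consumer).name("kafka-source").print();
    env.execute("per-partition-watermark-sketch");
  }
}
```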


On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim  wrote:

> Hi Dan,
>
> I'm quite confused as you already use per-partition watermarking.
>
> What I meant in the reply is
> - If you don't use per-partition watermarking, # tasks < # partitions can
> cause the problem for backfill jobs.
> - If you don't use per-partition watermarking, # tasks = # partitions is
> going to be okay even for backfill jobs.
> - If you use per-partition watermarking, # tasks < # partitions shouldn't
> cause any problems unless you turn on the idleness detection.
>
> Regarding the idleness detection which is based on processing time, what
> is your setting? If you set the value to 10 seconds for example, you'll
> face the same problem unless the watermark of your backfill job catches
> up real-time within 10 seconds. If you increase the value to 1 minute, your
> backfill job should catch up real-time within 1 minute.
>
> Best,
>
> Dongwon
>
>
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill  wrote:
>
>> Thanks Dongwon!
>>
>> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
>> tasks < # kafka partitions.  This should be called out in the docs or the
>> bug should be fixed.
>>
>> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Do you use the per-partition watermarking explained in [1]?
>>> I've also experienced a similar problem when running backfill jobs
>>> specifically when # source tasks < # kafka partitions.
>>> - When # source tasks = # kafka partitions, the backfill job works as
>>> expected.
>>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>>> multiple partitions. This case can destroying the per-partition patterns as
>>> explained in [2].
>>>
>>> Hope this helps.
>>>
>>> p.s. If you plan to use the per-partition watermarking, be aware that
>>> idleness detection [3] can cause another problem when you run a backfill
>>> job. Kafka source tasks in a backfill job seem to read a batch of records
>>> from Kafka and then wait for downstream tasks to catch up the progress,
>>> which can be counted as idleness.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> [3]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill  wrote:
>>>
 I'm following the example from this section:

 https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

 On Thu, Mar 17, 2022 at 10:26 PM Dan Hill 
 wrote:

> Other points
> - I'm using the kafka timestamp as event time.
> - The same issue happens even if I use an idle watermark.
>
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill 
> wrote:
>
>> There are 12 Kafka partitions (to keep the structure similar to other
>> low traffic environments).
>>
>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill 
>> wrote:
>>
>>> Hi.
>>>
>>> I'm running a backfill from a kafka topic with very few records
>>> spread across a few days.  I'm seeing a case where the records coming 
>>> from
>>> a kafka source have a watermark that's more recent (by hours) than the
>>> event time.  I haven't seen this before when running this.  This 
>>> violates
>>> what I'd assume the kafka source would do.
>>>
>>> Example problem:
>>> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual
>>> times are separated by a longer time period.
>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>context.timestamp() = 1000
>>>context.timerService().currentWatermark() = 50
>>>
>>> Details about how I'm running this:
>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
>>> source.
>>> - I'm using FlinkKafkaConsumer
>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>>> idleness settings.
>>> - I'm running similar code in all the environments.  The main
>>> difference is low traffic.  I have not been able to reproduce this out 
>>> of
>>> the environment.
>>>
>>>
>>> I put the following process function right after my kafka source.
>>>
>>> 
>>>
>>> AfterSource
>>> 

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dongwon Kim
Hi Dan,

I'm quite confused as you already use per-partition watermarking.

What I meant in the reply is
- If you don't use per-partition watermarking, # tasks < # partitions can
cause the problem for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions is
going to be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't
cause any problems unless you turn on the idleness detection.

Regarding the idleness detection, which is based on processing time: what is
your setting? If you set the value to 10 seconds, for example, you'll face
the same problem unless the watermark of your backfill job catches up to
real time within 10 seconds. If you increase the value to 1 minute, your
backfill job instead has to catch up to real time within 1 minute.
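The knob I'm asking about is the withIdleness(...) call on that same strategy.
Continuing the sketch above, with a one-minute timeout purely as an example:

consumer.assignTimestampsAndWatermarks(
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // Idleness is measured in processing time: a partition that produces
        // no records for this long is marked idle and stops holding back the
        // combined watermark. If a backfill stalls longer than the timeout,
        // the watermark can jump far ahead of the event time of later records.
        .withIdleness(Duration.ofMinutes(1)));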

Best,

Dongwon


On Fri, Mar 18, 2022 at 3:51 PM Dan Hill  wrote:

> Thanks Dongwon!
>
> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
>
> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim 
> wrote:
>
>> Hi Dan,
>>
>> Do you use the per-partition watermarking explained in [1]?
>> I've also experienced a similar problem when running backfill jobs
>> specifically when # source tasks < # kafka partitions.
>> - When # source tasks = # kafka partitions, the backfill job works as
>> expected.
>> - When # source tasks < # kafka partitions, a Kafka consumer consumes
>> multiple partitions. This case can destroy the per-partition watermarking
>> pattern, as explained in [2].
>>
>> Hope this helps.
>>
>> p.s. If you plan to use the per-partition watermarking, be aware that
>> idleness detection [3] can cause another problem when you run a backfill
>> job. Kafka source tasks in a backfill job seem to read a batch of records
>> from Kafka and then wait for downstream tasks to catch up, which can be
>> counted as idleness.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill  wrote:
>>
>>> I'm following the example from this section:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>
>>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill  wrote:
>>>
 Other points
 - I'm using the kafka timestamp as event time.
 - The same issue happens even if I use an idle watermark.

 On Thu, Mar 17, 2022 at 10:17 PM Dan Hill 
 wrote:

> There are 12 Kafka partitions (to keep the structure similar to other
> low traffic environments).
>
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill 
> wrote:
>
>> Hi.
>>
>> I'm running a backfill from a kafka topic with very few records
>> spread across a few days.  I'm seeing a case where the records coming 
>> from
>> a kafka source have a watermark that's more recent (by hours) than the
>> event time.  I haven't seen this before when running this.  This violates
>> what I'd assume the kafka source would do.
>>
>> Example problem:
>> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual
>> times are separated by a longer time period.
>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>context.timestamp() = 1000
>>context.timerService().currentWatermark() = 50
>>
>> Details about how I'm running this:
>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
>> source.
>> - I'm using FlinkKafkaConsumer
>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
>> idleness settings.
>> - I'm running similar code in all the environments.  The main
>> difference is low traffic.  I have not been able to reproduce this out of
>> the environment.
>>
>>
>> I put the following process function right after my kafka source.
>>
>> 
>>
>> AfterSource
>> ts=1647274892728
>> watermark=1647575140007
>> record=...
>>
>>
>> public static class TextLog extends ProcessFunction<Record, Record> {
>>     private final String label;
>>     public TextLog(String label) {
>>         this.label = label;
>>     }
>>     @Override
>>     public void processElement(Record record, Context context,
>>             Collector<Record> collector) throws Exception {
>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>             label, context.timestamp(),
>> 

Re: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Dan Hill
Thanks Dongwon!

Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
tasks < # kafka partitions.  This should be called out in the docs or the
bug should be fixed.

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim  wrote:

> Hi Dan,
>
> Do you use the per-partition watermarking explained in [1]?
> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroy the per-partition watermarking
> pattern, as explained in [2].
>
> Hope this helps.
>
> p.s. If you plan to use the per-partition watermarking, be aware that
> idleness detection [3] can cause another problem when you run a backfill
> job. Kafka source tasks in a backfill job seem to read a batch of records
> from Kafka and then wait for downstream tasks to catch up, which can be
> counted as idleness.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
> Best,
>
> Dongwon
>
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill  wrote:
>
>> I'm following the example from this section:
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>
>> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill  wrote:
>>
>>> Other points
>>> - I'm using the kafka timestamp as event time.
>>> - The same issue happens even if I use an idle watermark.
>>>
>>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill  wrote:
>>>
 There are 12 Kafka partitions (to keep the structure similar to other
 low traffic environments).

 On Thu, Mar 17, 2022 at 10:13 PM Dan Hill 
 wrote:

> Hi.
>
> I'm running a backfill from a kafka topic with very few records spread
> across a few days.  I'm seeing a case where the records coming from a 
> kafka
> source have a watermark that's more recent (by hours) than the event time.
> I haven't seen this before when running this.  This violates what I'd
> assume the kafka source would do.
>
> Example problem:
> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual
> times are separated by a longer time period.
> 2.  My first operator after the FlinkKafkaConsumer sees:
>context.timestamp() = 1000
>context.timerService().currentWatermark() = 50
>
> Details about how I'm running this:
> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the
> source.
> - I'm using FlinkKafkaConsumer
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
> idleness settings.
> - I'm running similar code in all the environments.  The main
> difference is low traffic.  I have not been able to reproduce this out of
> the environment.
>
>
> I put the following process function right after my kafka source.
>
> 
>
> AfterSource
> ts=1647274892728
> watermark=1647575140007
> record=...
>
>
> // Logs the element timestamp and current watermark, then forwards the record.
> public static class TextLog extends ProcessFunction<Record, Record> {
>     private final String label;
>     public TextLog(String label) {
>         this.label = label;
>     }
>     @Override
>     public void processElement(Record record, Context context,
>             Collector<Record> collector) throws Exception {
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>             label, context.timestamp(),
>             context.timerService().currentWatermark(), record);
>         collector.collect(record);
>     }
> }
>



Re: Weird Flink Kafka source watermark behavior

2022-03-17 Thread Dongwon Kim
Hi Dan,

Do you use the per-partition watermarking explained in [1]?
I've also experienced a similar problem when running backfill jobs
specifically when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as
expected (see the parallelism sketch after this list).
- When # source tasks < # kafka partitions, a Kafka consumer consumes
multiple partitions. This case can destroy the per-partition watermarking
pattern, as explained in [2].
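If you want to force the first case, one option is to pin the source
parallelism to the partition count. A sketch, assuming a
FlinkKafkaConsumer<String> named consumer and a StreamExecutionEnvironment
named env are already set up (the names, and the value 12 from your setup,
are only illustrative):

// With parallelism equal to the partition count, each consumer subtask reads
// exactly one partition, so no subtask has to merge watermarks across
// partitions.
DataStream<String> stream = env
    .addSource(consumer)
    .setParallelism(12)
    .name("kafka-source");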

Hope this helps.

p.s. If you plan to use the per-partition watermarking, be aware that
idleness detection [3] can cause another problem when you run a backfill
job. Kafka source tasks in a backfill job seem to read a batch of records
from Kafka and then wait for downstream tasks to catch up, which can be
counted as idleness.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

Best,

Dongwon

On Fri, Mar 18, 2022 at 2:35 PM Dan Hill  wrote:

> I'm following the example from this section:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill  wrote:
>
>> Other points
>> - I'm using the kafka timestamp as event time.
>> - The same issue happens even if I use an idle watermark.
>>
>> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill  wrote:
>>
>>> There are 12 Kafka partitions (to keep the structure similar to other
>>> low traffic environments).
>>>
>>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill  wrote:
>>>
 Hi.

 I'm running a backfill from a kafka topic with very few records spread
 across a few days.  I'm seeing a case where the records coming from a kafka
 source have a watermark that's more recent (by hours) than the event time.
 I haven't seen this before when running this.  This violates what I'd
 assume the kafka source would do.

 Example problem:
 1. I have kafka records at ts=1000, 2000, ... 50.  The actual times
 are separated by a longer time period.
 2.  My first operator after the FlinkKafkaConsumer sees:
context.timestamp() = 1000
context.timerService().currentWatermark() = 50

 Details about how I'm running this:
 - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
 - I'm using FlinkKafkaConsumer
 - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No
 idleness settings.
 - I'm running similar code in all the environments.  The main
 difference is low traffic.  I have not been able to reproduce this out of
 the environment.


 I put the following process function right after my kafka source.

 

 AfterSource
 ts=1647274892728
 watermark=1647575140007
 record=...


 // Logs the element timestamp and current watermark, then forwards the record.
 public static class TextLog extends ProcessFunction<Record, Record> {
     private final String label;
     public TextLog(String label) {
         this.label = label;
     }
     @Override
     public void processElement(Record record, Context context,
             Collector<Record> collector) throws Exception {
         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
             label, context.timestamp(),
             context.timerService().currentWatermark(), record);
         collector.collect(record);
     }
 }

>>>


Re: Weird Flink Kafka source watermark behavior

2022-03-17 Thread Dan Hill
I'm following the example from this section:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Thu, Mar 17, 2022 at 10:26 PM Dan Hill  wrote:

> Other points
> - I'm using the kafka timestamp as event time.
> - The same issue happens even if I use an idle watermark.
>
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill  wrote:
>
>> There are 12 Kafka partitions (to keep the structure similar to other low
>> traffic environments).
>>
>> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill  wrote:
>>
>>> Hi.
>>>
>>> I'm running a backfill from a kafka topic with very few records spread
>>> across a few days.  I'm seeing a case where the records coming from a kafka
>>> source have a watermark that's more recent (by hours) than the event time.
>>> I haven't seen this before when running this.  This violates what I'd
>>> assume the kafka source would do.
>>>
>>> Example problem:
>>> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual times
>>> are separated by a longer time period.
>>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>>context.timestamp() = 1000
>>>context.timerService().currentWatermark() = 50
>>>
>>> Details about how I'm running this:
>>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
>>> - I'm using FlinkKafkaConsumer
>>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
>>> settings.
>>> - I'm running similar code in all the environments.  The main difference
>>> is low traffic.  I have not been able to reproduce this out of the
>>> environment.
>>>
>>>
>>> I put the following process function right after my kafka source.
>>>
>>> 
>>>
>>> AfterSource
>>> ts=1647274892728
>>> watermark=1647575140007
>>> record=...
>>>
>>>
>>> // Logs the element timestamp and current watermark, then forwards the record.
>>> public static class TextLog extends ProcessFunction<Record, Record> {
>>>     private final String label;
>>>     public TextLog(String label) {
>>>         this.label = label;
>>>     }
>>>     @Override
>>>     public void processElement(Record record, Context context,
>>>             Collector<Record> collector) throws Exception {
>>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>>             label, context.timestamp(),
>>>             context.timerService().currentWatermark(), record);
>>>         collector.collect(record);
>>>     }
>>> }
>>>
>>


Re: Weird Flink Kafka source watermark behavior

2022-03-17 Thread Dan Hill
Other points
- I'm using the kafka timestamp as event time (see the sketch after these two
points).
- The same issue happens even if I use an idle watermark.
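To be concrete about the first point, my strategy is roughly the following
(a simplified sketch, not my exact code; consumer stands in for my
FlinkKafkaConsumer and String for my record type). As far as I understand,
forBoundedOutOfOrderness keeps the timestamp the Kafka source already
attached, i.e. the Kafka record timestamp; the explicit assigner below just
spells that out:

WatermarkStrategy<String> strategy =
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // (element, recordTimestamp) -> recordTimestamp keeps the timestamp
        // the Kafka source already attached, i.e. the Kafka record timestamp.
        .withTimestampAssigner((element, recordTimestamp) -> recordTimestamp);

consumer.assignTimestampsAndWatermarks(strategy);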

On Thu, Mar 17, 2022 at 10:17 PM Dan Hill  wrote:

> There are 12 Kafka partitions (to keep the structure similar to other low
> traffic environments).
>
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill  wrote:
>
>> Hi.
>>
>> I'm running a backfill from a kafka topic with very few records spread
>> across a few days.  I'm seeing a case where the records coming from a kafka
>> source have a watermark that's more recent (by hours) than the event time.
>> I haven't seen this before when running this.  This violates what I'd
>> assume the kafka source would do.
>>
>> Example problem:
>> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual times
>> are separated by a longer time period.
>> 2.  My first operator after the FlinkKafkaConsumer sees:
>>context.timestamp() = 1000
>>context.timerService().currentWatermark() = 50
>>
>> Details about how I'm running this:
>> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
>> - I'm using FlinkKafkaConsumer
>> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
>> settings.
>> - I'm running similar code in all the environments.  The main difference
>> is low traffic.  I have not been able to reproduce this out of the
>> environment.
>>
>>
>> I put the following process function right after my kafka source.
>>
>> 
>>
>> AfterSource
>> ts=1647274892728
>> watermark=1647575140007
>> record=...
>>
>>
>> // Logs the element timestamp and current watermark, then forwards the record.
>> public static class TextLog extends ProcessFunction<Record, Record> {
>>     private final String label;
>>     public TextLog(String label) {
>>         this.label = label;
>>     }
>>     @Override
>>     public void processElement(Record record, Context context,
>>             Collector<Record> collector) throws Exception {
>>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>>             label, context.timestamp(),
>>             context.timerService().currentWatermark(), record);
>>         collector.collect(record);
>>     }
>> }
>>
>


Re: Weird Flink Kafka source watermark behavior

2022-03-17 Thread Dan Hill
There are 12 Kafka partitions (to keep the structure similar to other low
traffic environments).

On Thu, Mar 17, 2022 at 10:13 PM Dan Hill  wrote:

> Hi.
>
> I'm running a backfill from a kafka topic with very few records spread
> across a few days.  I'm seeing a case where the records coming from a kafka
> source have a watermark that's more recent (by hours) than the event time.
> I haven't seen this before when running this.  This violates what I'd
> assume the kafka source would do.
>
> Example problem:
> 1. I have kafka records at ts=1000, 2000, ... 50.  The actual times
> are separated by a longer time period.
> 2.  My first operator after the FlinkKafkaConsumer sees:
>context.timestamp() = 1000
>context.timerService().currentWatermark() = 50
>
> Details about how I'm running this:
> - I'm on Flink 1.12.3 that's running on EKS and using MSK as the source.
> - I'm using FlinkKafkaConsumer
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
> settings.
> - I'm running similar code in all the environments.  The main difference
> is low traffic.  I have not been able to reproduce this out of the
> environment.
>
>
> I put the following process function right after my kafka source.
>
> 
>
> AfterSource
> ts=1647274892728
> watermark=1647575140007
> record=...
>
>
> // Logs the element timestamp and current watermark, then forwards the record.
> public static class TextLog extends ProcessFunction<Record, Record> {
>     private final String label;
>     public TextLog(String label) {
>         this.label = label;
>     }
>     @Override
>     public void processElement(Record record, Context context,
>             Collector<Record> collector) throws Exception {
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>             label, context.timestamp(),
>             context.timerService().currentWatermark(), record);
>         collector.collect(record);
>     }
> }
>