I totally agree with Schwalbe that per-partition watermarking allows #
source tasks < # kafka partitions.

Otherwise, Dan, you should look into the other possibilities Schwalbe
mentioned.

Best,

Dongwon

On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi San, Dongwon,
>
>
>
> I share the opinion that when per-partition watermarking is enabled, you
> should observe correct behavior … would be interesting to see why it does
> not work for you.
>
>
>
> I’d like to clear up one tiny misconception regarding what you wrote:
>
>
>
> >> - The same issue happens even if I use an idle watermark.
>
>
>
> You would expect to see glitches with watermarking when you enable
> idleness.
>
> Idleness essentially trades watermark correctness for reduced latency when
> processing timers (much simplified).
>
> With idleness enabled you have no guarantees whatsoever as to the quality
> of watermarks (which might be ok in some cases).
>
> BTW we predominantly use a mix of fast and slow sources (the slow ones only
> update once a day) with hand-tuned watermarking and late-event processing;
> enabling idleness would break everything.
>
>
>
> Oversights aside, things should work the way you implemented it.
>
>
>
> One thing I could imagine to be a cause is
>
>    - that over time the kafka partitions get reassigned to different
>    consumer subtasks, which would probably stress correct recalculation of
>    watermarks. Hence #partitions == #subtasks might reduce the problem
>    - can you enable logging of the partition-consumer assignment (see the
>    sketch below) to check whether that is the cause of the problem?
>    - also, involuntary restarts of the job can cause havoc, as they reset
>    watermarking
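>
> A hedged sketch of what such a logging tweak could look like (Flink 1.12
> ships a log4j2 properties config by default; the logger key names below are
> arbitrary, and I have not verified exactly which assignment messages get
> emitted at which level):
>
> # raise the Flink Kafka connector and the Kafka consumer client loggers so
> # that partition-to-subtask assignment messages show up in the TM logs
> logger.flinkkafka.name = org.apache.flink.streaming.connectors.kafka
> logger.flinkkafka.level = DEBUG
> logger.kafkaclient.name = org.apache.kafka.clients.consumer
> logger.kafkaclient.level = DEBUG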
>
>
>
> I’ll be off next week, unable to take part in the active discussion …
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
> *From:* Dan Hill <quietgol...@gmail.com>
> *Sent:* Freitag, 18. März 2022 08:23
> *To:* Dongwon Kim <eastcirc...@gmail.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Weird Flink Kafka source watermark behavior
>
>
>
> I'll try forcing # source tasks = # partitions tomorrow.
>
>
>
> Thank you, Dongwon, for all of your help!
>
>
>
> On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
> I believe your job with per-partition watermarking should be working okay
> even in a backfill scenario.
>
>
>
> BTW, is the problem still observed even with # source tasks = # partitions?
>
>
>
> For committers:
>
> Is there a way to confirm from the TM logs that per-partition watermarking
> is being used?
>
>
>
> On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> I hit this using event-time processing and no idleness detection.  The same
> issue happens if I enable idleness.
>
>
>
> My code matches the code example for per-partition watermarking
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>
> .
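>
> For reference, here's a minimal sketch of that pattern as I understand it
> (the topic name, properties, deserializer, and the Record type are
> placeholders, not my actual code):
>
> FlinkKafkaConsumer<Record> consumer =
>         new FlinkKafkaConsumer<>("my-topic", new RecordDeserializer(), kafkaProps);
>
> // Attaching the strategy to the consumer itself (rather than to the
> // DataStream returned by addSource) is what enables per-partition
> // watermarking inside the Kafka source.
> consumer.assignTimestampsAndWatermarks(
>         WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5)));
>
> DataStream<Record> stream = env.addSource(consumer);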
>
>
>
> On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
> Hi Dan,
>
>
>
> I'm quite confused as you already use per-partition watermarking.
>
>
>
> What I meant in the reply is
>
> - If you don't use per-partition watermarking, # tasks < # partitions can
> cause the problem for backfill jobs.
>
> - If you don't use per-partition watermarking, # tasks = # partitions is
> going to be okay even for backfill jobs.
>
> - If you use per-partition watermarking, # tasks < # partitions shouldn't
> cause any problems unless you turn on the idleness detection.
>
>
>
> Regarding the idleness detection, which is based on processing time, what
> is your setting? If you set the value to 10 seconds, for example, you'll
> face the same problem unless the watermark of your backfill job catches up
> to real time within 10 seconds. If you increase the value to 1 minute, your
> backfill job has to catch up to real time within 1 minute.
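>
> Just to make that knob concrete, a sketch of where the idleness timeout is
> set (one minute is only an illustration; "consumer" and Record are
> placeholders):
>
> consumer.assignTimestampsAndWatermarks(
>         WatermarkStrategy.<Record>forBoundedOutOfOrderness(Duration.ofSeconds(5))
>                 .withIdleness(Duration.ofMinutes(1)));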
>
>
>
> Best,
>
>
>
> Dongwon
>
>
>
>
>
> On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> Thanks Dongwon!
>
>
>
> Wow.  Yes, I'm using per-partition watermarking [1].  Yes, my # source
> tasks < # kafka partitions.  This should be called out in the docs or the
> bug should be fixed.
>
>
>
> On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com>
> wrote:
>
> Hi Dan,
>
>
>
> Do you use the per-partition watermarking explained in [1]?
>
> I've also experienced a similar problem when running backfill jobs
> specifically when # source tasks < # kafka partitions.
>
> - When # source tasks = # kafka partitions, the backfill job works as
> expected.
>
> - When # source tasks < # kafka partitions, a Kafka consumer consumes
> multiple partitions. This case can destroy the per-partition patterns, as
> explained in [2].
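>
> If that turns out to be your situation, one simple workaround is to pin the
> source parallelism to the partition count so that each subtask reads exactly
> one partition (a sketch only; 12 matches the partition count you mentioned,
> and "consumer" is a placeholder for your FlinkKafkaConsumer):
>
> DataStream<Record> stream = env.addSource(consumer).setParallelism(12);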
>
>
>
> Hope this helps.
>
>
>
> p.s. If you plan to use per-partition watermarking, be aware that idleness
> detection [3] can cause another problem when you run a backfill job. Kafka
> source tasks in a backfill job seem to read a batch of records from Kafka
> and then wait for downstream tasks to catch up, which can be counted as
> idleness.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategie
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
>
>
> Best,
>
>
>
> Dongwon
>
>
>
> On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> I'm following the example from this section:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>
>
>
> On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> Other points
>
> - I'm using the kafka timestamp as event time.
>
> - The same issue happens even if I use an idle watermark.
>
>
>
> On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> There are 12 Kafka partitions (to keep the structure similar to other low
> traffic environments).
>
>
>
> On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> Hi.
>
>
>
> I'm running a backfill from a kafka topic with very few records spread
> across a few days.  I'm seeing a case where the records coming from the
> kafka source have a watermark that's more recent (by hours) than the event
> time.  I haven't seen this before when running this job, and it violates
> what I'd assume the kafka source would do.
>
>
>
> Example problem:
>
> 1. I have kafka records at ts=1000, 2000, ... 500000.  The actual times
> are separated by a longer time period.
>
> 2.  My first operator after the FlinkKafkaConsumer sees:
>
>    context.timestamp() = 1000
>
>    context.timerService().currentWatermark() = 500000
>
>
>
> Details about how I'm running this:
>
> - I'm on Flink 1.12.3, running on EKS with MSK as the source.
>
> - I'm using FlinkKafkaConsumer
>
> - I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s).  No idleness
> settings.
>
> - I'm running similar code in all environments; the main difference here is
> low traffic.  I have not been able to reproduce this outside of this
> environment.
>
>
>
>
>
> I put the following process function right after my kafka source.
>
>
>
> --------
>
>
> AfterSource
> ts=1647274892728
> watermark=1647575140007
> record=...
>
>
>
>
> public static class TextLog extends ProcessFunction<Record, Record> {
>     private static final Logger LOGGER = LoggerFactory.getLogger(TextLog.class);
>
>     private final String label;
>
>     public TextLog(String label) {
>         this.label = label;
>     }
>
>     @Override
>     public void processElement(Record record, Context context,
>             Collector<Record> collector) throws Exception {
>         // Log the record's event timestamp and the current watermark, then
>         // pass the record through unchanged.
>         LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
>                 label, context.timestamp(),
>                 context.timerService().currentWatermark(), record);
>         collector.collect(record);
>     }
> }
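>
> For context, this is roughly how I wire it in right after the source
> (variable names are placeholders):
>
> env.addSource(consumer)
>         .process(new TextLog("AfterSource"));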
>
