I totally agree with Schwalbe that per-partition watermarking allows # source tasks < # kafka partitions.
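As a plain-Java illustration of why this works (hypothetical partition timestamps, not the actual job's data; a much-simplified model of Flink's bounded-out-of-orderness watermarking):

```java
// Toy model: one source task assigned two Kafka partitions. With per-partition
// watermarking the task combines per-partition watermarks with MIN; without it,
// one global max timestamp over the interleaved stream drags the watermark ahead.
public class PerPartitionWatermarkDemo {

    // Watermark for one partition under bounded out-of-orderness:
    // highest timestamp seen on that partition minus the bound.
    static long partitionWatermark(long[] timestamps, long boundMs) {
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            max = Math.max(max, t);
        }
        return max - boundMs;
    }

    public static void main(String[] args) {
        long boundMs = 5_000L;
        // Two partitions with very different progress, read by the same task.
        long[] fastPartition = {100_000L, 200_000L, 300_000L};
        long[] slowPartition = {1_000L, 2_000L};

        // Per-partition: MIN of the per-partition watermarks, so the slow
        // partition holds the task's watermark back.
        long perPartition = Math.min(
                partitionWatermark(fastPartition, boundMs),
                partitionWatermark(slowPartition, boundMs));

        // Per-task (naive): one max over the interleaved stream, so the fast
        // partition pushes the watermark past the slow partition's events.
        long[] interleaved = {100_000L, 1_000L, 200_000L, 2_000L, 300_000L};
        long perTask = partitionWatermark(interleaved, boundMs);

        System.out.println("per-partition watermark = " + perPartition); // -3000
        System.out.println("per-task watermark      = " + perTask);      // 295000: slow events now "late"
    }
}
```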
Otherwise, Dan, you should suspect other possibilities like what Schwalbe said.

Best,

Dongwon

On Fri, Mar 18, 2022 at 5:01 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

Hi Dan, Dongwon,

I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … it would be interesting to see why it does not work for you.

I'd like to clear up one tiny misconception here, when you write:

>> - The same issue happens even if I use an idle watermark.

You would expect to see glitches with watermarking when you enable idleness.
Idleness sort of trades watermark correctness for reduced latency when processing timers (much simplified).
With idleness enabled you have no guarantees whatsoever as to the quality of watermarks (which might be OK in some cases).
BTW, we predominantly use a mix of fast and slow sources (the slow ones only update once a day) with hand-pimped watermarking and late-event processing, and enabling idleness would break everything.

Oversights put aside, things should work the way you implemented it.

One thing I could imagine to be a cause:
- Over time the Kafka partitions get reassigned to different consumer subtasks, which would probably stress correct recalculation of watermarks. Hence # partitions == # subtasks might reduce the problem.
- Can you enable logging of the partition-consumer assignment, to see if that is the cause of the problem?
- Also, involuntary restarts of the job can cause havoc, as these reset watermarking.

I'll be off next week, unable to take part in the active discussion …

Sincere greetings

Thias

From: Dan Hill <quietgol...@gmail.com>
Sent: Freitag, 18.
März 2022 08:23
To: Dongwon Kim <eastcirc...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Weird Flink Kafka source watermark behavior

I'll try forcing # source tasks = # partitions tomorrow.

Thank you, Dongwon, for all of your help!

On Fri, Mar 18, 2022 at 12:20 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

I believe your job with per-partition watermarking should be working okay even in a backfill scenario.

BTW, is the problem still observed even with # source tasks = # partitions?

For committers: is there a way to confirm in the TM logs that per-partition watermarking is used?

On Fri, Mar 18, 2022 at 4:14 PM Dan Hill <quietgol...@gmail.com> wrote:

I hit this using event-time processing and no idleness detection. The same issue happens if I enable idleness.

My code matches the code example for per-partition watermarking
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector>.

On Fri, Mar 18, 2022 at 12:07 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi Dan,

I'm quite confused, as you already use per-partition watermarking.

What I meant in the reply is:
- If you don't use per-partition watermarking, # tasks < # partitions can cause the problem for backfill jobs.
- If you don't use per-partition watermarking, # tasks = # partitions is going to be okay even for backfill jobs.
- If you use per-partition watermarking, # tasks < # partitions shouldn't cause any problems unless you turn on idleness detection.

Regarding the idleness detection, which is based on processing time: what is your setting? If you set the value to 10 seconds, for example, you'll face the same problem unless the watermark of your backfill job catches up to real time within 10 seconds.
If you increase the value to 1 minute, your backfill job should catch up to real time within 1 minute.

Best,

Dongwon

On Fri, Mar 18, 2022 at 3:51 PM Dan Hill <quietgol...@gmail.com> wrote:

Thanks Dongwon!

Wow. Yes, I'm using per-partition watermarking [1]. Yes, my # source tasks < # kafka partitions. This should be called out in the docs or the bug should be fixed.

On Thu, Mar 17, 2022 at 10:54 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi Dan,

Do you use the per-partition watermarking explained in [1]?
I've also experienced a similar problem when running backfill jobs, specifically when # source tasks < # kafka partitions.
- When # source tasks = # kafka partitions, the backfill job works as expected.
- When # source tasks < # kafka partitions, a Kafka consumer consumes multiple partitions. This can destroy the per-partition patterns, as explained in [2].

Hope this helps.

p.s. If you plan to use per-partition watermarking, be aware that idleness detection [3] can cause another problem when you run a backfill job. Kafka source tasks in a backfill job seem to read a batch of records from Kafka and then wait for downstream tasks to catch up, which can be counted as idleness.
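The "counted as idleness" pitfall above comes down to the idleness timeout being measured in processing time. A toy check (this is a simplification for illustration, not Flink's actual implementation):

```java
// Toy model of idleness detection: a partition is marked idle when no element
// has arrived for `idleTimeoutMs` of *processing* time, regardless of how much
// historical (event-time) data is still waiting in Kafka.
public class IdlenessDemo {

    static boolean markedIdle(long lastElementProcTimeMs, long nowProcTimeMs, long idleTimeoutMs) {
        return nowProcTimeMs - lastElementProcTimeMs > idleTimeoutMs;
    }

    public static void main(String[] args) {
        long idleTimeoutMs = 10_000L; // e.g. withIdleness(Duration.ofSeconds(10))

        // During a backfill the source can stall on backpressure for longer
        // than the timeout, even though days of data remain to be read:
        System.out.println(markedIdle(0L, 15_000L, idleTimeoutMs));      // true: wrongly marked idle

        // A live job that keeps receiving records stays active:
        System.out.println(markedIdle(12_000L, 15_000L, idleTimeoutMs)); // false
    }
}
```

Once a partition is marked idle it no longer holds back the combined watermark, which is why a backfill job can see the watermark jump far ahead of the stalled partition's events.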
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#using-watermark-strategies
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources

Best,

Dongwon

On Fri, Mar 18, 2022 at 2:35 PM Dan Hill <quietgol...@gmail.com> wrote:

I'm following the example from this section:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector

On Thu, Mar 17, 2022 at 10:26 PM Dan Hill <quietgol...@gmail.com> wrote:

Other points:
- I'm using the kafka timestamp as event time.
- The same issue happens even if I use an idle watermark.

On Thu, Mar 17, 2022 at 10:17 PM Dan Hill <quietgol...@gmail.com> wrote:

There are 12 Kafka partitions (to keep the structure similar to other low-traffic environments).

On Thu, Mar 17, 2022 at 10:13 PM Dan Hill <quietgol...@gmail.com> wrote:

Hi.

I'm running a backfill from a kafka topic with very few records spread across a few days. I'm seeing a case where the records coming from a kafka source have a watermark that's more recent (by hours) than the event time. I haven't seen this before when running this job. This violates what I'd assume the kafka source would do.

Example problem:
1. I have kafka records at ts=1000, 2000, ... 500000. The actual times are separated by a longer time period.
2.
My first operator after the FlinkKafkaConsumer sees:
context.timestamp() = 1000
context.timerService().currentWatermark() = 500000

Details about how I'm running this:
- I'm on Flink 1.12.3, running on EKS and using MSK as the source.
- I'm using FlinkKafkaConsumer.
- I'm using WatermarkStrategy.forBoundedOutOfOrderness(5s). No idleness settings.
- I'm running similar code in all the environments. The main difference is low traffic. I have not been able to reproduce this outside of this environment.

I put the following process function right after my kafka source.

--------

AfterSource
ts=1647274892728
watermark=1647575140007
record=...

public static class TextLog extends ProcessFunction<Record, Record> {
    private final String label;

    public TextLog(String label) {  // constructor renamed to match the class
        this.label = label;
    }

    @Override
    public void processElement(Record record, Context context,
            Collector<Record> collector) throws Exception {
        LOGGER.info("{}\nts={}\nwatermark={}\nrecord={}",
                label, context.timestamp(),
                context.timerService().currentWatermark(), record);
        collector.collect(record);  // was collect(deliveryLog), an undefined variable
    }
}
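For what it's worth, the logged numbers are consistent with the multi-partition explanation. With forBoundedOutOfOrderness, Flink emits a watermark of maxTimestampSeen - bound - 1 (the -1 per Flink's BoundedOutOfOrdernessWatermarks), so the logged watermark implies the source task had already read a record roughly 83 hours newer than the element being processed, plausibly from another partition assigned to the same task. A quick back-of-the-envelope check (numbers taken from the log above):

```java
// Sanity check: how far ahead of context.timestamp() was the newest record
// implied by the logged watermark?
public class WatermarkGapCheck {

    // Watermark emitted by forBoundedOutOfOrderness: maxTimestampSeen - bound - 1.
    static long boundedOutOfOrdernessWatermark(long maxTimestampSeenMs, long boundMs) {
        return maxTimestampSeenMs - boundMs - 1;
    }

    public static void main(String[] args) {
        long boundMs = 5_000L;                     // Dan's 5s out-of-orderness bound
        long loggedTs = 1_647_274_892_728L;        // context.timestamp() from the log
        long loggedWatermark = 1_647_575_140_007L; // currentWatermark() from the log

        // Smallest max-timestamp that could have produced that watermark:
        long impliedNewestTs = loggedWatermark + boundMs + 1;

        long gapHours = (impliedNewestTs - loggedTs) / 3_600_000L;
        System.out.println("record is ~" + gapHours + " hours behind the newest record seen"); // ~83
    }
}
```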