Hi Alexis,

Yes, I guess so, although I am not deeply familiar with that part of the code.
Apparently the SourceCoordinator cannot come up with a proper watermark time 
when watermarking is turned off for a source (the stream's idle mode), so it 
derives the watermark time from the remaining non-idle sources.
That is consistent with how the idle state of data streams is designed.
However, the point remains that one needs to compensate for 
.withIdleness(…) if correctness is any consideration.
Using .withIdleness(…) is IMHO only justified in the rare cases where the 
implications are fully understood.

If a source is not configured with .withIdleness(…) and actually becomes idle, 
all window aggregations and stateful stream joins stall until that source 
becomes active again (= added latency).
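
To make the two cases above concrete, here is a minimal sketch in plain Java. It is a simplified model and not Flink's implementation: the class CombinedWatermarkSketch, the SourceState holder and the topic names are hypothetical, and the only assumption is that the combined watermark is the minimum over the sources that are not marked idle.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Simplified model of combining source watermarks; NOT Flink's implementation. */
final class CombinedWatermarkSketch {

    /** Hypothetical snapshot of one source: last reported watermark and idle flag. */
    static final class SourceState {
        final String name;
        final long watermark;
        final boolean idle;

        SourceState(String name, long watermark, boolean idle) {
            this.name = name;
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /** Minimum watermark over the non-idle sources; empty if every source is idle. */
    static OptionalLong combine(List<SourceState> sources) {
        return sources.stream()
                .filter(s -> !s.idle)
                .mapToLong(s -> s.watermark)
                .min();
    }

    public static void main(String[] args) {
        SourceState active = new SourceState("topic-a", 1_000L, false);
        // A silent source that is NOT marked idle still counts with its old watermark.
        SourceState silent = new SourceState("topic-b", Long.MIN_VALUE, false);
        // The same silent source after an idleness timeout has marked it idle.
        SourceState silentIdle = new SourceState("topic-b", Long.MIN_VALUE, true);

        // Without idleness the silent source holds the combined watermark back.
        System.out.println(combine(Arrays.asList(active, silent)));
        // With idleness only the active source determines the combined watermark.
        System.out.println(combine(Arrays.asList(active, silentIdle)));
    }
}
```

In this model the first call stays at the silent source's watermark (downstream stalls), while the second follows the active source alone, which would explain a limit that keeps growing although one topic receives nothing.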

Thias

From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
Sent: Tuesday, February 6, 2024 9:48 AM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: user <user@flink.apache.org>
Subject: Re: Idleness not working if watermark alignment is used

Hi Matthias,

thanks for looking at this. Would you then say this comment in the source code 
is not really valid?
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181

That's where the log I was looking at is created.

Regards,
Alexis.

On Tue, Feb 6, 2024 at 08:54, Schwalbe Matthias 
<matthias.schwa...@viseca.ch> wrote:
Good morning Alexis,

withIdleness(…) is easily misunderstood. It actually means that a stream 
configured this way is exempt from watermark processing once it has gone 
5 seconds (in your case) without events.
Hence watermark alignment is also turned off for that stream until a new event 
arrives.
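
As a rough illustration of that timeout behaviour, the following plain-Java sketch models a stream that counts as idle once no event has arrived for the configured timeout and becomes active again with the next event. It is not Flink's code: the class IdlenessTimerSketch and its methods are hypothetical, and time is passed in explicitly as milliseconds to keep the example deterministic.

```java
/** Simplified model of an idleness timeout; NOT Flink's actual implementation. */
final class IdlenessTimerSketch {
    private final long timeoutMillis;
    private long lastEventMillis;

    /** Starts the timer at startMillis, as if an event had just been seen. */
    IdlenessTimerSketch(long timeoutMillis, long startMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastEventMillis = startMillis;
    }

    /** Records an event; the stream takes part in watermarking again. */
    void onEvent(long nowMillis) {
        lastEventMillis = nowMillis;
    }

    /** True once the timeout has elapsed without any event. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastEventMillis >= timeoutMillis;
    }

    public static void main(String[] args) {
        // 5 second timeout, as in the configuration discussed in this thread.
        IdlenessTimerSketch timer = new IdlenessTimerSketch(5_000L, 0L);
        System.out.println(timer.isIdle(4_999L)); // still active
        System.out.println(timer.isIdle(5_000L)); // exempt from watermarking
        timer.onEvent(6_000L);
        System.out.println(timer.isIdle(6_000L)); // active again after a new event
    }
}
```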

.withIdleness(…) is good for situations where you prefer low latency over 
correctness (causality with respect to time order).
Downstream operators can choose a manual implementation of watermark behavior 
in order to compensate for the missing watermarks.

IMHO, because I see so many people make the same mistake, I would rather rename 
.withIdleness(…) to something like .idleWatermarkExemption(…) to make it more 
obvious.

Hope this helps


Thias



From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
Sent: Monday, February 5, 2024 6:04 PM
To: user <user@flink.apache.org>
Subject: Re: Idleness not working if watermark alignment is used

Ah and I forgot to mention, this is with Flink 1.18.1

On Mon, Feb 5, 2024 at 18:00, Alexis Sarda-Espinosa 
<sarda.espin...@gmail.com> wrote:
Hello,

I have 2 Kafka sources that are configured with a watermark strategy 
instantiated like this:

WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
            .withIdleness(idleTimeout) // 5 seconds currently
            .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift, Duration.ofSeconds(1L))

The alignment group is the same for both, but each one consumes from a 
different topic. During a test, I ensured that one of the topics didn't receive 
any messages, but when I check the logs I see multiple entries like this:

Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to 
subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.

where maxAllowedWatermark grows all the time.

Maybe my understanding is wrong, but I think this means the source is never 
marked as idle even though it didn't receive any new messages in the Kafka 
topic?

Regards,
Alexis.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.