Hi Alexis, below is my understanding:

> I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) 
> within a single source's splits and across different sources. I don't see 
> this explicitly mentioned in the documentation, but I assume that the concept 
> of "maximal drift" used for alignment also takes idleness into account, 
> resuming any readers that were paused due to an idle split or source. Is my 
> understanding correct?

As far as I understand, whether to “unpause” a split that was paused due to
watermark alignment is re-evaluated at fixed intervals, scheduled in the
SourceCoordinator. [1]

The SourceCoordinator calls announceCombinedWatermark(), which calculates the
combined watermark of the watermark group and then sends each subtask a
WatermarkAlignmentEvent carrying the maximum watermark that subtask may advance
to (the combined watermark plus the maximal drift). Each subtask then evaluates
whether to “wake up” the operator. [2] [3]
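To make the round trip concrete, here is a simplified model in plain Java. It
is not Flink's implementation: the names `AlignmentRoundTrip`, `Reader`,
`onAlignmentEvent` and `announceMaxAllowedWatermark` are hypothetical, and the
model assumes that the announced limit is the minimum watermark across the
group plus the maximal drift, and that a reader pauses whenever its own
watermark is ahead of that limit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified, hypothetical model of the coordinator -> subtask alignment round
 * trip. Not Flink code: all names here are made up for illustration.
 */
public class AlignmentRoundTrip {

    /** Stand-in for one source reader (subtask or split) that can be paused. */
    static final class Reader {
        final String id;
        long currentWatermark;
        boolean paused;

        Reader(String id, long currentWatermark) {
            this.id = id;
            this.currentWatermark = currentWatermark;
        }

        /** Subtask side: pause if ahead of the announced limit, else wake up. */
        void onAlignmentEvent(long maxAllowedWatermark) {
            paused = currentWatermark > maxAllowedWatermark;
        }
    }

    /**
     * Coordinator side (assumed model): combined watermark = minimum across the
     * group's readers; the announced limit is that minimum plus the max drift.
     * Assumes a non-negative drift.
     */
    static long announceMaxAllowedWatermark(Map<String, Reader> readers, long maxDriftMillis) {
        long combined = Long.MAX_VALUE;
        for (Reader r : readers.values()) {
            combined = Math.min(combined, r.currentWatermark);
        }
        // Saturate instead of overflowing (e.g. an empty map leaves Long.MAX_VALUE).
        long maxAllowed = combined > Long.MAX_VALUE - maxDriftMillis
                ? Long.MAX_VALUE
                : combined + maxDriftMillis;
        for (Reader r : readers.values()) {
            r.onAlignmentEvent(maxAllowed); // one "event" per subtask
        }
        return maxAllowed;
    }

    public static void main(String[] args) {
        Map<String, Reader> readers = new LinkedHashMap<>();
        readers.put("split-0", new Reader("split-0", 1_000L));
        readers.put("split-1", new Reader("split-1", 30_000L));

        // Limit = 1_000 + 20_000 = 21_000, so split-1 (at 30_000) is paused.
        announceMaxAllowedWatermark(readers, 20_000L);
        readers.values().forEach(r -> System.out.println(r.id + " paused=" + r.paused));

        // The slow split catches up; the next announcement (limit 35_000) resumes split-1.
        readers.get("split-0").currentWatermark = 15_000L;
        announceMaxAllowedWatermark(readers, 20_000L);
        readers.values().forEach(r -> System.out.println(r.id + " paused=" + r.paused));
    }
}
```

In this model a paused reader only resumes when a new announcement arrives;
nothing else flips `paused` back to false.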

This means that whether to “wake up” is evaluated periodically, at the
configured update interval, which defaults to 1s. [4]
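The periodic part can be sketched with a plain ScheduledExecutorService. This
is a hypothetical illustration, not Flink's scheduling code: `PeriodicAlignmentCheck`
and `DEFAULT_UPDATE_INTERVAL` are made-up names, and the 1s default simply
mirrors the value mentioned above. One consequence in this model is that a
reader whose condition to resume becomes true between two ticks stays paused
until the next tick, so waking up can lag by up to roughly one update interval.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: re-run an alignment check every update interval. */
public class PeriodicAlignmentCheck implements AutoCloseable {

    // Assumption for illustration: mirrors the 1s default mentioned in the email.
    static final Duration DEFAULT_UPDATE_INTERVAL = Duration.ofSeconds(1);

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger rounds = new AtomicInteger();

    /** Schedules the check; the first run happens after one full interval. */
    public ScheduledFuture<?> start(Runnable alignmentCheck, Duration updateInterval) {
        long millis = updateInterval.toMillis();
        return executor.scheduleAtFixedRate(() -> {
            rounds.incrementAndGet();   // count the round before running the check
            alignmentCheck.run();       // e.g. announce the limit, pause or wake readers
        }, millis, millis, TimeUnit.MILLISECONDS);
    }

    public int rounds() {
        return rounds.get();
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeRounds = new CountDownLatch(3);
        try (PeriodicAlignmentCheck check = new PeriodicAlignmentCheck()) {
            // A paused reader could only resume inside one of these callbacks.
            check.start(() -> {
                System.out.println("evaluate: pause or wake up readers");
                threeRounds.countDown();
            }, DEFAULT_UPDATE_INTERVAL);
            threeRounds.await(5, TimeUnit.SECONDS);
            System.out.println("rounds so far: " + check.rounds());
        }
    }
}
```

A shorter interval makes waking up more responsive at the cost of more
coordinator-to-subtask messages.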

> Also, something that isn't 100% clear to me when comparing to the previous
> watermark alignment documentation: even if I only wanted alignment within a
> single source's splits, I still need to call withWatermarkAlignment in the
> watermark strategy, right? Otherwise alignment will not take place,
> regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Yes, this is correct, even if you only want alignment within a single source's
splits. Watermark groups determine which sources need to coordinate their
watermarks: if two sources A and B belong to the same watermark group, their
watermarks will be aligned.
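As a rough illustration of how grouping scopes alignment, the following
plain-Java model treats each source's watermark group as a field. It is an
assumption-laden sketch, not Flink's logic: the names `WatermarkGroups`,
`Source` and `pausedByGroup` are hypothetical, and a `null` group stands in for
a source whose WatermarkStrategy never called withWatermarkAlignment. In Flink
itself, the group is the first argument of `WatermarkStrategy#withWatermarkAlignment`,
e.g. `withWatermarkAlignment("group-1", Duration.ofSeconds(20), Duration.ofSeconds(1))`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of watermark groups. In Flink the group name is given to
 * withWatermarkAlignment on the WatermarkStrategy; here it is a plain field.
 */
public class WatermarkGroups {

    static final class Source {
        final String name;
        final String group; // null models "withWatermarkAlignment was not called"
        final long watermark;

        Source(String name, String group, long watermark) {
            this.name = name;
            this.group = group;
            this.watermark = watermark;
        }
    }

    /** Returns, per source name, whether the model would pause that source. */
    static Map<String, Boolean> pausedByGroup(List<Source> sources, long maxDriftMillis) {
        // Minimum watermark per group; ungrouped sources contribute nothing.
        Map<String, Long> groupMin = new HashMap<>();
        for (Source s : sources) {
            if (s.group != null) {
                groupMin.merge(s.group, s.watermark, Long::min);
            }
        }
        Map<String, Boolean> paused = new TreeMap<>();
        for (Source s : sources) {
            // Only sources sharing a group constrain each other; no group, no alignment.
            boolean p = s.group != null && s.watermark > groupMin.get(s.group) + maxDriftMillis;
            paused.put(s.name, p);
        }
        return paused;
    }

    public static void main(String[] args) {
        List<Source> sources = Arrays.asList(
                new Source("A", "group-1", 1_000L),
                new Source("B", "group-1", 60_000L),  // ahead of A by more than the drift
                new Source("C", "group-2", 90_000L),  // alone in its group
                new Source("D", null, 500_000L));     // alignment not enabled
        // prints {A=false, B=true, C=false, D=false}
        System.out.println(pausedByGroup(sources, 20_000L));
    }
}
```

Only B is paused: C is far ahead too, but nothing else shares its group, and D
is never paused because it has no group at all.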

Hope the above helps.


Cheers,
Hong


[1] https://github.com/apache/flink/blob/45ba7ee87caee63a0babfd421b7c5eabaa779baa/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L160
[2] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L556-L559
[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L659
[4] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java#L29



On 13 Jun 2023, at 21:08, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> 
wrote:


Hi again, I'm not a fan of bumping questions, but I think this might be
relevant, perhaps even enough to include in the official documentation?

Regards,
Alexis.

On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa,
<sarda.espin...@gmail.com> wrote:
Hello,

I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) 
within a single source's splits and across different sources. I don't see this 
explicitly mentioned in the documentation, but I assume that the concept of 
"maximal drift" used for alignment also takes idleness into account, resuming 
any readers that were paused due to an idle split or source. Is my 
understanding correct?

Also, something that isn't 100% clear to me when comparing to the previous
watermark alignment documentation: even if I only wanted alignment within a
single source's splits, I still need to call withWatermarkAlignment in the
watermark strategy, right? Otherwise alignment will not take place, regardless
of pipeline.watermark-alignment.allow-unaligned-source-splits.

Regards,
Alexis.

