Flink CDC whole-database sync: data skew caused by large and small tables

2024-02-06 Thread casel.chen
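When using a Flink CDC 3.0 YAML job to synchronize an entire database from MySQL to Doris, I noticed data skew: the most heavily loaded TM has to process 180 GB of data, while the least loaded TM handles only 30 GB. Some of the large upstream tables have very high traffic, while the small tables have almost none. Is there any way to avoid this data skew?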

Watermark alignment without idleness

2024-02-06 Thread Alexis Sarda-Espinosa
Hello,

I was reading through the comments in [1] and it seems that enabling
watermark alignment implicitly activates some idleness logic "if the source
waits for alignment for a long time" (even if withIdleness is not called
explicitly during the creation of WatermarkStrategy). Is this time somehow
configurable? I believe this isn't documented.

[1] https://issues.apache.org/jira/browse/FLINK-32496

Regards,
Alexis.


[ANNOUNCE] Apache Celeborn(incubating) 0.4.0 available

2024-02-06 Thread Fu Chen
Hi all,

The Apache Celeborn (Incubating) community is glad to announce the
new release of Apache Celeborn (Incubating) 0.4.0.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, highly efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

- https://github.com/apache/incubator-celeborn/releases/tag/v0.4.0-incubating

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.0


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Thanks,
Fu Chen
On behalf of the Apache Celeborn(incubating) community


Re: Request to provide sample codes on Data stream using flink, Spring Boot & kafka

2024-02-06 Thread Alexis Sarda-Espinosa
Hello,

Check this thread from some months ago, but keep in mind that it's not
officially supported by Flink itself:

https://lists.apache.org/thread/l0pgm9o2vdywffzdmbh9kh7xorhfvj40

Regards,
Alexis.

On Tue, 6 Feb 2024 at 12:23, Fidea Lidea <lideafidea...@gmail.com> wrote:

> Hi Team,
>
> Could you please provide sample code for data streaming using Flink, Kafka,
> and Spring Boot?
>
> Awaiting your response.
>
> Thanks & Regards
> Nida Shaikh
>


Re: Idleness not working if watermark alignment is used

2024-02-06 Thread Alexis Sarda-Espinosa
Hi Matthias,

I think I understand the implications of idleness. In my case I really do
need it since even in the production environment one of the Kafka topics
will receive messages only sporadically.

With regard to the code, I have a very limited understanding of Flink
internals, but the part I linked seems to indicate that, if a stream is
idle, the log should show a hard-coded maxAllowedWatermark equal to
Long.MAX_VALUE. That's why I thought the source isn't really considered
idle.
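
To make that reading concrete, here is a minimal sketch in plain Java of how such a limit could be computed. It is a simplified model and not Flink's actual code: the class AlignmentSketch, both method names, and the convention that an idle source reports Long.MAX_VALUE are assumptions made for illustration.

```java
import java.util.List;

final class AlignmentSketch {
    /** Assumption of this sketch: an idle source reports Long.MAX_VALUE as its watermark. */
    static final long IDLE = Long.MAX_VALUE;

    /** Combined watermark of an alignment group: the minimum over all reported watermarks. */
    static long combinedWatermark(List<Long> reportedWatermarks) {
        long min = IDLE;
        for (long watermark : reportedWatermarks) {
            min = Math.min(min, watermark);
        }
        return min;
    }

    /** Limit handed to the sources: combined watermark plus the allowed drift, capped on overflow. */
    static long maxAllowedWatermark(long combined, long maxDriftMillis) {
        try {
            return Math.addExact(combined, maxDriftMillis);
        } catch (ArithmeticException overflow) {
            // Only reached when the combined watermark is at (or near) Long.MAX_VALUE,
            // which in this model means that every source in the group is idle.
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long drift = 1_000L;
        // One active source and one idle source: the limit follows the active source.
        System.out.println(maxAllowedWatermark(combinedWatermark(List.of(1_707_149_932_770L, IDLE)), drift));
        // Every source idle: the limit becomes Long.MAX_VALUE.
        System.out.println(maxAllowedWatermark(combinedWatermark(List.of(IDLE, IDLE)), drift));
    }
}
```

In this model a single active source in the group is enough to keep the limit finite and growing, so a growing maxAllowedWatermark would not by itself show that the silent source was never marked idle.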

Regards,
Alexis.

On Tue, 6 Feb 2024 at 11:46, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Alexis,
>
>
>
> Yes, I guess so, although I am not fully acquainted with that part of the code.
>
> Apparently the SourceCoordinator cannot come up with a proper watermark
> time if watermarking is turned off (idle mode of the stream), and it then
> deduces the watermark time from the remaining non-idle sources.
>
> This is consistent with how the idle state of data streams is designed.
>
> However, the point remains that one needs to compensate for
> .withIdleness(…) if correctness is a consideration.
>
> Using .withIdleness(…) is IMHO only justified in rare cases where the
> implications are fully understood.
>
>
>
> If a source is not configured with .withIdleness(…) and becomes factually
> idle, all window aggregations or stateful stream joins stall until that
> source becomes active again (= added latency)
>
>
>
> Thias
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Tuesday, February 6, 2024 9:48 AM
> *To:* Schwalbe Matthias 
> *Cc:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
>
> Hi Matthias,
>
>
>
> thanks for looking at this. Would you then say this comment in the source
> code is not really valid?
>
>
> https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181
>
>
>
> That's where the log I was looking at is created.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> On Tue, 6 Feb 2024 at 08:54, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood. It actually means that a stream
> configured this way is exempt from watermark processing once it has been
> idle for 5 seconds (in your case).
>
> Hence watermark alignment is also turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
>
>
> On Mon, 5 Feb 2024 at 18:00, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>     .withIdleness(idleTimeout) // 5 seconds currently
>     .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.
>
>
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of 

Request to provide sample codes on Data stream using flink, Spring Boot & kafka

2024-02-06 Thread Fidea Lidea
Hi Team,

Could you please provide sample code for data streaming using Flink, Kafka,
and Spring Boot?

Awaiting your response.

Thanks & Regards
Nida Shaikh


RE: Idleness not working if watermark alignment is used

2024-02-06 Thread Schwalbe Matthias
Hi Alexis,

Yes, I guess so, although I am not fully acquainted with that part of the code.
Apparently the SourceCoordinator cannot come up with a proper watermark time 
if watermarking is turned off (idle mode of the stream), and it then deduces 
the watermark time from the remaining non-idle sources.
This is consistent with how the idle state of data streams is designed.
However, the point remains that one needs to compensate for 
.withIdleness(…) if correctness is a consideration.
Using .withIdleness(…) is IMHO only justified in rare cases where the 
implications are fully understood.

If a source is not configured with .withIdleness(…) and becomes factually idle, 
all window aggregations or stateful stream joins stall until that source 
becomes active again (= added latency)
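
The stall described above can be illustrated with a small model of how a downstream operator might combine the watermarks of its inputs. This is a sketch in plain Java under simplifying assumptions, not Flink's implementation; DownstreamWatermarkSketch and operatorWatermark are hypothetical names.

```java
import java.util.OptionalLong;

final class DownstreamWatermarkSketch {
    /**
     * Watermark of an operator with several inputs: the minimum over all inputs
     * that are not flagged idle. Empty if every input is idle.
     */
    static OptionalLong operatorWatermark(long[] watermarks, boolean[] idle) {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Input 0 stopped receiving events at watermark 5000; input 1 keeps advancing.
        long[] watermarks = {5_000L, 60_000L};

        // Without idleness, input 0 still counts, so the operator is held back at 5000
        // and windows ending after that point cannot fire.
        System.out.println(operatorWatermark(watermarks, new boolean[] {false, false}));

        // With input 0 flagged idle, only input 1 counts and the operator advances to 60000.
        System.out.println(operatorWatermark(watermarks, new boolean[] {true, false}));
    }
}
```

The second call also shows the trade-off: once the idle input is ignored, events that arrive on it later with timestamps below 60000 would be late from the operator's point of view.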

Thias

From: Alexis Sarda-Espinosa 
Sent: Tuesday, February 6, 2024 9:48 AM
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Idleness not working if watermark alignment is used


Hi Matthias,

thanks for looking at this. Would you then say this comment in the source code 
is not really valid?
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181

That's where the log I was looking at is created.

Regards,
Alexis.

On Tue, 6 Feb 2024 at 08:54, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
Good morning Alexis,

withIdleness(…) is easily misunderstood. It actually means that a stream 
configured this way is exempt from watermark processing once it has been 
idle for 5 seconds (in your case).
Hence watermark alignment is also turned off for the stream until a new event 
arrives.
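
As a rough illustration of that behavior, the following plain-Java sketch models an idleness timer. It is not Flink's implementation; the class IdlenessSketch and its methods are hypothetical, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.time.Duration;

final class IdlenessSketch {
    private final long idleTimeoutMillis;
    private long lastEventAtMillis;

    IdlenessSketch(Duration idleTimeout, long nowMillis) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.lastEventAtMillis = nowMillis;
    }

    /** Any new event makes the stream active again and restarts the timer. */
    void onEvent(long nowMillis) {
        lastEventAtMillis = nowMillis;
    }

    /** The stream counts as idle once no event has been seen for the full timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastEventAtMillis >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        IdlenessSketch stream = new IdlenessSketch(Duration.ofSeconds(5), 0L);
        System.out.println(stream.isIdle(4_999L)); // still active: less than 5 s without events
        System.out.println(stream.isIdle(5_000L)); // idle: exempt from watermarking and alignment
        stream.onEvent(7_000L);
        System.out.println(stream.isIdle(7_001L)); // active again after a new event
    }
}
```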

.withIdleness(…) is good for situations where you prefer low latency over 
correctness (causality with respect to time order).
Downstream operators can choose a manual implementation of watermark behavior 
in order to compensate for the missing watermarks.

IMHO, because I see so many people make the same mistake, I would rather rename 
.withIdleness(…) to something like .idleWatermarkExemption(…) to make it more 
obvious.

Hope this helps


Thias



From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
Sent: Monday, February 5, 2024 6:04 PM
To: user <user@flink.apache.org>
Subject: Re: Idleness not working if watermark alignment is used

Ah and I forgot to mention, this is with Flink 1.18.1

On Mon, 5 Feb 2024 at 18:00, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
Hello,

I have 2 Kafka sources that are configured with a watermark strategy 
instantiated like this:

WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
    .withIdleness(idleTimeout) // 5 seconds currently
    .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift, Duration.ofSeconds(1L))

The alignment group is the same for both, but each one consumes from a 
different topic. During a test, I ensured that one of the topics didn't receive 
any messages, but when I check the logs I see multiple entries like this:

Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to 
subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.

where maxAllowedWatermark grows all the time.

Maybe my understanding is wrong, but I think this means the source is never 
marked as idle even though it didn't receive any new messages in the Kafka 
topic?

Regards,
Alexis.

Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.

Re: Idleness not working if watermark alignment is used

2024-02-06 Thread Alexis Sarda-Espinosa
Hi Matthias,

thanks for looking at this. Would you then say this comment in the source
code is not really valid?
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181

That's where the log I was looking at is created.

Regards,
Alexis.

On Tue, 6 Feb 2024 at 08:54, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood. It actually means that a stream
> configured this way is exempt from watermark processing once it has been
> idle for 5 seconds (in your case).
>
> Hence watermark alignment is also turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa 
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user 
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
>
>
> On Mon, 5 Feb 2024 at 18:00, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>     .withIdleness(idleTimeout) // 5 seconds currently
>     .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.