Hi Shazia,

FLIP-182 [1] might let you address issues like this in the future. With
it, you could probably assign watermarks in a way that keeps one stream
from running too far ahead into the future, which would effectively
prioritise the other stream. However, it is currently targeted at Flink
1.15 (subject to change), which is still a couple of months away.

For the time being, a workaround that I know some people have used is to
implement manual throttling of the sources, either via a throttling
operator/mapping function chained directly after the sources, or
implemented inside your custom source. One issue that complicates this
solution is that you would most likely need an external system (an
external database, or maybe a file?) to control how much, when, and whom to
throttle. To decide whom to throttle, you could use Flink metrics [2],
especially those around the number of bytes/records processed by an
operator/subtask. Also, be cautious with sleeps: if you make blocking calls
inside your code, you will block checkpointing, for example. And let me
stress this once more: the throttling should be chained directly after the
sources. If there is a network exchange between the source and the
throttling function, a lot of in-flight records would accumulate between
the two, potentially causing crippling backpressure that would especially
affect aligned checkpointing [3].
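
As a rough illustration, here is a minimal sketch of such a throttle in
plain Java. It is not a Flink API: SimpleThrottler, acquireDelayNanos and
acquire are hypothetical names, and the sketch assumes a fixed target rate
per subtask (in practice that rate would come from the external control
system mentioned above). It simply spaces records evenly in time, and the
clock is injectable so the pacing logic can be checked without sleeping.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical pacing throttle (not part of Flink): admits at most
 * maxRecordsPerSecond records by spacing them evenly in time.
 */
final class SimpleThrottler {
    private final long intervalNanos;      // minimum spacing between two records
    private final LongSupplier nanoClock;  // injectable clock, e.g. System::nanoTime
    private long nextFreeSlotNanos;        // earliest time the next record may pass

    SimpleThrottler(double maxRecordsPerSecond, LongSupplier nanoClock) {
        if (maxRecordsPerSecond <= 0) {
            throw new IllegalArgumentException("maxRecordsPerSecond must be > 0");
        }
        this.intervalNanos = (long) (1_000_000_000.0 / maxRecordsPerSecond);
        this.nanoClock = nanoClock;
        this.nextFreeSlotNanos = nanoClock.getAsLong();
    }

    SimpleThrottler(double maxRecordsPerSecond) {
        this(maxRecordsPerSecond, System::nanoTime);
    }

    /** Reserves a slot for one record and returns how long to wait, in nanoseconds. */
    long acquireDelayNanos() {
        long now = nanoClock.getAsLong();
        // After an idle period, do not bank unused slots (avoids bursts).
        // Compare via subtraction so it stays correct if nanoTime wraps.
        if (now - nextFreeSlotNanos > 0) {
            nextFreeSlotNanos = now;
        }
        long delay = nextFreeSlotNanos - now;
        nextFreeSlotNanos += intervalNanos;
        return delay;
    }

    /**
     * Blocks the calling thread until the record may pass. In a Flink job this
     * would run on the task thread, so long sleeps also delay checkpoints.
     */
    void acquire() throws InterruptedException {
        long delay = acquireDelayNanos();
        if (delay > 0) {
            TimeUnit.NANOSECONDS.sleep(delay);
        }
    }
}
```

One possible (untested) way to use it: a RichMapFunction chained directly
after the source calls throttler.acquire() in map() and returns the record
unchanged. Keeping the target rate modest keeps individual sleeps short,
which limits how long checkpoint barriers are held up.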

Best,
Piotrek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On Tue, 23 Nov 2021 at 15:52, Shazia Kayani <shazia.kay...@ibm.com> wrote:

> Hi Martijn,
>
> It's a continuous requirement, so always read from one input source over
> the other, but it does not require a super strict guarantee, so it doesn't
> matter if a message is occasionally read from the wrong topic. It's mainly
> because there are consistently significantly more messages on one source
> than on the other, which causes issues when there are too many messages on
> the stream.
>
> Thanks
>
> Shazia
>
>
> ----- Original message -----
> From: "Martijn Visser" <mart...@ververica.com>
> To: "Shazia Kayani" <shazia.kay...@ibm.com>
> Cc: "User" <user@flink.apache.org>
> Subject: [EXTERNAL] Re: Input Selectable & Checkpointing
> Date: Tue, Nov 23, 2021 2:45 PM
>
> Hi,
>
> Do you have a requirement to continuously prioritise one input source over
> another (for example, always read topic X from Kafka before topic Y from
> Kafka), or is it a one-time effort (for example, because you need to
> bootstrap some state, you first read all data from file source A before
> switching over to topic B from Kafka)? If it's the latter, you could look
> into the HybridSource.
>
> Best regards,
>
> Martijn
>
> On Tue, 23 Nov 2021 at 15:34, Shazia Kayani <shazia.kay...@ibm.com> wrote:
>
> Hi All,
>
> Hope you are well!
>
> I am working on something that requires Flink to prioritise one input
> DataStream over another. To do this, I have currently implemented an
> operator that implements InputSelectable.
> However, because I'm using InputSelectable, checkpointing is disabled, as
> it is currently not supported.
>
> I was just wondering whether anyone has done something similar before,
> and if so, were you able to implement changes that resulted in successful
> checkpointing?
> Any other tips on the topic would also be helpful!
>
> Thanks
>
> Shazia
>
> Unless stated otherwise above:
>
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
>
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>
