Hi Shazia,

FLIP-182 [1] might let you address issues like this in the future. With it, you could perhaps use watermark alignment to make sure that one stream doesn't run too far into the future, which would effectively prioritise the other stream. But that's currently targeted at Flink 1.15 (subject to change), which is still a couple of months away.
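
Here is a minimal, framework-free Java sketch of the alignment rule, to make the idea concrete. It is not Flink's API; `WatermarkAlignmentSketch`, `reportWatermark` and `shouldPause` are hypothetical names. Each source reports its current watermark, and a source should pause while its watermark is more than `maxDriftMillis` ahead of the slowest one. Holding back the faster stream is what effectively prioritises the other. (Flink 1.15 later shipped this for FLIP-27 sources as `WatermarkStrategy#withWatermarkAlignment`; the sketch does not use it.)

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the FLIP-182 idea; NOT Flink's API.
final class WatermarkAlignmentSketch {
    private final long maxDriftMillis;
    // Latest watermark reported by each source, keyed by a source id.
    private final Map<String, Long> watermarks = new HashMap<>();

    WatermarkAlignmentSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    void reportWatermark(String sourceId, long watermark) {
        watermarks.put(sourceId, watermark);
    }

    // True if this source is more than maxDriftMillis ahead of the slowest reported source.
    boolean shouldPause(String sourceId) {
        Long own = watermarks.get(sourceId);
        if (own == null) {
            return false; // nothing reported yet, so nothing to hold back
        }
        long slowest = watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .getAsLong(); // non-empty: at least this source has reported
        return own - slowest > maxDriftMillis;
    }
}
```

With a 1000 ms drift, a source at watermark 5000 pauses while another source is still at 1000, and the slower source keeps reading.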
For the time being, a workaround that I know some people have used is to throttle the sources manually, either via a throttling operator/map function chained directly after the sources, or inside your custom source. One complication is that you would most likely need an external system (an external database? maybe a file?) to control whom to throttle, when, and by how much. To decide whom to throttle, you could use Flink metrics [2], especially the ones reporting the number of bytes/records processed by an operator/subtask. Also, be cautious with sleeps: while you are blocked inside your code, you are also blocking other things, checkpointing for example.

And let me stress this once more: the throttling should be chained directly after the sources. If there is a network exchange between the source and the throttling function, a lot of in-flight records would pile up between the two, potentially causing crippling backpressure that would especially affect aligned checkpointing [3].

Best,
Piotrek

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
[3] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On Tue, 23 Nov 2021 at 15:52, Shazia Kayani <shazia.kay...@ibm.com> wrote:

> Hi Martijn,
>
> It's a continuous requirement, so always read from one input source over
> another, but it does not require a super strict guarantee, so it doesn't
> matter if on occasion a message is read from the wrong topic. It's mainly
> due to there consistently being significantly more messages on one source
> than on the other, which causes issues when there are too many messages on
> the stream.
>
> Thanks
>
> Shazia
>
>
> ----- Original message -----
> From: "Martijn Visser" <mart...@ververica.com>
> To: "Shazia Kayani" <shazia.kay...@ibm.com>
> Cc: "User" <user@flink.apache.org>
> Subject: [EXTERNAL] Re: Input Selectable & Checkpointing
> Date: Tue, Nov 23, 2021 2:45 PM
>
> Hi,
>
> Do you have a requirement to continuously prioritise one input source over
> another (like always read topic X from Kafka before topic Y from Kafka), or
> is it a one-time effort because you might need to bootstrap some state (so
> first read all data from file source A before switching over to topic B
> from Kafka)? If it's the latter, you could look into the HybridSource.
>
> Best regards,
>
> Martijn
>
> On Tue, 23 Nov 2021 at 15:34, Shazia Kayani <shazia.kay...@ibm.com> wrote:
>
> Hi All,
>
> Hope you are well!
>
> I am working on something which requires Flink to prioritise one input
> datastream over another. To do this, I've currently implemented an
> operator which implements InputSelectable.
> However, because I'm using InputSelectable, checkpointing is disabled, as
> it is currently not supported.
>
> I was just wondering if anyone has done something similar to this
> previously? And if so, were you able to implement changes which resulted
> in successful checkpointing?
> If anyone has any other tips around the topic, that would also be helpful!
>
> Thanks
>
> Shazia
>
> Unless stated otherwise above:
>
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
>
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
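
For the manual throttling workaround suggested above, here is a minimal plain-Java sketch of the rate-limiting logic, assuming a fixed target rate per parallel subtask. `SimpleThrottler`, `reserve`, `acquire` and `setRate` are hypothetical names, not Flink APIs. In a Flink job you would call `acquire()` from a map function chained directly after the source (not shown), and `setRate(...)` could be fed from whatever external system decides whom to throttle.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical per-subtask throttler (not a Flink API). One instance per parallel
// subtask; reserve()/acquire() are meant to be called from a single task thread.
final class SimpleThrottler {
    // volatile so another thread (e.g. one polling an external config store) can update it
    private volatile double recordsPerSecond;
    private long nextFreeNanos;
    private boolean started = false;

    SimpleThrottler(double recordsPerSecond) {
        setRate(recordsPerSecond);
    }

    void setRate(double recordsPerSecond) {
        if (!(recordsPerSecond > 0)) {
            throw new IllegalArgumentException("rate must be positive: " + recordsPerSecond);
        }
        this.recordsPerSecond = recordsPerSecond;
    }

    // Reserves the next emission slot and returns how many nanoseconds the caller must wait for it.
    long reserve(long nowNanos) {
        if (!started) {
            nextFreeNanos = nowNanos;
            started = true;
        }
        long intervalNanos = (long) (1_000_000_000L / recordsPerSecond);
        long waitNanos = nextFreeNanos - nowNanos;
        if (waitNanos <= 0) {
            // Behind schedule or idle: emit now, without accumulating a burst allowance.
            waitNanos = 0;
            nextFreeNanos = nowNanos;
        }
        nextFreeNanos += intervalNanos;
        return waitNanos;
    }

    // Blocks the calling thread until the next record may be emitted. While it sleeps,
    // the task thread cannot do anything else, including processing checkpoint barriers.
    void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }
}
```

Splitting `reserve` from `acquire` keeps the timing logic testable without actually sleeping. The caveat from the reply still applies: every `acquire()` that sleeps blocks the task thread, so keep the target rate high enough that individual waits stay short.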