Hi Viacheslav,

These are two very interesting questions...
You have correctly identified the restriction that only single-input operators can be chained. Note that it also does not help to union() multiple streams into a single input: the unioned streams still count as multiple inputs.

* The harder way to go would be to patch the relevant parts of Flink to allow chaining with multiple inputs.
  * This is very complicated to get right, especially because the multiple inputs and outputs then all need to be façaded.
  * We once did it (successfully) and abandoned the idea because of its complexity and maintenance cost.
* The other way would be to implement all of the logic in a single org.apache.flink.streaming.api.operators.MultipleInputStreamOperator, which allows any (reasonable) number of inputs: keyed, non-keyed, broadcast, mixed ...

Let me explain:

* From what you say, I assume that after the Kafka source you need to .keyBy() the instrument ID anyway, which means a shuffle and (de-)serialization ... unavoidable.
* However, after that shuffle, the MultipleInputStreamOperator could force-chain all of your logic, as long as it stays on the same key/partition domain.
* Integrating broadcast inputs there is a no-brainer.
* We do these things all the time, and it really helps to cut down serialization cost, among other things.
* This approach does not necessarily help with keeping latency down, though: more inputs means more time spent round-robining over the available inputs.

I hope this helps. What do you think?

Regards
Thias

From: Viacheslav Chernyshev <v.chernys...@outlook.com>
Sent: Tuesday, February 28, 2023 1:06 PM
To: user@flink.apache.org
Subject: Is it possible to preserve chaining for multi-input operators?

Hi everyone,

My team is developing a streaming pipeline for analytics on top of market data. The ultimate goal is to handle tens of millions of events per second, distributed across the cluster according to the unique ID of a particular financial instrument. Unfortunately, we struggle to achieve acceptable performance.
As far as I can see, Flink forcibly breaks operator chaining when it encounters a job graph node with multiple inputs. This severely affects performance because a network boundary is enforced, and every event is forcibly serialised and deserialised.

From the pipeline graph perspective, the requirements are:
* Read data from multiple Kafka topics that are connected to different nodes in the graph.
* Broadcast a number of dynamic rules to the pipeline.

The cleanest way to achieve the first goal is to have a bunch of KeyedCoProcessFunction operations. This design didn't work for us because the SerDe overhead added by the broken chains was too high; we had to completely flatten the pipeline instead. Unfortunately, I can't find any way to solve the second problem. As soon as the broadcast stream is introduced into the pipeline, the performance tanks.

Is there any technique that I could possibly utilise to preserve the chaining?

Kind regards,
Viacheslav
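[Editor's note] The MultipleInputStreamOperator approach suggested in the reply could look roughly like the sketch below. This targets Flink's internal operator API (AbstractStreamOperatorV2, AbstractInput, MultipleInputTransformation), which is not a stable public API and whose signatures vary between Flink versions; class and method names here match recent 1.x releases, but treat this as an illustration rather than a drop-in implementation. The operator name, types, and the "left:"/"right:" tagging logic are all made up for the example.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A two-input operator built on the (internal) multi-input operator API.
 * All inputs are serviced by a single task, so no network boundary or
 * SerDe happens between them once they are on the same key domain.
 */
public class FusedTwoInputOperator extends AbstractStreamOperatorV2<String>
        implements MultipleInputStreamOperator<String> {

    public FusedTwoInputOperator(StreamOperatorParameters<String> parameters) {
        super(parameters, 2); // two logical inputs
    }

    @Override
    public List<Input> getInputs() {
        // Input ids are 1-based; each Input gets its own processElement().
        return Arrays.asList(
                new AbstractInput<String, String>(this, 1) {
                    @Override
                    public void processElement(StreamRecord<String> element) {
                        output.collect(element.replace("left: " + element.getValue()));
                    }
                },
                new AbstractInput<String, String>(this, 2) {
                    @Override
                    public void processElement(StreamRecord<String> element) {
                        output.collect(element.replace("right: " + element.getValue()));
                    }
                });
    }

    /** Factory the transformation needs in order to instantiate the operator. */
    public static class Factory extends AbstractStreamOperatorFactory<String> {
        @Override
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<String>> T createStreamOperator(
                StreamOperatorParameters<String> parameters) {
            return (T) new FusedTwoInputOperator(parameters);
        }

        @Override
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader cl) {
            return FusedTwoInputOperator.class;
        }
    }
}
```

Wiring it into a job then goes through a MultipleInputTransformation (or KeyedMultipleInputTransformation for keyed state), to which each upstream stream's transformation is added before registering it with the environment via env.addOperator(...).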
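[Editor's note] For the dynamic-rules requirement in the question, the standard (if chaining-breaking) approach is Flink's broadcast state pattern via KeyedBroadcastProcessFunction, which is stable public API. The event and rule types, field names, and the threshold check below are invented for illustration; only the wiring (broadcast descriptor, connect, process) reflects the actual API.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastRulesSketch {

    // Hypothetical event and rule types (valid Flink POJOs).
    public static class MarketEvent { public String instrumentId; public double price; }
    public static class Rule { public String id; public double threshold; }

    // Descriptor for the broadcast state holding the current rule set.
    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(Rule.class));

    /** Connects a keyed event stream with a broadcast rules stream. */
    static DataStream<String> wire(DataStream<MarketEvent> events, DataStream<Rule> rules) {
        BroadcastStream<Rule> broadcastRules = rules.broadcast(RULES);

        return events
                .keyBy(e -> e.instrumentId)
                .connect(broadcastRules)
                .process(new KeyedBroadcastProcessFunction<String, MarketEvent, Rule, String>() {
                    @Override
                    public void processElement(MarketEvent event, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Evaluate every currently known rule against the event.
                        for (java.util.Map.Entry<String, Rule> e :
                                ctx.getBroadcastState(RULES).immutableEntries()) {
                            if (event.price > e.getValue().threshold) {
                                out.collect(e.getKey() + " fired for " + event.instrumentId);
                            }
                        }
                    }

                    @Override
                    public void processBroadcastElement(Rule rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Runs on every parallel instance; updates the shared rule set.
                        ctx.getBroadcastState(RULES).put(rule.id, rule);
                    }
                });
    }
}
```

As the reply notes, the multi-input operator route can absorb such a broadcast input into the force-chained operator instead, avoiding the extra network boundary this pattern introduces.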