Thanks for driving this, Weijie. Usually, the data distribution of the external system is closely related to the keys, e.g. computing the bucket index by key hashcode % bucket num, so I'm not sure about how much difference there are between partitioning by key and a custom partitioning strategy. Could you give a more concrete example in production on when a custom partitioning strategy will outperform partitioning by key? Since you've mentioned Paimon in doc, maybe an example on Paimon.
Best, Zhanghao Chen ________________________________ From: weijie guo <guoweijieres...@gmail.com> Sent: Friday, June 7, 2024 9:59 To: dev <dev@flink.apache.org> Subject: [DISCUSS] FLIP-462: Support Custom Data Distribution for Input Stream of Lookup Join Hi devs, I'd like to start a discussion about FLIP-462[1]: Support Custom Data Distribution for Input Stream of Lookup Join. Lookup Join is an important feature in Flink, It is typically used to enrich a table with data that is queried from an external system. If we interact with the external systems for each incoming record, we incur significant network IO and RPC overhead. Therefore, most connectors introduce caching to reduce the per-record level query overhead. However, because the data distribution of Lookup Join's input stream is arbitrary, the cache hit rate is sometimes unsatisfactory. We want to introduce a mechanism for the connector to tell the Flink planner its desired input stream data distribution or partitioning strategy. This can significantly reduce the amount of cached data and improve performance of Lookup Join. You can find more details in this FLIP[1]. Looking forward to hearing from you, thanks! Best regards, Weijie [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join