From what I can tell, there's currently no way to control the parallelism of a LookupTableSource. It /seems/ to be bound to the parallelism of the input stream (i.e., the CDC stream).

I'd suggest creating a Jira ticket for this.
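
One thing that may be worth trying in the meantime, as an untested sketch: if the lookup join really does inherit the parallelism of its input, then raising the default parallelism of the Table program should raise it for the lookups as well. This assumes the CDC source follows the default parallelism rather than pinning its own, which depends on the connector. The value 8 is only a placeholder.

    -- Assumption: the CDC scan source follows the default parallelism.
    -- 8 is a placeholder; tune it to what the lookup systems can handle.
    SET 'table.exec.resource.default-parallelism' = '8';

Note that this is a job-wide default rather than a per-table setting, so it is a much blunter tool than the per-operator tuning available in the DataStream API.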

On 11/01/2022 06:43, Jonathan Weaver wrote:
I'm attempting a proof-of-concept conversion of a DataStream-based Flink program to one that uses Table SQL almost entirely.

I have a primary CDC stream (an unbounded scan table source) that does two joins to LookupTableSource tables and then goes on to a sink.

In the DataStream program, the only way to maintain throughput and avoid backpressure on the CDC stream was to set a carefully tuned parallelism on the lookup functions to maximize the lookup capacity of the source systems.

However, in the SQL program I can't find any setting for the parallelism of the LookupTableSource tables. The planner sets their parallelism to 1, which allows only roughly 1/10 of the capacity the source system can handle and backpressures the CDC stream.

So my question is: is there a way to have the benefits of the Table SQL interface but also allow performance tuning on LookupTableSource tables? A max parallelism of 1 will kill the attempted conversion immediately.

I love the Catalog interface and am attempting to turn all the custom functions and lookups into tables that other developers can just write SQL against, but the performance tuning is critical.

All the tables are being registered in a catalog using the DynamicTableSource factories.

My SQL is basically of the form:
INSERT INTO sink
SELECT
   ...
FROM cdc_table cdc
JOIN lookup1 FOR SYSTEM_TIME AS OF cdc.proc_time look1
   ON cdc.identifier = look1.identifier
LEFT OUTER JOIN lookup2 FOR SYSTEM_TIME AS OF cdc.proc_time look2
   ON cdc.identifier = look2.identifier
WHERE conditions;

Is there any way to force the planner to a specific parallelism?

Thanks for your time,
Jonathan




