From what I can tell there's currently no way to control the
parallelism of the LookupTableSource. It /seems/ to be bound to the
parallelism of the input stream (i.e., the CDC stream).
I'd suggest creating a Jira ticket for this.
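A possible indirect lever, untested here: because the lookup join seems to follow the parallelism of its input, raising the input's parallelism may raise the lookup join's as well. That only helps if the CDC source does not pin itself to a parallelism of 1. Assuming the SQL client on Flink 1.13 or later, the planner-wide default can be set as below; the value '10' is purely illustrative.

```sql
-- Assumption: Flink SQL client, 1.13+ SET syntax; '10' is a hypothetical value.
-- Sets the default parallelism for operators whose parallelism the planner
-- does not fix explicitly. It does not target the lookup join specifically.
SET 'table.exec.resource.default-parallelism' = '10';
```

If the CDC source sets its own parallelism, this setting will likely not change the lookup join, and a Jira ticket remains the way forward.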
On 11/01/2022 06:43, Jonathan Weaver wrote:
I'm attempting a proof-of-concept conversion of a DataStream-based
Flink program to one that uses Table SQL almost entirely.
I have a primary CDC stream (an unbounded scan table source) that is
joined against two LookupTableSource tables and then written to a sink.
In the DataStream program, the only way to maintain throughput without
backpressuring the CDC stream was to set a carefully tuned parallelism
on the lookup functions, maximizing the lookup capacity of the source
systems.
However, in the SQL program I can't find any setting that controls the
parallelism of the LookupTableSource tables, and the planner sets it to
1. That allows only roughly 1/10 of the capacity the source system can
handle, and it backpressures the CDC stream.
So my question is: is there a way to keep the benefits of the Table SQL
interface while still allowing performance tuning on LookupTableSource
tables? A parallelism fixed at 1 will kill the attempted conversion
immediately.
I love the Catalog interface and am trying to turn all the custom
functions and lookups into tables that other developers can simply write
SQL against. But the performance tuning is critical.
All the tables are being registered in a catalog using the
DynamicTableSource factories.
My SQL is basically of the form:
INSERT INTO sink
SELECT
...
FROM cdc_table cdc
JOIN lookup1 FOR SYSTEM_TIME AS OF cdc.proc_time look1 ON
cdc.identifier = look1.identifier
LEFT OUTER JOIN lookup2 FOR SYSTEM_TIME AS OF cdc.proc_time look2 ON
cdc.identifier = look2.identifier
WHERE conditions;
Is there any way to force the planner to use a specific parallelism?
Thanks for your time,
Jonathan