Hello Flink Community,
We have a question about how Flink SQL optimizes SQL statements and whether
this can be influenced.
For the sake of simplicity, let’s assume we have three small tables: one input
and two output tables.
The input table contains an array. We want to
* flatten it,
* do some (resource-expensive) enrichment and pre-filtering, and
* write the result to one or more output tables, depending on a filter.
CREATE TABLE inputTbl (
  requestId STRING NOT NULL,
  customers ARRAY<STRING NOT NULL>,
  ingestionTime TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR ingestionTime AS ingestionTime
) WITH (
  …
)
CREATE TABLE outputATbl (
  requestId STRING NOT NULL,
  customer STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH (
  …
)
CREATE TABLE outputBTbl (
  requestId STRING NOT NULL,
  customer STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH (
  …
)
We have a UDF that returns the enrichment value for a customer. This UDF is
resource-expensive.
CREATE TEMPORARY FUNCTION ENRICH AS '…'
We create a view to do the flattening:
CREATE VIEW flatView AS
SELECT
  i.requestId AS requestId,
  t.customer AS customer
FROM inputTbl AS i
CROSS JOIN UNNEST(customers) AS t(customer)
We create a second view to do the enrichment and pre-filtering:
CREATE VIEW enrichmentView AS
SELECT
  requestId AS requestId,
  customer AS customer,
  ENRICH(customer) AS enrichment
FROM flatView
WHERE requestId NOT LIKE '%d'
After that, we can simply write the data to the output tables using the
following INSERT statements:
INSERT INTO outputATbl
SELECT requestId, customer, enrichment
FROM enrichmentView
WHERE requestId LIKE '%a'
INSERT INTO outputBTbl
SELECT requestId, customer, enrichment
FROM enrichmentView
WHERE requestId LIKE '%b'
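For completeness: both INSERTs are submitted together as a single job via a
statement set, since only then can the planner share sub-plans between the two
sinks. A minimal sketch, assuming the SQL client syntax (the Table API
StatementSet behaves the same):

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO outputATbl
  SELECT requestId, customer, enrichment
  FROM enrichmentView
  WHERE requestId LIKE '%a';

  INSERT INTO outputBTbl
  SELECT requestId, customer, enrichment
  FROM enrichmentView
  WHERE requestId LIKE '%b';
END;

Replacing EXECUTE with EXPLAIN yields the plan shown below.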
Functionally this works correctly, but when you look at the Optimized
Execution Plan, the UNNEST (Correlate) is executed once per sink, and the UDF
ENRICH() is likewise called in every sink's branch. Both lead to unwanted and
unnecessary processing and resource usage.
== Optimized Execution Plan ==
Calc(select=[requestId, customers, Reinterpret(CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)) AS ingestionTime])(reuse_id=[1])
+- TableSourceScan(table=[[ing, sdav2, inputTbl, metadata=[timestamp], watermark=[CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)], idletimeout=[5], watermarkEmitStrategy=[on-periodic]]], fields=[requestId, customers, ingestionTime])

Sink(table=[ing.sdav2.outputATbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])
   +- Correlate(invocation=[$UNNEST_ROWS$1($cor2.customers)], correlate=[table($UNNEST_ROWS$1($cor2.customers))], select=[requestId,customers,ingestionTime,f0], rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], joinType=[INNER], condition=[AND(LIKE($0, _UTF-16LE'%a'), NOT(LIKE($0, _UTF-16LE'%d')))])
      +- Reused(reference_id=[1])

Sink(table=[ing.sdav2.outputBTbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])
   +- Correlate(invocation=[$UNNEST_ROWS$1($cor3.customers)], correlate=[table($UNNEST_ROWS$1($cor3.customers))], select=[requestId,customers,ingestionTime,f0], rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], joinType=[INNER], condition=[AND(LIKE($0, _UTF-16LE'%b'), NOT(LIKE($0, _UTF-16LE'%d')))])
      +- Reused(reference_id=[1])
I would have expected an Optimized Execution Plan more like the following:
== Expected Optimized Execution Plan ==
Calc(select=[requestId, f0 AS customer, ENRICH(f0) AS enrichment])(reuse_id=[1])
+- Correlate(invocation=[$UNNEST_ROWS$1($cor2.customers)], correlate=[table($UNNEST_ROWS$1($cor2.customers))], select=[requestId,customers,ingestionTime,f0], rowType=[RecordType(VARCHAR(2147483647) requestId, VARCHAR(2147483647) ARRAY customers, TIMESTAMP(3) *ROWTIME* ingestionTime, VARCHAR(2147483647) f0)], joinType=[INNER])
   +- Calc(select=[requestId, customers, Reinterpret(CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)) AS ingestionTime], where=[NOT(LIKE(requestId, '%d'))])
      +- TableSourceScan(table=[[ing, sdav2, inputTbl, metadata=[timestamp], watermark=[CAST(ingestionTime AS TIMESTAMP(3) *ROWTIME*)], idletimeout=[5], watermarkEmitStrategy=[on-periodic]]], fields=[requestId, customers, ingestionTime])

Sink(table=[ing.sdav2.outputATbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, customer, enrichment], where=[LIKE(requestId, '%a')])
   +- Reused(reference_id=[1])

Sink(table=[ing.sdav2.outputBTbl], fields=[requestId, customer, enrichment])
+- Calc(select=[requestId, customer, enrichment], where=[LIKE(requestId, '%b')])
   +- Reused(reference_id=[1])
Is there a way to influence the optimizer so that we can avoid this duplicated
processing and resource usage?
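For reference, sub-plan reuse is enabled by default, which presumably explains
why the source scan is shared (reuse_id=[1]); it just does not extend to the
Correlate and the ENRICH() Calc on top of it. The only related knob we are
aware of is (SQL client syntax):

SET 'table.optimizer.reuse-sub-plan-enabled' = 'true';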
Or is there another design pattern we should use to solve this issue?
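A fallback we can think of is to materialize enrichmentView into an
intermediate table and have both output INSERTs read from that, at the cost of
an extra topic and extra latency. A sketch only; intermediateTbl is a
hypothetical table, with its connector options elided like the other tables
above:

CREATE TABLE intermediateTbl (
  requestId STRING NOT NULL,
  customer STRING NOT NULL,
  enrichment STRING NOT NULL
) WITH (
  …
)

INSERT INTO intermediateTbl
SELECT requestId, customer, enrichment
FROM enrichmentView

INSERT INTO outputATbl
SELECT requestId, customer, enrichment
FROM intermediateTbl
WHERE requestId LIKE '%a'

INSERT INTO outputBTbl
SELECT requestId, customer, enrichment
FROM intermediateTbl
WHERE requestId LIKE '%b'

That feels heavy-handed for what is essentially a shared sub-plan, though.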
Regards,
Fred Teunissen