Xuyang Zhong created FLINK-38753:
------------------------------------
Summary: Enrich more upsert keys by equiv expressions
Key: FLINK-38753
URL: https://issues.apache.org/jira/browse/FLINK-38753
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Affects Versions: 2.2.0
Reporter: Xuyang Zhong
Take the following SQL as an example:
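{code:scala}
// Assumes a planner test class (e.g. one extending TableTestBase) that
// provides `tEnv` (the TableEnvironment) and `util` (the table test util).
@Test
def test3(): Unit = {
  // src1: primary key (a1), emits a full changelog (I, UA, UB, D)
  tEnv.executeSql(s"""
    |create temporary table src1 (
    |  a1 int primary key not enforced,
    |  b1 int
    |) with (
    |  'connector' = 'values',
    |  'changelog-mode' = 'I,UA,UB,D'
    |)
    |""".stripMargin)
  // src2: composite primary key (a2, b2), full changelog
  tEnv.executeSql(s"""
    |create temporary table src2 (
    |  a2 int,
    |  b2 int,
    |  c2 int,
    |  primary key(a2, b2) not enforced
    |) with (
    |  'connector' = 'values',
    |  'changelog-mode' = 'I,UA,UB,D'
    |)
    |""".stripMargin)
  // snk: primary key (a1, b2), accepts I, UA and D (no UPDATE_BEFORE)
  tEnv.executeSql(s"""
    |create temporary table snk (
    |  a1 int,
    |  b1 int,
    |  a2 int,
    |  b2 int,
    |  c2 int,
    |  primary key(a1, b2) not enforced
    |) with (
    |  'connector' = 'values',
    |  'sink-insert-only' = 'false',
    |  'sink-changelog-mode-enforced' = 'I,UA,D'
    |)
    |""".stripMargin)
  // Inner join on a1 = a2; the plan shows whether the sink needs upsertMaterialize
  util.verifyExplainInsert("""
    |insert into snk
    |  select * from src1 join src2 on a1 = a2
    |""".stripMargin)
}
{code}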
The plan is:
{code:java}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2])
+- LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], c2=[$4])
   +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
+- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
   :- Exchange(distribution=[hash[a1]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
   +- Exchange(distribution=[hash[a2]])
      +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.snk], fields=[a1, b1, a2, b2, c2], upsertMaterialize=[true])
+- Join(joinType=[InnerJoin], where=[(a1 = a2)], select=[a1, b1, a2, b2, c2], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[HasUniqueKey])
   :- Exchange(distribution=[hash[a1]])
   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], fields=[a1, b1])
   +- Exchange(distribution=[hash[a2]])
      +- TableSourceScan(table=[[default_catalog, default_database, src2]], fields=[a2, b2, c2])
{code}
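The upstream upsert keys discussed in the next paragraph follow from the join's input specs (`leftInputSpec=[JoinKeyContainsUniqueKey]`, `rightInputSpec=[HasUniqueKey]`). Below is a minimal sketch of that derivation, assuming a simplified model in which an upsert key is a set of output column indexes and the join output is the left columns followed by the right columns. The class `InnerJoinUpsertKeys` and its method are hypothetical and are not the planner's actual metadata-handler code.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: simplified unique-key propagation through an inner
// equi-join. Keys are sets of column indexes; output = left cols ++ right cols.
final class InnerJoinUpsertKeys {

    static Set<Set<Integer>> derive(
            Set<Set<Integer>> leftKeys, Set<Set<Integer>> rightKeys,
            Set<Integer> leftJoinKey, Set<Integer> rightJoinKey, int leftArity) {
        // Move right-side keys to their positions in the join output.
        Set<Set<Integer>> shiftedRight = new HashSet<>();
        for (Set<Integer> key : rightKeys) {
            Set<Integer> shifted = new TreeSet<>();
            for (int col : key) {
                shifted.add(col + leftArity);
            }
            shiftedRight.add(shifted);
        }
        Set<Set<Integer>> result = new HashSet<>();
        // Left join key contains a left unique key: each right row matches at
        // most one left row, so right-side keys stay unique in the output.
        if (leftKeys.stream().anyMatch(leftJoinKey::containsAll)) {
            result.addAll(shiftedRight);
        }
        // Symmetric case: right join key contains a right unique key.
        if (rightKeys.stream().anyMatch(rightJoinKey::containsAll)) {
            for (Set<Integer> key : leftKeys) {
                result.add(new TreeSet<>(key));
            }
        }
        // A left key combined with a right key always identifies one output row.
        for (Set<Integer> left : leftKeys) {
            for (Set<Integer> right : shiftedRight) {
                Set<Integer> combined = new TreeSet<>(left);
                combined.addAll(right);
                result.add(combined);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // src1(a1, b1): key {a1}; src2(a2, b2, c2): key {a2, b2}; join on a1 = a2.
        Set<Set<Integer>> keys = derive(
                Set.of(Set.of(0)), Set.of(Set.of(0, 1)),
                Set.of(0), Set.of(0), 2);
        System.out.println(keys); // [2, 3] and [0, 2, 3], in unspecified order
    }
}
{code}

Only the right side's key survives on its own because `a1` is unique on the left; `a2` alone is not unique on the right, so `a1` by itself does not become an output key.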
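There is an `upsertMaterialize` in the sink because the sink primary key is
`a1, b2` (`\{0, 3}`), and none of the upstream upsert keys
`[\{2, 3}, \{0, 2, 3}]` is contained in it.
However, this is an inner join with the equi-condition `a1 = a2`, so columns
0 and 2 hold the same value in every output row. We can use this equivalence
to derive more upsert keys: replacing `a2` (2) with `a1` (0) in `\{2, 3}` gives
`\{0, 3}`, which is exactly the sink primary key, so the `upsertMaterialize`
could be avoided.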
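The proposed enrichment can be sketched as follows, assuming upsert keys are modeled as sets of output column indexes. The check mirrors the condition described above (materialize unless some upstream upsert key is contained in the sink primary key), and each equivalent column pair from the inner join's equi-conditions is used to derive extra keys by substitution, repeated until no new key appears. `UpsertKeyEnrichment`, `needsUpsertMaterialize` and `enrich` are hypothetical names, not planner APIs; a real implementation would also have to confirm that the equality actually holds for every output row (e.g. an inner join on `=`, not an outer join).

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of enriching upsert keys with column equivalences.
final class UpsertKeyEnrichment {

    // Materialization is needed unless some upstream key is a subset of the sink PK.
    static boolean needsUpsertMaterialize(Set<Set<Integer>> upsertKeys, Set<Integer> sinkPk) {
        return upsertKeys.stream().noneMatch(sinkPk::containsAll);
    }

    // Each pair {x, y} means columns x and y are equal in every output row.
    // Replacing x with y (or y with x) in a key yields another valid key.
    static Set<Set<Integer>> enrich(Set<Set<Integer>> keys, List<int[]> equivalences) {
        Set<Set<Integer>> result = new HashSet<>();
        Deque<Set<Integer>> work = new ArrayDeque<>();
        for (Set<Integer> key : keys) {
            Set<Integer> copy = new TreeSet<>(key);
            if (result.add(copy)) {
                work.add(copy);
            }
        }
        // Worklist until fixpoint; terminates because column indexes are finite.
        while (!work.isEmpty()) {
            Set<Integer> key = work.poll();
            for (int[] pair : equivalences) {
                for (int i = 0; i < 2; i++) {
                    int from = pair[i];
                    int to = pair[1 - i];
                    if (key.contains(from)) {
                        Set<Integer> derived = new TreeSet<>(key);
                        derived.remove(from);
                        derived.add(to);
                        if (result.add(derived)) {
                            work.add(derived);
                        }
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<Set<Integer>> upstream = Set.of(Set.of(2, 3), Set.of(0, 2, 3));
        Set<Integer> sinkPk = Set.of(0, 3);                    // (a1, b2)
        List<int[]> equivalences = List.of(new int[] {0, 2});  // a1 = a2
        System.out.println(needsUpsertMaterialize(upstream, sinkPk)); // true
        Set<Set<Integer>> enriched = enrich(upstream, equivalences);
        System.out.println(needsUpsertMaterialize(enriched, sinkPk)); // false
    }
}
{code}

Here `\{2, 3}` becomes `\{0, 3}`, and `\{0, 2, 3}` also collapses to `\{0, 3}` because column 2 is determined by column 0, so the enriched set is `\{2, 3}`, `\{0, 2, 3}`, `\{0, 3}`.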
--
This message was sent by Atlassian Jira
(v8.20.10#820010)