[ https://issues.apache.org/jira/browse/FLINK-36808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-36808:
-----------------------------------
Labels: pull-request-available (was: )
> UNION ALL after lookup join produces unexpected results
> -------------------------------------------------------
>
> Key: FLINK-36808
> URL: https://issues.apache.org/jira/browse/FLINK-36808
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0, 1.19.1
> Reporter: Qingsheng Ren
> Assignee: Muhammet Orazov
> Priority: Major
> Labels: pull-request-available
>
> Here is the SQL to reproduce the issue:
> {code:sql}
> -- Data of table `stream`:
> -- (1, Alice)
> -- (2, Bob)
> CREATE TEMPORARY TABLE `stream` (
> `id` BIGINT,
> `name` STRING,
> `txn_time` as proctime(),
> PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:postgresql://localhost:5432/postgres',
> 'table-name' = 'stream',
> 'username' = 'postgres',
> 'password' = 'postgres'
> );
> -- Data of table `dim`:
> -- (1, OK)
> -- (2, OK)
> CREATE TEMPORARY TABLE `dim` (
> `id` BIGINT,
> `status` STRING,
> PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:postgresql://localhost:5432/postgres',
> 'table-name' = 'dim',
> 'username' = 'postgres',
> 'password' = 'postgres'
> );
> -- Lookup join two tables twice with different filter, and union them together
> SELECT
> s.id,
> s.name,
> s.txn_time,
> d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
> ON
> `s`.`id` = `d`.`id`
> WHERE
> `d`.`status` = 'OK'
> UNION ALL
> SELECT
> s.id,
> s.name,
> s.txn_time,
> d.status
> FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
> ON
> `s`.`id` = `d`.`id`
> WHERE
> `d`.`status` = 'NOT_EXISTS';{code}
> The first lookup join should output:
> {code:java}
> (1, Alice, 2024-11-27 11:52:19.332, OK)
> (2, Bob, 2024-11-27 11:52:19.332, OK) {code}
> The second lookup join should output nothing, as there is no row with status
> 'NOT_EXISTS' in `dim`.
> However, the result after the union is:
> {code:java}
> 1, Alice, 2024-11-27 11:52:19.332, OK
> 2, Bob, 2024-11-27 11:52:19.332, OK
> 1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
> 2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS {code}
> There should not be any rows with status 'NOT_EXISTS'.
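To make the expected semantics concrete, here is a minimal Python sketch (not Flink code) that models the reproduction query on the sample data above: an inner lookup join on `id`, a filter on `dim.status`, and UNION ALL as plain concatenation. It assumes rows are simple tuples and omits `txn_time`, which does not affect which rows survive; the helper names are hypothetical.

```python
# Minimal in-memory model of the reproduction query's EXPECTED semantics.
# Assumptions: rows are plain tuples, txn_time is omitted, and the helper
# names are hypothetical (this is not Flink API).

STREAM = [(1, "Alice"), (2, "Bob")]  # sample data of table `stream`
DIM = {1: "OK", 2: "OK"}             # sample data of table `dim`, keyed by id


def lookup_join_with_filter(stream, dim, wanted_status):
    """Inner lookup join on id, then keep only rows whose dim.status matches."""
    out = []
    for sid, name in stream:
        status = dim.get(sid)  # point lookup by primary key
        if status is not None and status == wanted_status:
            out.append((sid, name, status))
    return out


def expected_result(stream, dim):
    # UNION ALL concatenates the two branches without deduplication.
    return (lookup_join_with_filter(stream, dim, "OK")
            + lookup_join_with_filter(stream, dim, "NOT_EXISTS"))


if __name__ == "__main__":
    for row in expected_result(STREAM, DIM):
        print(row)
```

Because no `dim` row has status 'NOT_EXISTS', the second branch contributes nothing and only the two 'OK' rows remain.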
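> The SQL plan shows that the filter conditions on `status` are lost: each lookup
> join only looks up by `id=id`, and the constants 'OK' and 'NOT_EXISTS' are
> projected directly as `status` by the Calc after the lookup join, which is not
> as expected. In the optimized execution plan, the two resulting identical
> lookup joins are additionally merged into a single reused node (`reuse_id=[1]`).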
> {code:java}
> == Abstract Syntax Tree ==
> LogicalUnion(all=[true])
> :- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
> :  +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
> :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
> :        :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
> :        :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
> :        +- LogicalFilter(condition=[=($cor0.id, $0)])
> :           +- LogicalSnapshot(period=[$cor0.txn_time])
> :              +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
> +- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
>    +- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
>       +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
>          :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
>          :  +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
>          +- LogicalFilter(condition=[=($cor1.id, $0)])
>             +- LogicalSnapshot(period=[$cor1.txn_time])
>                +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
>
> == Optimized Physical Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
>       +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
>          +- Calc(select=[id, name, PROCTIME() AS txn_time])
>             +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>
> == Optimized Execution Plan ==
> Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
> +- Union(all=[true], union=[id, name, txn_time, status])
>    :- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
>    :  +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
>    :     +- Calc(select=[id, name, PROCTIME() AS txn_time])
>    :        +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
>    +- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
>       +- Reused(reference_id=[1])
> {code}
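The optimized plan can be modelled the same way. This minimal Python sketch (again not Flink code; helper names are hypothetical, rows are plain tuples, and `txn_time` is omitted) mimics the operators as printed: a LookupJoin that matches on `id` only, with no condition on `status`, followed by a Calc that projects the former filter constant as the `status` column. Under these assumptions it yields the same four rows as the reported union result, including the 'NOT_EXISTS' ones.

```python
# Minimal model of what the OBSERVED optimized plan computes.
# Assumptions: rows are plain tuples, txn_time is omitted, and the helper
# names are hypothetical (this is not Flink API).

STREAM = [(1, "Alice"), (2, "Bob")]  # sample data of table `stream`
DIM = {1: "OK", 2: "OK"}             # sample data of table `dim`, keyed by id


def lookup_join(stream, dim):
    """LookupJoin(lookup=[id=id]): inner join on id only, no status condition."""
    return [(sid, name) for sid, name in stream if sid in dim]


def calc_constant_status(rows, constant):
    """Calc that emits the former filter constant as the status column."""
    return [(sid, name, constant) for sid, name in rows]


def observed_plan(stream, dim):
    # Both union branches consume the same join output,
    # mirroring Reused(reference_id=[1]) in the execution plan.
    joined = lookup_join(stream, dim)
    return (calc_constant_status(joined, "OK")
            + calc_constant_status(joined, "NOT_EXISTS"))


if __name__ == "__main__":
    for row in observed_plan(STREAM, DIM):
        print(row)
```

Since nothing in this plan ever compares `dim.status` with the constant, every joined row is emitted once per branch, which is why the 'NOT_EXISTS' branch is not empty.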