macdoor615 created FLINK-33365:
----------------------------------
Summary: Missing filter condition in execution plan containing
lookup join with mysql jdbc connector
Key: FLINK-33365
URL: https://issues.apache.org/jira/browse/FLINK-33365
Project: Flink
Issue Type: Bug
Components: Connectors / JDBC
Affects Versions: 1.17.1, 1.18.0
Environment: Flink 1.17.1 & Flink 1.18.0 with
flink-connector-jdbc-3.1.1-1.17.jar
Reporter: macdoor615
create table in flink with sql-client.sh
{code:java}
CREATE TABLE default_catalog.default_database.a (
ip string,
proctime as proctime()
)
WITH (
'connector' = 'datagen'
);{code}
create table in mysql
{code:java}
create table b (
ip varchar(20),
type int
); {code}
Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
excute in sql-client.sh
{code:java}
explain SELECT * FROM default_catalog.default_database.a left join
bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and
a.ip = b.ip; {code}
get the execution plan
{code:java}
...
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip
AS VARCHAR(2147483647)) AS ip0])
+- Calc(select=[ip, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip]){code}
excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and
*flink-connector-jdbc-3.0.0-1.16.jar*
get the execution plan
{code:java}
== Optimized Execution Plan ==
Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
+- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin],
lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS
INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+- Calc(select=[ip, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip]) {code}
with flink-connector-jdbc-3.1.1-1.17.jar, the condition is
*lookup=[ip=ip]*
with flink-connector-jdbc-3.0.0-1.16.jar , the condition is
{*}lookup=[type=0, ip=ip], where=[(type = 0)]{*}{*}{*}
In out real world production environment, incorrect data is output
--
This message was sent by Atlassian Jira
(v8.20.10#820010)