[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781712#comment-17781712 ]
david radley edited comment on FLINK-33365 at 11/1/23 12:06 PM: ---------------------------------------------------------------- [~macdoor615] I am running on a Mac and have Mariadb (I tweaked the dialect factory to allow mariadb in the jdbc url - see https://issues.apache.org/jira/browse/FLINK-28433 ) and the latest Flink jdbc connector (master 3.2-SNAPSHOT that I built from source) and I cannot recreate the problem you are seeing (I see the filter as expected in the explain). Can you try: * the latest Flink JDBC connector 3.2-SNAPSHOT built from source * the latest mySQL JDBC driver. I used the platform independent [https://dev.mysql.com/downloads/connector/j/] . This seems to be recommended [https://blogs.oracle.com/mysql/post/mysql-connectorj-has-new-maven-coordinates]([https://blogs.oracle.com/mysql/post/mysql-connectorj-has-new-maven-coordinates).] I have raised issue https://issues.apache.org/jira/browse/FLINK-33384 to change to docs and code that reference the old deprecated JDBC driver. I hope these suggestions resolve this for you; if not I can look to setup a Linux image with mySQL; which JDBC driver do you see the issue on? was (Author: JIRAUSER300523): [~macdoor615] I am running on a Mac and have Mariadb (I tweaked the dialect factory to allow mariadb in the jdbc url - see https://issues.apache.org/jira/browse/FLINK-28433 ) and the latest Flink jdbc connector (master 3.2-SNAPSHOT that I built from source) and I cannot recreate the problem you are seeing (I see the filter as expected in the explain). Can you try: * the latest Flink JDBC connector 3.2-SNAPSHOT built from source * the latest mySQL JDBC driver. I used the platform independent [https://dev.mysql.com/downloads/connector/j/] . This seems to be recommended [https://blogs.oracle.com/mysql/post/mysql-connectorj-has-new-maven-coordinates]([https://blogs.oracle.com/mysql/post/mysql-connectorj-has-new-maven-coordinates).] I have raised issue https://issues.apache.org/jira/browse/FLINK-33384 to change to docs and code that reference the old deprecated JDBC driver. I hope these suggestions resoilve this for you; if not I can look to setup a Linux image with mySQL; which JDBC driver do you see the issue on? > Missing filter condition in execution plan containing lookup join with mysql > jdbc connector > ------------------------------------------------------------------------------------------- > > Key: FLINK-33365 > URL: https://issues.apache.org/jira/browse/FLINK-33365 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC > Affects Versions: 1.18.0, 1.17.1 > Environment: Flink 1.17.1 & Flink 1.18.0 with > flink-connector-jdbc-3.1.1-1.17.jar > Reporter: macdoor615 > Assignee: david radley > Priority: Critical > Attachments: flink-connector-jdbc-3.0.0-1.16.png, > flink-connector-jdbc-3.1.1-1.17.png > > > create table in flink with sql-client.sh > {code:java} > CREATE TABLE default_catalog.default_database.a ( > ip string, > proctime as proctime() > ) > WITH ( > 'connector' = 'datagen' > );{code} > create table in mysql > {code:java} > create table b ( > ip varchar(20), > type int > ); {code} > > Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar* > excute in sql-client.sh > {code:java} > explain SELECT * FROM default_catalog.default_database.a left join > bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and > a.ip = b.ip; {code} > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip > AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]){code} > > excute same sql in sql-client with Flink 1.17.1/ 1.18.0 and > *flink-connector-jdbc-3.0.0-1.16.jar* > get the execution plan > {code:java} > ... > == Optimized Execution Plan == > Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type]) > +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], > lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 > AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0]) > +- Calc(select=[ip, PROCTIME() AS proctime]) > +- TableSourceScan(table=[[default_catalog, default_database, a]], > fields=[ip]) {code} > with flink-connector-jdbc-3.1.1-1.17.jar, the condition is > *lookup=[ip=ip]* > with flink-connector-jdbc-3.0.0-1.16.jar , the condition is > *lookup=[type=0, ip=ip], where=[(type = 0)]* > > In out real world production environment, this lead incorrect data output > > -- This message was sent by Atlassian Jira (v8.20.10#820010)