[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785563#comment-17785563
 ] 

david radley edited comment on FLINK-33365 at 11/13/23 4:45 PM:
----------------------------------------------------------------

[~Sergey Nuyanzin] [~qingwei91] thanks for your input. Neither of the two 
workarounds works for me on master for the reported case. 

It sounds like the introduction of the predicate pushdown capability for JDBC 
broke this case. At the moment, JdbcRowDataLookupFunction only adds the 
keyNames to the conditions:
{code:java}
this.query =
    options.getDialect()
        .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
{code}
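A minimal in-memory sketch of why the missing condition matters, under simplifying assumptions: the sample row of b is hypothetical, the loop stands in for the lookup join (it is not Flink's LookupJoin operator), and proctime is omitted. As in both optimized plans in the description, the output type column is the constant CAST(0 AS INTEGER). So once type = 0 is no longer part of the lookup, a row of b with type = 1 is joined and reported as type = 0. Without the dropped condition, the same input row would instead have produced NULLs on the b side.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class LookupJoinSketch {

    // One row of the MySQL table b (hypothetical sample data).
    static final class BRow {
        final String ip;
        final int type;

        BRow(String ip, int type) {
            this.ip = ip;
            this.type = type;
        }
    }

    // Simplified left lookup join for a single probe ip from table a.
    // applyTypeFilter = false mirrors lookup=[ip=ip] without where=[(type = 0)].
    static List<String> leftLookupJoin(String ip, List<BRow> b, boolean applyTypeFilter) {
        List<String> out = new ArrayList<>();
        for (BRow row : b) {
            boolean matches = row.ip.equals(ip) && (!applyTypeFilter || row.type == 0);
            if (matches) {
                // The plan projects type as CAST(0 AS INTEGER), not the stored value.
                out.add(ip + "," + row.ip + ",0");
            }
        }
        if (out.isEmpty()) {
            // Left outer join: no match pads the b side with NULLs.
            out.add(ip + ",null,null");
        }
        return out;
    }

    public static void main(String[] args) {
        List<BRow> b = List.of(new BRow("10.0.0.1", 1));
        // With the filter (3.0.0 plan): no match, so NULLs on the b side.
        System.out.println(leftLookupJoin("10.0.0.1", b, true));  // [10.0.0.1,null,null]
        // Without the filter (3.1.1 plan): the type = 1 row is emitted as type 0.
        System.out.println(leftLookupJoin("10.0.0.1", b, false)); // [10.0.0.1,10.0.0.1,0]
    }
}
{code}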
It seems to me that we need to carry the pushed-down filter information into 
the context when we run the pushdown logic, so that we can add it as a 
condition alongside the last parameter (keyNames). I was hoping to contribute 
this fix, as I have been assigned the issue. [~qingwei91], is this still 
possible, or do you plan to fix this yourself? What do you think of this 
approach?
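The approach above can be sketched roughly as follows, assuming a simplified query builder. buildLookupQuery is hypothetical: it only mimics the shape of a SELECT ... WHERE key = ? statement and is not the Flink JdbcDialect API. The filterConditions argument stands in for the pushed-down filter information that would have to reach the lookup function.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class LookupQuerySketch {

    // Hypothetical stand-in for a dialect's SELECT builder: one "key = ?"
    // placeholder per lookup key, plus any pushed-down filter predicates.
    static String buildLookupQuery(
            String tableName,
            List<String> fieldNames,
            List<String> keyNames,
            List<String> filterConditions) {
        List<String> conditions = new ArrayList<>();
        for (String key : keyNames) {
            conditions.add(key + " = ?");
        }
        // Pushed-down filters (e.g. "type = 0") are ANDed with the key conditions.
        conditions.addAll(filterConditions);
        String select = String.join(", ", fieldNames);
        String where = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
        return "SELECT " + select + " FROM " + tableName + where;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("ip", "type");
        List<String> keys = List.of("ip");

        // Current behaviour: only the key names become conditions.
        System.out.println(buildLookupQuery("b", fields, keys, List.of()));
        // Proposed behaviour: the pushed-down filter is added as well.
        System.out.println(buildLookupQuery("b", fields, keys, List.of("type = 0")));
    }
}
{code}

The first call yields SELECT ip, type FROM b WHERE ip = ? and the second SELECT ip, type FROM b WHERE ip = ? AND type = 0. A real fix might bind pushed-down literals as statement parameters rather than inlining them; the string form here is only for readability.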


> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> Create a table in Flink with sql-client.sh:
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> Create a table in MySQL:
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*, 
> execute in sql-client.sh:
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> This gives the following execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> Executing the same SQL in sql-client.sh with Flink 1.17.1 / 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar* gives the following execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> With flink-connector-jdbc-3.1.1-1.17.jar, the condition is 
> *lookup=[ip=ip]*.
> With flink-connector-jdbc-3.0.0-1.16.jar, the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*.
> The *type = 0* filter is therefore missing with 3.1.1.
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
