[ 
https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785563#comment-17785563
 ] 

david radley edited comment on FLINK-33365 at 11/13/23 6:13 PM:
----------------------------------------------------------------

[~Sergey Nuyanzin] [~qingwei91] thanks for your input. The two workarounds do 
not work for me on master for the reported case. 

It sounds like the introduction of predicate pushdown for the JDBC connector 
broke this case. At the moment, JdbcRowDataLookupFunction adds only the 
keyNames to the conditions: 
 
_this.query =_
_options.getDialect()_
_.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);_
 
 
It seems to me that we need to save the filter information when we do the 
pushdown logic, so that we can add it as an extra condition alongside the last 
parameter (keyNames). I was hoping to contribute this fix, as I have been 
assigned the issue. [~qingwei91], is this still possible, or do you plan to 
fix this? It would be great to hear your thoughts on this. 
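
To make the idea concrete, here is a minimal standalone sketch of what "adding the saved filter as a condition" could look like. It is not the connector's code: the class LookupQuerySketch, the method buildLookupQuery, and the pushedDownFilters parameter are hypothetical names for illustration, the filters are assumed to already be rendered as SQL strings, and dialect-specific identifier quoting is left out.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch, not the Flink JDBC connector code: builds a lookup
 * SELECT whose WHERE clause holds the join-key placeholders plus any
 * predicates that were saved during filter pushdown.
 */
final class LookupQuerySketch {

    /** Builds "SELECT f1, f2 FROM t WHERE k1 = ? AND (filter)". */
    static String buildLookupQuery(
            String tableName,
            List<String> fieldNames,
            List<String> keyNames,
            List<String> pushedDownFilters) {
        List<String> conditions = new ArrayList<>();
        // One placeholder per lookup key, as the key-only query does today.
        for (String key : keyNames) {
            conditions.add(key + " = ?");
        }
        // Assumed addition: append each saved filter, parenthesized so that
        // an OR inside a filter cannot change the meaning of the AND chain.
        for (String filter : pushedDownFilters) {
            conditions.add("(" + filter + ")");
        }
        String sql = "SELECT " + String.join(", ", fieldNames) + " FROM " + tableName;
        if (!conditions.isEmpty()) {
            sql += " WHERE " + String.join(" AND ", conditions);
        }
        return sql;
    }

    public static void main(String[] args) {
        // Key-only query, analogous to the current behaviour.
        System.out.println(
                buildLookupQuery("b", List.of("ip", "type"), List.of("ip"), List.of()));
        // Same query with the saved filter from the reported case.
        System.out.println(
                buildLookupQuery("b", List.of("ip", "type"), List.of("ip"), List.of("type = 0")));
    }
}
```

For the reported case, the second call keeps the type = 0 condition in the lookup query instead of dropping it. A real fix would probably also need to handle filters that carry parameters, which this sketch ignores.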


> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> Create a table in Flink with sql-client.sh:
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> Create a table in MySQL:
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> With Flink 1.17.1 / 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*,
> execute in sql-client.sh:
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> This gives the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> Execute the same SQL in sql-client.sh with Flink 1.17.1 / 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> to get the execution plan:
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> With flink-connector-jdbc-3.1.1-1.17.jar, the condition is 
> *lookup=[ip=ip]*
> With flink-connector-jdbc-3.0.0-1.16.jar, the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
