[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785813#comment-17785813 ]

david radley commented on FLINK-33365:
--------------------------------------

[~qingwei91] thank you very much for your support :) . I have one question about 
the design intent.

1) Would it be true to say that the Calcite rules should dictate the general 
shape of the query (I can see that there are rules around predicate pushdown 
and lookups), with the JDBC dialect then only doing minimal type mapping of the graph?

2) The solution we are considering: because the predicate pushdown logic was 
done in the JDBC code, we would fix up the query in the JDBC code so that the 
conditions are not lost (see the sketch below).
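To make option 2) concrete, using table b and the constant filter from the report 
below: the lookup issued per probe row should keep the pushed-down condition. This 
is only an illustrative sketch of the intended semantics, not the exact statement 
the connector builds.
{code:sql}
-- desired: the pushed-down condition stays in the per-row lookup query
SELECT ip, type FROM b WHERE ip = ? AND type = 0

-- what effectively happens per the 3.1.1 plan below: the condition is dropped
SELECT ip, type FROM b WHERE ip = ?
{code}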

 

Would it be true to say that the 2nd approach is more tactical and short term, 
since this is a blocker, and that the 1st approach would be the more ideal one? 
Or am I misunderstanding?

 

Either way I will look into option 2), as a change to the rules would be much more 
impactful outside of JDBC.

> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same sql in sql-client with Flink 1.17.1/ 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar, the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar, the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)