[ https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785967#comment-17785967 ]

david radley edited comment on FLINK-33365 at 11/14/23 4:50 PM:
----------------------------------------------------------------

[~libenchao] and [~qingwei91] I have had a play and have some working code. I 
have put it in a draft PR: 
[https://github.com/apache/flink-connector-jdbc/pull/79]

 

Could you give feedback on it please?

 

My thinking behind the change:

The issue here is that the filters are not honoured in the lookup join when 
there are pushdown predicates.

The lookup processing has access to the lookup keys as well as the predicate 
pushdowns; the predicate pushdown information is parameterized.

The existing getSelectStatement takes conditions as its last parameter; it 
takes the lookup keys and creates equality conditions in AbstractDialect like this:

 
{code:java}
String fieldExpressions =
        Arrays.stream(conditionFields)
                .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                .collect(Collectors.joining(" AND "));
{code}
 

This is not appropriate for a condition like a.type = 0, which contains a 
literal and is potentially not an equality condition.
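
To make the mismatch concrete, here is a minimal standalone sketch (not connector code; quoteIdentifier is a stand-in for the dialect's quoting) of what the equality-only builder produces, and why a pushed-down literal filter cannot be expressed through it:

{code:java}
import java.util.Arrays;
import java.util.stream.Collectors;

public class EqualityConditionSketch {

    // Stand-in for the dialect's identifier quoting (MySQL-style backticks).
    static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    public static void main(String[] args) {
        String[] conditionFields = {"ip"};
        // Mirrors the AbstractDialect snippet above: every condition becomes
        // an equality against a named parameter of the same name.
        String fieldExpressions =
                Arrays.stream(conditionFields)
                        .map(f -> String.format("%s = :%s", quoteIdentifier(f), f))
                        .collect(Collectors.joining(" AND "));
        System.out.println(fieldExpressions); // prints: `ip` = :ip
        // A pushed-down filter such as b.type = 0 compares against a literal
        // (and need not be an equality at all), so it has no ":field"
        // parameter to bind and cannot be rendered by this template.
    }
}
{code}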

Predicate pushdown is currently only implemented for simple expressions.

The idea of the change is to resolve the conditions completely in the lookup 
code and pass an array of Strings to a new overloaded implementation of 
getSelectStatement, so we can add the appropriate conditions as a new 
parameter. Note that the conditions that are not predicate pushdowns would 
end up as filters in the existing select conditions.
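
To illustrate the shape of the change, here is a rough, self-contained sketch; the method name matches getSelectStatement, but the signature and body are my assumption rather than the exact PR code:

{code:java}
import java.util.Arrays;
import java.util.stream.Collectors;

public class SelectStatementSketch {

    static String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }

    // Hypothetical overload: each condition arrives as a fully resolved SQL
    // fragment, so the dialect no longer assumes "field = :field" equality.
    static String getSelectStatement(
            String tableName, String[] selectFields, String[] resolvedConditions) {
        String selectExpressions =
                Arrays.stream(selectFields)
                        .map(SelectStatementSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String whereClause =
                resolvedConditions.length == 0
                        ? ""
                        : " WHERE " + String.join(" AND ", resolvedConditions);
        return "SELECT " + selectExpressions + " FROM "
                + quoteIdentifier(tableName) + whereClause;
    }

    public static void main(String[] args) {
        // A lookup-key equality plus a pushed-down literal filter:
        System.out.println(
                getSelectStatement(
                        "b",
                        new String[] {"ip", "type"},
                        new String[] {"`ip` = :ip", "`type` = 0"}));
        // SELECT `ip`, `type` FROM `b` WHERE `ip` = :ip AND `type` = 0
    }
}
{code}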

 

I do not think the PR is ready yet, but I would love to hear your feedback. I 
intend to:
 * do more testing
 ** multiple conditions, including non-equalities
 ** multiple lookup keys
 * ideally add test cases along the lines that have been suggested by [~Sergey Nuyanzin]
 * look into whether I can use existing Flink classes to resolve the condition 
insert (I have used a replace; see the sketch after this list). I see there is a 
ResolvedExpression class I might be able to use.
 * Any ideas on other tests I could use to try to break this?
 * We could do the resolution of the String inserts in getSelectStatement, but 
this would involve passing both the predicate and the parameter in rather 
than resolving in the caller. I think resolving it in the caller makes for a 
simpler call.
 * Currently there is no specific logic to account for partitions (which the 
scan logic has); we may need to do something for partitions here.
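
For the "replace" mentioned in the list above, this is roughly what resolving the parameter in the caller looks like; the placeholder name :param0 is invented for illustration:

{code:java}
public class ResolveInCallerSketch {
    public static void main(String[] args) {
        // Parameterized predicate as produced by predicate pushdown
        // (placeholder name is hypothetical), plus its bound literal.
        String parameterizedPredicate = "`type` = :param0";
        Object pushedDownLiteral = 0;
        // Resolve the insert in the caller, so getSelectStatement only
        // ever sees a finished condition string.
        String resolved =
                parameterizedPredicate.replace(":param0", String.valueOf(pushedDownLiteral));
        System.out.println(resolved); // prints: `type` = 0
    }
}
{code}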



> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same SQL in sql-client with Flink 1.17.1/1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar,  the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar ,  the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  


