[https://issues.apache.org/jira/browse/FLINK-33365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784915#comment-17784915]

david radley edited comment on FLINK-33365 at 11/10/23 12:29 PM:
-----------------------------------------------------------------

an update on what I have found:

 

I have switched on DEBUG logging to output the rules that are being driven for my
recreation. I see:

org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - 
optimize time_indicator cost 1 ms.

optimize result:

FlinkLogicalSink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, 
type])

+- FlinkLogicalCalc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, 
ip0, type])

   +- FlinkLogicalJoin(condition=[=($0, $4)], joinType=[left])

      :- FlinkLogicalCalc(select=[ip, PROCTIME() AS proctime])

      :  +- FlinkLogicalTableSourceScan(table=[[paimon_catalog, default, a]], 
fields=[ip])

      +- FlinkLogicalSnapshot(period=[$cor0.proctime])

         +- FlinkLogicalCalc(select=[ip, CAST(0 AS INTEGER) AS type, CAST(ip AS 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])

            +- FlinkLogicalTableSourceScan(table=[[mariadb_catalog, menagerie, 
c, *filter=[=(type, 0)]]]*, fields=[ip, type])

 

This is changed by the next rule to:

org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram [] - 
optimize physical cost 3 ms.

optimize result:

Sink(table=[*anonymous_collect$4*], fields=[ip, proctime, ip0, type])

+- Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])

   +- LookupJoin(table=[mariadb_catalog.menagerie.c], joinType=[LeftOuterJoin], 
lookup=[ip=ip], select=[ip, proctime, ip, *CAST(0 AS INTEGER)* AS type, CAST(ip 
AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS ip0])

      +- Calc(select=[ip, PROCTIME() AS proctime])

         +- TableSourceScan(table=[[paimon_catalog, default, a]], fields=[ip])

 

The *CAST(0 AS INTEGER)* is in the final Optimized Execution Plan that we see in 
the explain output.

 

I am not an expert at this, but it seems to me that one of two things is 
happening:

1) This change to the graph is a valid optimization, but it is not being 
actioned properly when executed, such that the CAST(0 AS INTEGER) is ignored.

or

2) the comments at the top of 
[CommonPhysicalLookupJoin.scala](https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala)
 are correct, and this filter should actually be in the lookup keys. The 
comment says:

 
{code:java}
 * For a lookup join query:
 *
 * <pre>
 * SELECT T.id, T.content, D.age FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
 * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
 * WHERE D.name LIKE 'Jack%'
 * </pre>
 *
 * The LookupJoin physical node encapsulates the following RelNode tree:
 *
 * <pre>
 *      Join (l.name = r.name)
 *    /     \
 * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
 *           |
 *        DimTable (lookup-keys: age=11, id=l.id)
 *         (age, id, name)
 * </pre>
 *
 * The important member fields in LookupJoin:
 * <ul>
 *   <li>allLookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)</li>
 *   <li>remainingCondition: l.name=r.name</li>
 * </ul>
 *
 * The workflow of lookup join:
 *
 * 1) lookup records dimension table using the lookup-keys <br>
 * 2) project & filter on the lookup-ed records <br>
 * 3) join left input record and lookup-ed records <br>
 * 4) only outputs the rows which match to the remainingCondition <br>
{code}
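To make the two hypotheses concrete, here is a toy sketch of that four-step workflow. This is not Flink code: the class, the in-memory dimension table, and its contents are made up for illustration. It shows how, if the type = 0 filter ends up neither in the lookup keys nor in a post-lookup condition, rows with type = 1 leak into the join output, which is the symptom reported here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Toy model (not Flink code) of the lookup-join workflow quoted above.
public class LookupJoinSketch {

    // Hypothetical dimension table rows: (ip, type).
    static final List<Map<String, Object>> DIM = List.of(
            Map.of("ip", "10.0.0.1", "type", 0),
            Map.of("ip", "10.0.0.1", "type", 1));

    // Step 1: look up dimension rows using only the lookup keys (here ip=ip).
    static List<Map<String, Object>> lookup(String leftIp) {
        List<Map<String, Object>> hits = new ArrayList<>();
        for (Map<String, Object> row : DIM) {
            if (row.get("ip").equals(leftIp)) {
                hits.add(row);
            }
        }
        return hits;
    }

    // Step 2 applied correctly: filter the looked-up rows on type = 0.
    static List<Map<String, Object>> joinWithFilter(String leftIp) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : lookup(leftIp)) {
            if ((int) row.get("type") == 0) { // the type = 0 condition
                out.add(row);
            }
        }
        return out;
    }

    // The suspect plan: the filter is in neither the lookup keys nor a
    // where=[...] condition, so the type = 1 row leaks through.
    static List<Map<String, Object>> joinWithoutFilter(String leftIp) {
        return lookup(leftIp);
    }

    public static void main(String[] args) {
        System.out.println(joinWithFilter("10.0.0.1").size());    // 1
        System.out.println(joinWithoutFilter("10.0.0.1").size()); // 2
    }
}
```

With the filter applied after lookup we get one matching row; without it we get two, matching the extra rows seen with flink-connector-jdbc-3.1.1-1.17.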
 


> Missing filter condition in execution plan containing lookup join with mysql 
> jdbc connector
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33365
>                 URL: https://issues.apache.org/jira/browse/FLINK-33365
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with 
> flink-connector-jdbc-3.1.1-1.17.jar
>            Reporter: macdoor615
>            Assignee: david radley
>            Priority: Critical
>         Attachments: flink-connector-jdbc-3.0.0-1.16.png, 
> flink-connector-jdbc-3.1.1-1.17.png
>
>
> create table in flink with sql-client.sh
> {code:java}
> CREATE TABLE default_catalog.default_database.a (
>   ip string, 
>   proctime as proctime()
> ) 
> WITH (
>   'connector' = 'datagen'
> );{code}
> create table in mysql
> {code:java}
> create table b (
>   ip varchar(20), 
>   type int
> );  {code}
>  
> Flink 1.17.1/ 1.18.0 and *flink-connector-jdbc-3.1.1-1.17.jar*
> execute in sql-client.sh 
> {code:java}
> explain SELECT * FROM default_catalog.default_database.a left join 
> bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime on b.type = 0 and 
> a.ip = b.ip; {code}
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip 
> AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]){code}
>  
> execute the same SQL in sql-client.sh with Flink 1.17.1 / 1.18.0 and 
> *flink-connector-jdbc-3.0.0-1.16.jar*
> get the execution plan
> {code:java}
> ...
> == Optimized Execution Plan ==
> Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
> +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], 
> lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 
> AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
>    +- Calc(select=[ip, PROCTIME() AS proctime])
>       +- TableSourceScan(table=[[default_catalog, default_database, a]], 
> fields=[ip]) {code}
> with flink-connector-jdbc-3.1.1-1.17.jar, the condition is 
> *lookup=[ip=ip]*
> with flink-connector-jdbc-3.0.0-1.16.jar, the condition is 
> *lookup=[type=0, ip=ip], where=[(type = 0)]*
>  
> In our real-world production environment, this leads to incorrect data output.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
