[ 
https://issues.apache.org/jira/browse/FLINK-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17375207#comment-17375207
 ] 

Leonard Xu edited comment on FLINK-10734 at 7/6/21, 7:46 AM:
-------------------------------------------------------------

[~twalthr] Although the bug still exists in the legacy temporal join function, 
users can use the new event-time versioned table syntax [1], which supports 
multiple fields in the primary key, so I think we can close this one.

[1] 
[https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/#versioned-table-sources]
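
For reference, a minimal sketch of what the query from this issue could look like with the versioned-table syntax described in [1]. The table name {{RatesVersioned}}, the column types, and the {{upsert-kafka}} connector options are assumptions made for illustration and are not taken from this issue. The SQL is kept in plain Scala strings, which would be passed to {{TableEnvironment.executeSql}} and {{TableEnvironment.sqlQuery}} respectively; {{Orders}} is assumed to be registered already with {{rowtime}} as its event-time attribute.

```scala
object VersionedTableSketch {
  // Assumed DDL: a changelog-backed table with a primary key and an
  // event-time attribute, which is what makes it a versioned table.
  // Connector options are placeholders, not values from the issue.
  val ratesDdl: String =
    """
      |CREATE TABLE RatesVersioned (
      |  currency STRING,
      |  rate     DECIMAL(32, 2),
      |  rowtime  TIMESTAMP(3),
      |  WATERMARK FOR rowtime AS rowtime,
      |  PRIMARY KEY (currency) NOT ENFORCED
      |) WITH (
      |  'connector' = 'upsert-kafka',
      |  'topic' = 'rates',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'key.format' = 'json',
      |  'value.format' = 'json'
      |)
      |""".stripMargin

  // Event-time temporal join against the versioned table; this replaces
  // the LATERAL TABLE (Rates(o.rowtime)) form used in the issue.
  val joinQuery: String =
    """
      |SELECT
      |  o.amount * r.rate AS amount
      |FROM Orders AS o
      |JOIN RatesVersioned FOR SYSTEM_TIME AS OF o.rowtime AS r
      |ON o.currency = r.currency
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    println(ratesDdl)
    println(joinQuery)
  }
}
```

A composite key would be declared in the same {{PRIMARY KEY (...) NOT ENFORCED}} clause by listing several columns, with a matching equality per key column in the {{ON}} clause.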


was (Author: leonard xu):
[~twalthr] Although the bug still exits in legacy temporal join function, user 
can use new event-time versioned table syntax [1] to which support multiple 
field in primary key, thus I think we can close this one.

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/#versioned-table-sources

> Temporal joins on heavily filtered tables might fail in planning
> ----------------------------------------------------------------
>
>                 Key: FLINK-10734
>                 URL: https://issues.apache.org/jira/browse/FLINK-10734
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.7.0
>            Reporter: Piotr Nowojski
>            Priority: Minor
>
> The following query:
> {code}
>     val sqlQuery =
>       """
>         |SELECT
>         |  o.amount * r.rate AS amount
>         |FROM
>         |  Orders AS o,
>         |  LATERAL TABLE (Rates(o.rowtime)) AS r
>         |WHERE r.currency = o.currency
>         |""".stripMargin
> {code}
> with {{Rates}} defined as follows:
> {code}
>     tEnv.registerTable(
>       "EuroRatesHistory",
>       tEnv.scan("RatesHistory").filter('currency === "Euro"))
>     tEnv.registerFunction(
>       "Rates",
>       tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 'currency))
> {code}
> will fail with:
> {noformat}
> org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, rowtime, currency, rate, rowtime0))]
>  at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
>  at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
>  at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
> {noformat}
> The problem is that the filter condition {{('currency === "Euro")}} 
> interferes with the join condition, which ends up simplified away entirely. 
> Note how the top {{LogicalFilter(condition=[=($3, $1)])}} changes during 
> optimization and finally disappears:
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=($3, $1)])
>     LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
>       LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> LogicalProject(amount=[*($0, $4)])
>   LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
>     LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
>       LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_0]])
>         LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
>           LogicalTableScan(table=[[_DataStreamTable_1]])
> {noformat}
> {noformat}
> FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
>   FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, $2)], joinType=[inner])
>     FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
>       FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
>     FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
>       FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
> {noformat}
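
To make the failure mode in the quoted description concrete, here is a toy model in plain Scala of the simplification it describes. The types {{Col}}, {{Lit}}, {{Eq}} and the object {{JoinKeyDemo}} are hypothetical and are not Flink or Calcite classes; the sketch only mimics how a constant implied by the right-side filter can be propagated into the join predicate, and it makes no claim about which planner rule actually performs the rewrite.

```scala
// Toy model only: these types are illustrative, not planner classes.
sealed trait Expr
final case class Col(side: String, name: String) extends Expr // side: "left" or "right"
final case class Lit(value: String) extends Expr
final case class Eq(a: Expr, b: Expr)

object JoinKeyDemo {
  // Collect columns that a single-input filter pins to a constant,
  // e.g. right.currency = 'Euro'.
  def knownConstants(filters: Seq[Eq]): Map[Col, Lit] =
    filters.collect {
      case Eq(c: Col, l: Lit) => c -> l
      case Eq(l: Lit, c: Col) => c -> l
    }.toMap

  // Replace every pinned column in a predicate with its constant.
  def propagate(pred: Eq, consts: Map[Col, Lit]): Eq = {
    def subst(e: Expr): Expr = e match {
      case c: Col => consts.getOrElse(c, c)
      case other  => other
    }
    Eq(subst(pred.a), subst(pred.b))
  }

  // An equi-join key needs one column from each join input.
  def joinKeys(preds: Seq[Eq]): Seq[(Col, Col)] =
    preds.collect {
      case Eq(l: Col, r: Col) if l.side == "left" && r.side == "right" => (l, r)
      case Eq(r: Col, l: Col) if l.side == "left" && r.side == "right" => (l, r)
    }

  def main(args: Array[String]): Unit = {
    // filter('currency === "Euro") on the temporal table side
    val rightFilter = Eq(Col("right", "currency"), Lit("Euro"))
    // WHERE r.currency = o.currency
    val joinPred = Eq(Col("right", "currency"), Col("left", "currency"))

    val before = joinKeys(Seq(joinPred))
    val after  = joinKeys(Seq(propagate(joinPred, knownConstants(Seq(rightFilter)))))

    println(s"join keys before simplification: $before")
    println(s"join keys after simplification:  $after")
  }
}
```

After propagation the predicate only references the left input ({{'Euro' = o.currency}}), so it can be pushed below the join, and the key list comes out empty. That mirrors the {{Found []}} in the exception and the final plan, where only {{__TEMPORAL_JOIN_CONDITION}} remains on the join.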



