[ 
https://issues.apache.org/jira/browse/FLINK-11070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715954#comment-16715954
 ] 

Fabian Hueske commented on FLINK-11070:
---------------------------------------

Hmmm, I'm hesitant to enable cross joins by default.
It is very easy to write a query that performs unnecessary cross joins. I would 
not trust users with that.
If we run cross joins by default, users will experience very bad performance 
even if the query could be rewritten without cross joins. 
With a switch in the table config we can notify the user about a (potentially) 
inefficient query and users can manually enable these queries.
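
As a rough sketch of what such a switch could look like: the code below is 
plain Scala, and every name in it (PlannerConfig, crossJoinEnabled, JoinNode, 
validate) is hypothetical and not part of Flink's TableConfig API. It only 
illustrates the idea of rejecting a join without a condition unless the user 
has opted in.

```scala
object CrossJoinSwitch {
  // Hypothetical config holder; cross joins are disabled by default.
  final case class PlannerConfig(crossJoinEnabled: Boolean = false)

  // Simplified join description: a join without a condition is a cross join.
  final case class JoinNode(left: String, right: String, condition: Option[String])

  // Returns the join if it may be planned, otherwise a message for the user.
  def validate(join: JoinNode, config: PlannerConfig): Either[String, JoinNode] =
    if (join.condition.isEmpty && !config.crossJoinEnabled)
      Left(
        s"Cross join between ${join.left} and ${join.right} is disabled. " +
          "Rewrite the query with a join condition or enable cross joins in the config.")
    else
      Right(join)
}
```

With the default config, a join of t3 and t2 without a condition is rejected 
with a message, while the same join passes once crossJoinEnabled is set to 
true.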

I don't really see a problem with removing the switch in the future. We can 
change the default to enable cross joins once we have support for join 
reordering. Even if the cardinality estimates are not perfect, equi joins 
should be much cheaper than cross joins.
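
To make the cost difference concrete, here is a back-of-the-envelope sketch 
in plain Scala. The function names and the uniform-key assumption are mine 
and purely illustrative; this is not how the planner estimates cardinalities.

```scala
object JoinOrderCost {
  // Rows produced by a cross join: every left row pairs with every right row.
  def crossJoinRows(left: Long, right: Long): Long = left * right

  // Rows produced by an equi join on one key, assuming (for illustration only)
  // that both sides share `distinctKeys` uniformly distributed key values.
  def equiJoinRows(left: Long, right: Long, distinctKeys: Long): Long =
    left * right / distinctKeys
}
```

For two inputs of 1,000 rows each with 1,000 distinct keys, the cross join 
produces 1,000,000 rows while the equi join produces 1,000 under this 
assumption.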

> Add stream-stream non-window cross join
> ---------------------------------------
>
>                 Key: FLINK-11070
>                 URL: https://issues.apache.org/jira/browse/FLINK-11070
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> Currently, we don't reorder joins and rely on the order provided by the user. 
> This is fine in most cases; however, it limits the set of supported 
> SQL queries.
> Example:
> {code:java}
> val streamUtil: StreamTableTestUtil = streamTestUtil()
> streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable2", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> streamUtil.addTable[(Int, String, Long)]("MyTable3", 'a, 'b, 'c.rowtime, 
> 'proctime.proctime)
> val sqlQuery =
>       """
>         |SELECT t1.a, t3.b
>         |FROM MyTable3 t3, MyTable2 t2, MyTable t1
>         |WHERE t1.a = t3.a AND t1.a = t2.a
>         |""".stripMargin
> streamUtil.printSql(sqlQuery)
> {code}
> Given the current rule sets, this query produces a cross join which is not 
> supported and thus leads to:
> {code:java}
> org.apache.flink.table.api.TableException: Cannot generate a valid execution 
> plan for the given query: 
> LogicalProject(a=[$8], b=[$1])
>   LogicalFilter(condition=[AND(=($8, $0), =($8, $4))])
>     LogicalJoin(condition=[true], joinType=[inner])
>       LogicalJoin(condition=[true], joinType=[inner])
>         LogicalTableScan(table=[[_DataStreamTable_2]])
>         LogicalTableScan(table=[[_DataStreamTable_1]])
>       LogicalTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> features.
> {code}
> In order to support more queries, it would be nice to have cross joins on 
> streaming. We can start with a simple version, for example, by calling 
> forceNonParallel() for connectOperator in `DataStreamJoin` when it is a cross 
> join. The performance may be bad, but it works fine if the two tables of the 
> cross join are small. 
> We can do some optimizations later, such as broadcasting the smaller side.
> Any suggestions are greatly appreciated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)