[
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876190#comment-15876190
]
ASF GitHub Bot commented on FLINK-5498:
---------------------------------------
GitHub user lincoln-lil opened a pull request:
https://github.com/apache/flink/pull/3379
[FLINK-5498] [table] Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
Support left/right outer joins with non-equi-join conditions via a coGroup
operator with a generated OuterJoinCoGroupFunction.
Note that the current implementation is not memory safe for many-to-one
outer joins, because it copies the opposite side's input into an ArrayList
buffer. This is a workaround for now, due to the backend limitation of a
shared iterator instance.
In the long run, I think we should extend the runtime join operators to
support such join conditions.
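For illustration, the buffering approach described above can be sketched with plain Scala collections. This is a minimal sketch, not the generated OuterJoinCoGroupFunction: the object name, the method `leftOuterCoGroup`, and the use of `Option` in place of null padding are all assumptions made for the example, and no Flink API is involved.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: plain iterators stand in for the two coGroup inputs
// that already share the same equi-join key.
object OuterJoinCoGroupSketch {

  /** Left outer join of one key group.
    * `pred` is the remaining non-equality predicate. */
  def leftOuterCoGroup[L, R](
      left: Iterator[L],
      right: Iterator[R],
      pred: (L, R) => Boolean): List[(L, Option[R])] = {
    // The right-side iterator can be traversed only once, so it is copied
    // into a buffer. This copy is what makes the approach memory-unsafe
    // when a single key has a very large group on the buffered side.
    val buffer = ArrayBuffer.empty[R]
    buffer ++= right

    val out = List.newBuilder[(L, Option[R])]
    for (l <- left) {
      val matches = buffer.filter(r => pred(l, r))
      if (matches.isEmpty) {
        // No right row satisfies the predicate: emit the left row padded
        // with "null" (None here) instead of dropping it.
        out += ((l, Option.empty[R]))
      } else {
        matches.foreach(r => out += ((l, Option(r))))
      }
    }
    out.result()
  }
}
```

A right outer join follows by swapping the roles of the two inputs, so that the left side is the one being buffered.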
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/lincoln-lil/flink FLINK-5498
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3379.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3379
----
commit 224ff229feb62db974c910f82bf9aea403949712
Author: lincoln-lil <[email protected]>
Date: 2017-02-16T09:05:49Z
[FLINK-5498] [table] Add support for left/right outer joins with
non-equality predicates (and 1+ equality predicates)
----
> Add support for left/right outer joins with non-equality predicates (and 1+
> equality predicates)
> ------------------------------------------------------------------------------------------------
>
> Key: FLINK-5498
> URL: https://issues.apache.org/jira/browse/FLINK-5498
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: lincoln.lee
> Assignee: lincoln.lee
> Priority: Minor
>
> I found that the expected result of a unit test case is incorrect compared
> to that of an RDBMS; see
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>   ...
>   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
>   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
>   val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>
>   val expected = "Hello world,BCD\n"
>   val results = joinT.toDataSet[Row].collect()
>   TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about outer joins in relational databases.
> The correct result of the above case should be (tested in SQL Server and
> MySQL, with the same results):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=f and b<h;
> c                                g
> -------------------------------- --------------------------------
> NULL                             Hallo
> NULL                             Hallo Welt
> NULL                             Hallo Welt wie
> NULL                             Hallo Welt wie gehts?
> NULL                             ABC
> Hello world                      BCD
> NULL                             CDE
> NULL                             DEF
> NULL                             EFG
> NULL                             FGH
> NULL                             GHI
> NULL                             HIJ
> NULL                             IJK
> NULL                             JKL
> NULL                             KLM
> {code}
> The join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent
> to {{rightOuterJoin('a === 'd).where('b < 'h)}}.
> The problem is rooted in the code-generated {{JoinFunction}} (see
> {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not
> match, we must emit the outer row padded with nulls instead of returning from
> the function without emitting anything.
> The code-generated {{JoinFunction}} also includes the equality predicates.
> These should be removed before generating the code, e.g., in
> {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of
> {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca
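The difference between a predicate inside the outer join condition and the same predicate applied afterwards as a filter, as described in the quoted issue, can be reproduced on a tiny data set. This is a hedged sketch on plain Scala collections: the row types, the sample rows, and the `rightOuterJoin` helper are hypothetical and only mimic the semantics, they are not the Table API.

```scala
// Hypothetical miniature of the two inputs; only the fields used by the
// join condition and the projection are modelled.
object OnVersusWhere {
  case class LeftRow(a: Int, b: Long, c: String)
  case class RightRow(d: Int, g: String, h: Long)

  /** Right outer join: every right row is emitted at least once, padded
    * with None when no left row satisfies the join condition `on`. */
  def rightOuterJoin(ls: Seq[LeftRow], rs: Seq[RightRow])(
      on: (LeftRow, RightRow) => Boolean): Seq[(Option[LeftRow], RightRow)] =
    rs.flatMap { r =>
      val matches = ls.filter(l => on(l, r))
      if (matches.isEmpty) Seq((Option.empty[LeftRow], r))
      else matches.map(l => (Option(l), r))
    }

  val ls = Seq(LeftRow(1, 1L, "Hi"), LeftRow(2, 2L, "Hello"))
  val rs = Seq(RightRow(1, "Hallo", 1L), RightRow(2, "Hallo Welt", 3L))

  // Non-equality predicate as part of the join condition:
  // the unmatched right row survives, padded with None.
  val inOn: Seq[(Option[String], String)] =
    rightOuterJoin(ls, rs)((l, r) => l.a == r.d && l.b < r.h)
      .map { case (l, r) => (l.map(_.c), r.g) }

  // Equality-only join followed by a filter:
  // rows failing the predicate are dropped entirely.
  val thenWhere: Seq[(Option[String], String)] =
    rightOuterJoin(ls, rs)((l, r) => l.a == r.d)
      .filter { case (l, r) => l.exists(_.b < r.h) }
      .map { case (l, r) => (l.map(_.c), r.g) }
}
```

With these sample rows, `inOn` keeps both right rows while `thenWhere` keeps only the one that passes the predicate, which mirrors the gap between the test's expected value and the RDBMS output.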