[
https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16004950#comment-16004950
]
ASF GitHub Bot commented on FLINK-5256:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3673#discussion_r115784332
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapJoinLeftRunner.scala
---
@@ -19,19 +19,35 @@
package org.apache.flink.table.runtime
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
import org.apache.flink.util.Collector
class MapJoinLeftRunner[IN1, IN2, OUT](
name: String,
code: String,
+ outerJoin: Boolean,
returnType: TypeInformation[OUT],
broadcastSetName: String)
extends MapSideJoinRunner[IN1, IN2, IN2, IN1, OUT](name, code,
returnType, broadcastSetName) {
override def flatMap(multiInput: IN1, out: Collector[OUT]): Unit = {
broadcastSet match {
case Some(singleInput) => function.join(multiInput, singleInput, out)
- case None =>
+ case None if outerJoin => function.
+ join(multiInput, null.asInstanceOf[IN2],
out)
+ case None => {
+ if (outerJoin && isRowClass(multiInput) &&
returnType.getTypeClass.equals(classOf[Row])) {
+ val inputRow = multiInput.asInstanceOf[Row]
+ val countNullRecords = returnType.getTotalFields -
inputRow.getArity
+ val nullRecords = new Row(countNullRecords)
+ function.join(multiInput, nullRecords.asInstanceOf[IN2], out)
+ }
+ }
}
}
+
+ private def isRowClass(obj: Any) = obj match {
--- End diff --
`isRowClass` can be removed.
> Extend DataSetSingleRowJoin to support Left and Right joins
> -----------------------------------------------------------
>
> Key: FLINK-5256
> URL: https://issues.apache.org/jira/browse/FLINK-5256
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: Dmytro Shkvyra
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary
> inner joins where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and
> right joins with a single-row input. These cases can be handled if the
> {{DataSetSingleRowJoin}} is extended to support outer joins on the
> non-single-row input, i.e., left joins if the right side is the single-row
> input and vice versa.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)