Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1981#discussion_r64567800
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
---
@@ -269,22 +269,60 @@ case class Join(
condition: Option[Expression]) extends BinaryNode {
override def output: Seq[Attribute] = {
- joinType match {
- case JoinType.INNER => left.output ++ right.output
- case j => throw new ValidationException(s"Unsupported JoinType: $j")
+ left.output ++ right.output
+ }
+
+ private case class JoinFieldReference(
+ name: String,
+ resultType: TypeInformation[_],
+ left: RelNode,
+ right: RelNode) extends Attribute {
+
+ override def toString = s"'$name"
+
+ override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val joinInputField = if
(left.getRowType.getFieldNames.contains(name)) {
+ val field = left.getRowType.getField(name, false, false)
+ (field.getIndex, field.getType)
+ } else {
+ val field = right.getRowType.getField(name, false, false)
+ (field.getIndex + left.getRowType.getFieldCount, field.getType)
+ }
+
+ new RexInputRef(joinInputField._1, joinInputField._2)
+ }
+
+ override def withName(newName: String): Attribute = {
+ if (newName == name) {
+ this
+ } else {
+ JoinFieldReference(newName, resultType, left, right)
+ }
}
}
override protected[logical] def construct(relBuilder: RelBuilder):
RelBuilder = {
- joinType match {
- case JoinType.INNER =>
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.join(JoinRelType.INNER,
-
condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)))
- case _ =>
- throw new ValidationException(s"Unsupported JoinType: $joinType")
+ left.construct(relBuilder)
+ right.construct(relBuilder)
+ val partialFunction: PartialFunction[Expression, Expression] = {
+ case field: ResolvedFieldReference => new JoinFieldReference(
+ field.name,
+ field.resultType,
+ relBuilder.peek(2, 0),
+ relBuilder.peek(2, 1))
}
+
+ val transformedExpression =
condition.map(_.postOrderTransform(partialFunction))
+
.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true))
+
+ relBuilder.join(flinkJoinTypeToCalcite(joinType),
transformedExpression)
+ }
+
--- End diff --
Can you implement the `validate()` method and check that the join condition
(if set) consists only of join predicates and contains at least one equality
join predicate? Otherwise we would fail later during optimization.
So something like `(left.a == right.b AND left.c < right.d)` would be OK,
but the following would fail:
- `(left.a < right.b)` // no equi-join predicate
- `(left.a == right.b or left.c == right.d)` // disjunctive predicates are not
supported at the moment
- `(left.a == right.b and left.a > 10)` // non-join predicates would be OK, but
I think it would be cleaner to do that in `where()`
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---