cloud-fan commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2108678669
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +108,100 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}
+ def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ // Join can be attempted to be pushed down only if left and right side of
join are
+ // compatible (same data source, for example). Also, another requirement
is that if
+ // there are projections between Join and ScanBuilderHolder, these
projections need to be
+ // AttributeReferences. We could probably support Alias as well, but this
should be on
+ // TODO list.
+ // Alias can exist between Join and sHolder node because the query below
is not valid:
+ // SELECT * FROM
+ // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+ // JOIN
+ // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+ // ON p.t1.col = q.t3.col (this is not possible)
+ // It's because the same table appears on both sides of the top level join
and it is not possible
+ // to fully qualify the column names in the condition. Therefore, the query
should be rewritten so
+ // that each of the outputs of the child joins is aliased, so there would be
a projection
+ // with aliases between the top level join and scanBuilderHolder (that has
pushed child joins).
+ case node @ Join(
+ PhysicalOperation(
+ leftProjections,
+ Nil,
+ leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+ ),
+ PhysicalOperation(
+ rightProjections,
+ Nil,
+ rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+ ),
+ joinType,
+ condition,
+ _) if conf.dataSourceV2JoinPushdown &&
+ // TODO: I think projections will always be Seq[AttributeReference]
because
+ // When
+ // SELECT tbl1.col+2, tbl2.* FROM tbl1 JOIN tbl2
+ // is executed, col is pruned down, but col + 2 will be projected on top
of join.
+ leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ lBuilder.isRightSideCompatibleForJoin(rBuilder) =>
+ val normalizedLeftProjections = DataSourceStrategy.normalizeExprs(
+ leftProjections,
+ leftHolder.output
+ ).asInstanceOf[Seq[AttributeReference]]
+ val leftRequiredSchema = fromAttributes(normalizedLeftProjections)
+
+ val normalizedRightProjections = DataSourceStrategy.normalizeExprs(
+ rightProjections,
+ rightHolder.output
+ ).asInstanceOf[Seq[AttributeReference]]
+ val rightRequiredSchema = fromAttributes(normalizedRightProjections)
+
+ val normalizedCondition = condition.map { e =>
+ DataSourceStrategy.normalizeExprs(
+ Seq(e),
+ leftHolder.output ++ rightHolder.output
+ ).head
+ }
+
+ val conditionWithJoinColumns = normalizedCondition.map { cond =>
+ cond.transformUp {
+ case a: AttributeReference =>
+ val isInLeftSide = leftProjections.filter(_.exprId ==
a.exprId).nonEmpty
+ JoinColumnReference(a, isInLeftSide)
+ }
+ }
+
+ val translatedCondition =
+
conditionWithJoinColumns.flatMap(DataSourceV2Strategy.translateFilterV2(_))
+ val translatedJoinType = DataSourceStrategy.translateJoinType(joinType)
+
+ if (translatedCondition.isDefined == condition.isDefined &&
+ translatedJoinType.isDefined &&
+ lBuilder.pushJoin(
+ rBuilder,
+ translatedJoinType.get,
+ translatedCondition.toJava,
+ leftRequiredSchema,
+ rightRequiredSchema
+ )) {
+ leftHolder.joinedRelations = leftHolder.joinedRelations :+
rightHolder.relation
+
+ val newSchema = leftHolder.builder.build().readSchema()
Review Comment:
Let's not build the scan this early, or call `build()` an extra time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]