PetarVasiljevic-DB commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2177612709
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -61,41 +63,232 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
ScanBuilderHolder(r.output, r,
r.table.asReadable.newScanBuilder(r.options))
}
- private def pushDownFilters(plan: LogicalPlan) = plan.transform {
+ private def pushDownFilters(
+ filters: Seq[Expression],
+ sHolder: ScanBuilderHolder): LogicalPlan = {
+ val normalizedFilters =
+ DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output)
+
+ val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
+ normalizedFilters.partition(SubqueryExpression.hasSubquery)
+
+ // `pushedFilters` will be pushed down and evaluated in the underlying
data sources.
+ // `postScanFilters` need to be evaluated after the scan.
+ // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row
group filter.
+ val (pushedFilters, postScanFiltersWithoutSubquery) =
PushDownUtils.pushFilters(
+ sHolder.builder, normalizedFiltersWithoutSubquery)
+ val pushedFiltersStr = if (pushedFilters.isLeft) {
+ pushedFilters.swap
+ .getOrElse(throw new NoSuchElementException("The left node doesn't
have pushedFilters"))
+ .mkString(", ")
+ } else {
+ sHolder.pushedPredicates = pushedFilters
+ .getOrElse(throw new NoSuchElementException("The right node doesn't
have pushedFilters"))
+ sHolder.pushedPredicates.mkString(", ")
+ }
+
+ val postScanFilters = postScanFiltersWithoutSubquery ++
normalizedFiltersWithSubquery
+
+ logInfo(
+ log"""
+ |Pushing operators to ${MDC(RELATION_NAME, sHolder.relation.name)}
+ |Pushed Filters: ${MDC(PUSHED_FILTERS, pushedFiltersStr)}
+ |Post-Scan Filters: ${MDC(POST_SCAN_FILTERS,
postScanFilters.mkString(","))}
+ """.stripMargin)
+
+ val filterCondition = postScanFilters.reduceLeftOption(And)
+ filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
+ }
+
+ private def pushDownFilters(plan: LogicalPlan): LogicalPlan = plan.transform
{
// update the scan builder with filter push down and return a new plan
with filter pushed
case Filter(condition, sHolder: ScanBuilderHolder) =>
val filters = splitConjunctivePredicates(condition)
- val normalizedFilters =
- DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output)
- val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
- normalizedFilters.partition(SubqueryExpression.hasSubquery)
-
- // `pushedFilters` will be pushed down and evaluated in the underlying
data sources.
- // `postScanFilters` need to be evaluated after the scan.
- // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet
row group filter.
- val (pushedFilters, postScanFiltersWithoutSubquery) =
PushDownUtils.pushFilters(
- sHolder.builder, normalizedFiltersWithoutSubquery)
- val pushedFiltersStr = if (pushedFilters.isLeft) {
- pushedFilters.swap
- .getOrElse(throw new NoSuchElementException("The left node doesn't
have pushedFilters"))
- .mkString(", ")
+ pushDownFilters(filters, sHolder)
+ }
+
+ def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ // Join can be attempted to be pushed down only if left and right side of
join are
+ // compatible (same data source, for example). Also, another requirement
is that if
+ // there are projections between Join and ScanBuilderHolder, these
projections need to be
+ // AttributeReferences. We could probably support Alias as well, but this
should be on
+ // TODO list.
+ // Alias can exist between Join and sHolder node because the query below
is not valid:
+ // SELECT * FROM
+ // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+ // JOIN
+ // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+ // ON p.t1.col = q.t3.col (this is not possible)
+ // It's because there are 2 same tables in both sides of top level join
and it's not possible
+ // to use fully qualified the column names in condition. Therefore, query
should be rewritten so
+ // that each of the outputs of child joins are aliased, so there would be
a projection
+ // with aliases between top level join and scanBuilderHolder (that has
pushed child joins).
+ case node @ Join(
+ PhysicalOperation(
+ leftProjections,
+ leftFilters,
+ leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+ ),
+ PhysicalOperation(
+ rightProjections,
+ rightFilters,
+ rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+ ),
+ joinType,
+ condition,
+ _) if conf.dataSourceV2JoinPushdown &&
+ leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+ condition.isDefined && // Cross joins aren't pushed down as they
increase the amount of data
+ // We don't support joining the sampled tables
+ leftHolder.pushedSample.isEmpty &&
+ rightHolder.pushedSample.isEmpty &&
+ // Only left-like star schema joins are supported for now
+ rightHolder.joinedRelations.isEmpty &&
+ lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+ val normalizedCondition = condition.map { e =>
+ DataSourceStrategy.normalizeExprs(
+ Seq(e),
+ leftHolder.output ++ rightHolder.output
+ ).head
+ }
+
+ // We can get qualifier from the join condition. We can't get it from
node.output because
+ // there won't be a qualifier. Qualifier exists in Projection nodes on
top of the plan.
+ var leftSideQualifier: Seq[String] = Seq()
+ var rightSideQualifier: Seq[String] = Seq()
+
+ // There are 2 cases for calculating the qualifiers:
+ // 1. Left and right children are original scan nodes and don't contain
pushed join
+ // information. In this case, we need to qualify both left and right
side filters.
+ // 2. Left side contains pushed join. In this case, we don't need to
qualify the left
+ // side as all of the relations in it are already qualified (or not if
there was no need).
+ // The right side still has to be qualified.
+ val leftSideContainsSingleRelation = leftHolder.joinedRelations.isEmpty
+
+ val conditionWithJoinColumns = normalizedCondition.map { cond =>
+ cond.transformUp {
+ case a: AttributeReference =>
+ val isInLeftSide = leftProjections.filter(_.exprId ==
a.exprId).nonEmpty
+ if (leftSideContainsSingleRelation && isInLeftSide &&
leftSideQualifier.isEmpty) {
+ leftSideQualifier = a.qualifier
+ } else if (!isInLeftSide && rightSideQualifier.isEmpty) {
+ rightSideQualifier = a.qualifier
+ }
+
+ // AttributeReference will already have the qualifier if specified
in query
+ JoinColumnReference(a)
Review Comment:
Eh, I think we can't really do that. I can give one reason; there might be
multiple others that I am not aware of (or haven't hit while testing).
For the Spark SQL query `SELECT min(a.id) from tbl a`, the aggregate
expression would have a qualifier `a`, and in JDBCScanBuilder we would generate
the SQL query `SELECT min(a.id) from tbl`, so `a.id` would not be correct since
there is no qualifier `a` assigned to the table.
We would need to adjust the aggregate pushdown in JDBCScanBuilder as well to
make it work, which I find not really good from a risk point of view.
I have another approach in mind — let me know what you think: we can add
metadata indicating that the `AttributeReference` is used for join purposes.
This way we would use `NamedExpression`, and we could allow multipart identifier
compilation in `visitNamedReference` in `JdbcDialects`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]