PetarVasiljevic-DB commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2185005748


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +108,117 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&

Review Comment:
   From code:
   ```
   object PhysicalOperation extends OperationHelper {
     // Returns: (the final project list, filters to push down, relation)
     type ReturnType = (Seq[NamedExpression], Seq[Expression], LogicalPlan)
    ```
   
   Therefore, projections should never be `ExtractValue`. The whole of DSv2 relies on 
`PhysicalOperation`, so if we plan to support `ExtractValue`, the change is not scoped 
to join only; it affects other rules as well.
   
   For now, I don't plan to support it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +108,117 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&

Review Comment:
   synced offline. Removed it completely as other pushdowns don't have this 
check either.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +108,117 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+      // projections' names are maybe not up to date if the joins has 
previously been pushed down.
+      // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
+      //
+      // Normalized projections are then converted to StructType.
+      def getRequiredSchema(
+          projections: Seq[NamedExpression],
+          sHolder: ScanBuilderHolder): StructType = {
+        val normalizedProjections = DataSourceStrategy.normalizeExprs(
+          projections,
+          sHolder.output.map { a =>
+            sHolder.pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).asInstanceOf[Seq[AttributeReference]]
+
+        fromAttributes(normalizedProjections)
+      }
+
+      val leftSideRequiredSchema = getRequiredSchema(leftProjections, 
leftHolder)
+      val rightSideRequiredSchema = getRequiredSchema(rightProjections, 
rightHolder)
+
+      val joinSchema =
+        lBuilder.getJoinedSchema(rBuilder, leftSideRequiredSchema, 
rightSideRequiredSchema)
+
+      assert(joinSchema.length == node.output.length,
+        "The data source returns unexpected number of columns")
+      assert(joinSchema.fields.zip(node.output).forall { case (a1, a2) =>
+        a1.dataType.sameType(a2.dataType)
+      }, "The data source returned column with different data type")
+
+      var pushedJoinOutputMap = AttributeMap.empty[Expression]
+      node.output.asInstanceOf[Seq[AttributeReference]]
+        .zip(joinSchema.fields)
+        .map { case (attr, schemaField) =>
+          pushedJoinOutputMap = pushedJoinOutputMap + (attr, 
attr.withName(schemaField.name))
+        }
+
+      // Reuse the previously calculated map to update the condition with 
attributes
+      // with up-to-date names
+      val normalizedCondition = condition.map { e =>
+        DataSourceStrategy.normalizeExprs(
+          Seq(e),
+          node.output.map { a =>
+            pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).head
+      }
+
+      val translatedCondition =
+        normalizedCondition.flatMap(DataSourceV2Strategy.translateFilterV2(_))
+      val translatedJoinType = DataSourceStrategy.translateJoinType(joinType)
+
+      if (translatedJoinType.isDefined &&
+        translatedCondition.isDefined &&
+        lBuilder.pushDownJoin(
+          rBuilder,
+          joinSchema,
+          translatedJoinType.get,
+          translatedCondition.get
+        )) {
+        leftHolder.joinedRelations = leftHolder.joinedRelations ++ 
rightHolder.joinedRelations
+        leftHolder.pushedPredicates = leftHolder.pushedPredicates ++ 
rightHolder.pushedPredicates

Review Comment:
   Sure



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownJoin.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import java.util.Optional;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.connector.join.JoinType;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
+ * push down join operators.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface SupportsPushDownJoin extends ScanBuilder {
+  /**
+   * Returns true if the other side of the join is compatible with the
+   * current {@code SupportsPushDownJoin} for a join push down, meaning both 
sides can be
+   * processed together within the same underlying data source.
+   *
+   * <p>For example, JDBC connectors are compatible if they use the same
+   * host, port, username, and password.</p>
+   */
+  boolean isOtherSideCompatibleForJoin(SupportsPushDownJoin other);
+
+  /**
+   * Joining 2 {@code SupportsPushDownJoin} can be problematic if there are 
columns with duplicate
+   * names. The {@code SupportsPushDownJoin} implementation should deal with 
this problem.
+   *
+   * This method returns the merged schema that will be preserved to {@code 
SupportsPushDownJoin}'s
+   * schema after {@code pushDownJoin} call succeeds.
+   *
+   * @param other {@code SupportsPushDownJoin} that this {@code 
SupportsPushDownJoin}
+   * gets joined with.
+   * @param requiredSchema required columns needed for this {@code 
SupportsPushDownJoin}.
+   * @param otherSideRequiredSchema required columns needed for other {@code 
SupportsPushDownJoin}.
+   * @return merged schema. If ambiguous names are forbidden, the merged 
schema should resolve that.
+   */
+  StructType getJoinedSchema(
+    SupportsPushDownJoin other,
+    StructType requiredSchema,
+    StructType otherSideRequiredSchema
+  );
+
+  /**
+   * Pushes down the join of the current {@code SupportsPushDownJoin} and the 
other side of join
+   * {@code SupportsPushDownJoin}.
+   *
+   * @param other {@code SupportsPushDownJoin} that this {@code 
SupportsPushDownJoin}
+   * gets joined with.
+   * @param requiredSchema required output schema after join push down.
+   * @param joinType the type of join.
+   * @param condition join condition.
+   * @return True if join has been successfully pushed down.
+   */
+  boolean pushDownJoin(
+    SupportsPushDownJoin other,
+    StructType requiredSchema,

Review Comment:
   Yes. I have responded to other comment regarding the caching of it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,103 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+  // When getJoinedSchema is called, schema shouldn't be pruned yet because 
pushDownJoin API call
+  // can fail. For this reason, we are temporarily holding pruned schema in 
new variable that is
+  // later used in pushDownJoin when crafting the SQL query.
+  var aboutToBePrunedSchema: StructType = finalSchema
+
+  override def getJoinedSchema(
+    other: SupportsPushDownJoin,
+    requiredSchema: StructType,
+    otherSideRequiredSchema: StructType): StructType = {
+    aboutToBePrunedSchema = requiredSchema
+    other.asInstanceOf[JDBCScanBuilder].aboutToBePrunedSchema = 
otherSideRequiredSchema
+
+    var joinedSchema = StructType(Seq())
+
+    (requiredSchema.fields ++ otherSideRequiredSchema.fields)
+      .zipWithIndex
+      .foreach { case (field, idx) =>
+        val newFieldName = JoinOutputAliasIterator.generateColumnAlias
+        joinedSchema =
+          joinedSchema.add(newFieldName, field.dataType, field.nullable, 
field.metadata)
+      }
+
+    joinedSchema
+  }
+
+  override def pushDownJoin(
+    other: SupportsPushDownJoin,
+    requiredSchema: StructType,

Review Comment:
   Yes, but I kind of like it this way more. We already have 
`aboutToBePrunedSchema` that can bring a bit of confusion because the schema is 
not pruned yet, but will be if join pushdown succeeds.
   
   If we cached the `requiredSchema`, we would have another similar field. I find 
the current approach cleaner, but if you think we should cache it in the scan 
builder, I can do that.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,103 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+  // When getJoinedSchema is called, schema shouldn't be pruned yet because 
pushDownJoin API call
+  // can fail. For this reason, we are temporarily holding pruned schema in 
new variable that is
+  // later used in pushDownJoin when crafting the SQL query.
+  var aboutToBePrunedSchema: StructType = finalSchema
+
+  override def getJoinedSchema(
+    other: SupportsPushDownJoin,
+    requiredSchema: StructType,
+    otherSideRequiredSchema: StructType): StructType = {
+    aboutToBePrunedSchema = requiredSchema
+    other.asInstanceOf[JDBCScanBuilder].aboutToBePrunedSchema = 
otherSideRequiredSchema
+
+    var joinedSchema = StructType(Seq())
+
+    (requiredSchema.fields ++ otherSideRequiredSchema.fields)
+      .zipWithIndex
+      .foreach { case (field, idx) =>
+        val newFieldName = JoinOutputAliasIterator.generateColumnAlias
+        joinedSchema =
+          joinedSchema.add(newFieldName, field.dataType, field.nullable, 
field.metadata)
+      }
+
+    joinedSchema
+  }
+
+  override def pushDownJoin(
+    other: SupportsPushDownJoin,
+    requiredSchema: StructType,
+    joinType: JoinType,
+    condition: Predicate): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+    val otherJdbcScanBuilder = other.asInstanceOf[JDBCScanBuilder]
+
+    val requiredOutput = 
requiredSchema.fields.take(aboutToBePrunedSchema.length).map(_.name)
+    val otherSideRequiredOutput =
+      requiredSchema.fields.drop(aboutToBePrunedSchema.length).map(_.name)
+
+    val sqlQuery = buildSQLQueryUsedInJoinPushDown(requiredOutput)
+    val otherSideSqlQuery = otherJdbcScanBuilder
+      .buildSQLQueryUsedInJoinPushDown(otherSideRequiredOutput)
+
+    val joinOutputColumnsString =
+      requiredSchema.fields.map(f => 
dialect.quoteIdentifier(f.name)).mkString(",")
+
+    val joinTypeString = joinType match {
+      case JoinType.INNER_JOIN => "INNER JOIN"
+      case _ => ""
+    }
+
+    if (joinTypeString.isEmpty) return false
+
+    val compiledCondition = dialect.compileExpression(condition)
+    if (!compiledCondition.isDefined) return false
+
+    val conditionString = compiledCondition.get
+
+    val joinQuery = s"""
+       |SELECT $joinOutputColumnsString FROM
+       |($sqlQuery) ${JoinOutputAliasIterator.generateSubqueryQualifier}

Review Comment:
   The subquery alias itself doesn't matter, but it has to be present for most 
dialects. We could use `t1`, `t2` as well.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -441,8 +599,18 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       } else {
         aliasReplacedOrder.asInstanceOf[Seq[SortOrder]]
       }
-      val normalizedOrders = DataSourceStrategy.normalizeExprs(
-        newOrder, sHolder.relation.output).asInstanceOf[Seq[SortOrder]]
+      val normalizedOrders = if (sHolder.joinedRelations.length == 1) {

Review Comment:
   done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,103 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+  // When getJoinedSchema is called, schema shouldn't be pruned yet because 
pushDownJoin API call
+  // can fail. For this reason, we are temporarily holding pruned schema in 
new variable that is
+  // later used in pushDownJoin when crafting the SQL query.
+  var aboutToBePrunedSchema: StructType = finalSchema
+
+  override def getJoinedSchema(
+    other: SupportsPushDownJoin,
+    requiredSchema: StructType,
+    otherSideRequiredSchema: StructType): StructType = {
+    aboutToBePrunedSchema = requiredSchema
+    other.asInstanceOf[JDBCScanBuilder].aboutToBePrunedSchema = 
otherSideRequiredSchema
+
+    var joinedSchema = StructType(Seq())
+
+    (requiredSchema.fields ++ otherSideRequiredSchema.fields)
+      .zipWithIndex
+      .foreach { case (field, idx) =>
+        val newFieldName = JoinOutputAliasIterator.generateColumnAlias

Review Comment:
   Sure, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to