urosstan-db commented on code in PR #50921:
URL: https://github.com/apache/spark/pull/50921#discussion_r2192348720


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushedDownOperators.scala:
##########
@@ -30,6 +30,7 @@ case class PushedDownOperators(
     limit: Option[Int],
     offset: Option[Int],
     sortValues: Seq[SortOrder],
-    pushedPredicates: Seq[Predicate]) {
+    pushedPredicates: Seq[Predicate],
+    joinedRelations: Seq[String] = Seq()) {

Review Comment:
   Can we avoid making default value here, semantic of this field is not 
different than other non default values.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+      // projections' names are maybe not up to date if the joins have been 
previously pushed down.
+      // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
+      def getRequiredColumnNames(

Review Comment:
   Can we move this method somewhere else, it is hard to read code with inner 
methods. Or at least put it on the end



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+      // projections' names are maybe not up to date if the joins have been 
previously pushed down.
+      // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
+      def getRequiredColumnNames(
+          projections: Seq[NamedExpression],
+          sHolder: ScanBuilderHolder): Array[String] = {
+        val normalizedProjections = DataSourceStrategy.normalizeExprs(
+          projections,
+          sHolder.output.map { a =>
+            sHolder.pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).asInstanceOf[Seq[AttributeReference]]
+
+        normalizedProjections.map(_.name).toArray
+      }
+
+      def generateJoinOutputAlias(name: String): String =
+        s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
+
+      val leftSideRequiredColumnNames = 
getRequiredColumnNames(leftProjections, leftHolder)
+      val rightSideRequiredColumnNames = 
getRequiredColumnNames(rightProjections, rightHolder)
+
+      // Alias the duplicated columns from left side of the join.
+      val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map 
{ name =>
+        val aliasName = if (rightSideRequiredColumnNames.contains(name)) {
+          generateJoinOutputAlias(name)
+        } else {
+          null
+        }
+
+        new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+      }
+
+      // Aliasing of duplicated columns in right side of the join is not 
needed because the
+      // the conflicts are resolved by aliasing the left side.
+      val rightSideRequiredColumnsWithAliases = 
rightSideRequiredColumnNames.map { field =>
+        new SupportsPushDownJoin.ColumnWithAlias(field, null)
+      }
+
+      // Create the AttributeMap that holds (Attribute -> Attribute with up to 
date name) mapping.
+      val pushedJoinOutputMap = AttributeMap[Expression](

Review Comment:
   This is map[String, String]? Can you add explicit type since there are many 
chained methods?
   ```suggestion
         val pushedJoinOutputMap: Map[String, String] = 
AttributeMap[Expression](
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -573,6 +736,10 @@ case class ScanBuilderHolder(
   var pushedAggregate: Option[Aggregation] = None
 
   var pushedAggOutputMap: AttributeMap[Expression] = 
AttributeMap.empty[Expression]
+
+  var joinedRelations: Seq[DataSourceV2RelationBase] = Seq(relation)

Review Comment:
   We loose some info here, it would be good to store joined scanBuilderHolders.
   That allows us to have more info about joined relations (like what are 
pushdowns on joined relations).



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,114 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+
+  /**
+   * Helper method to calculate StructType based on the 
SupportsPushDownJoin.ColumnWithAlias and
+   * the given schema.
+   *
+   * If ColumnWithAlias object has defined alias, new field with new name 
being equal to alias
+   * should be returned. Otherwise, original field is returned.
+   */
+  private def calculateJoinOutputSchema(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      schema: StructType): StructType = {
+    var newSchema = StructType(Seq())
+    columnsWithAliases.foreach { columnWithAlias =>
+      val colName = columnWithAlias.getColName
+      val alias = columnWithAlias.getAlias
+      val field = schema(colName)
+
+      val newName = if (alias == null) colName else alias
+      newSchema = newSchema.add(newName, field.dataType, field.nullable, 
field.metadata)
+    }
+
+    newSchema
+  }
+
+  override def pushDownJoin(
+      other: SupportsPushDownJoin,
+      joinType: JoinType,
+      leftSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      rightSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      condition: Predicate ): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+    val otherJdbcScanBuilder = other.asInstanceOf[JDBCScanBuilder]
+
+    // Get left side and right side of join sql queries. These will be used as 
subqueries in final
+    // join query.
+    val sqlQuery = 
buildSQLQueryUsedInJoinPushDown(leftSideRequiredColumnWithAliases)
+    val otherSideSqlQuery = otherJdbcScanBuilder
+      .buildSQLQueryUsedInJoinPushDown(rightSideRequiredColumnWithAliases)
+
+    // requiredSchema will become the finalSchema of this JDBCScanBuilder
+    var requiredSchema = StructType(Seq())
+    requiredSchema = 
calculateJoinOutputSchema(leftSideRequiredColumnWithAliases, finalSchema)
+    requiredSchema = requiredSchema.merge(
+      calculateJoinOutputSchema(
+        rightSideRequiredColumnWithAliases,
+        otherJdbcScanBuilder.finalSchema
+      )
+    )
+
+    val joinOutputColumnsString =
+      requiredSchema.fields.map(f => 
dialect.quoteIdentifier(f.name)).mkString(",")
+
+    val joinTypeStringOption = joinType match {
+      case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case _ => None
+    }
+
+    if (!joinTypeStringOption.isDefined) return false

Review Comment:
   We have a lot of exit points here. Can you move some of these on the 
beginning? I would shift compilation of join type and condition before output 
schema calculation



##########
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##########
@@ -265,6 +266,288 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
     super.afterAll()
   }
 
+  test("Test 2-way join without condition - no join pushdown") {

Review Comment:
   Name of test is not intuitive enough



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,112 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url

Review Comment:
   This does not seems like enough check to make pushdown possible.
   We should check whether credentials are same as well (different users may 
have different privileges on the same database instance). Also, we might have 
problem if users do full qualification of `dbtable` (with database which is 
possible on many databases), and database does not support cross database 
joins. Can you check whether that scenario is possible.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible

Review Comment:
   This would make problem with same column name on different tables if remote 
databases does not include table name in query output (only column names)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)

Review Comment:
   This depends on remote data sources? Some databases would have different 
output?
   Something like this?
   ```
       // ON p.col = q.col (this is not possible)
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+      // projections' names are maybe not up to date if the joins have been 
previously pushed down.
+      // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
+      def getRequiredColumnNames(
+          projections: Seq[NamedExpression],
+          sHolder: ScanBuilderHolder): Array[String] = {
+        val normalizedProjections = DataSourceStrategy.normalizeExprs(
+          projections,
+          sHolder.output.map { a =>
+            sHolder.pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).asInstanceOf[Seq[AttributeReference]]
+
+        normalizedProjections.map(_.name).toArray
+      }
+
+      def generateJoinOutputAlias(name: String): String =
+        s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
+
+      val leftSideRequiredColumnNames = 
getRequiredColumnNames(leftProjections, leftHolder)
+      val rightSideRequiredColumnNames = 
getRequiredColumnNames(rightProjections, rightHolder)
+
+      // Alias the duplicated columns from left side of the join.
+      val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map 
{ name =>
+        val aliasName = if (rightSideRequiredColumnNames.contains(name)) {
+          generateJoinOutputAlias(name)
+        } else {
+          null

Review Comment:
   Avoid using null in Scala, you can use Option instead and None



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+      // projections' names are maybe not up to date if the joins have been 
previously pushed down.
+      // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
+      def getRequiredColumnNames(
+          projections: Seq[NamedExpression],
+          sHolder: ScanBuilderHolder): Array[String] = {
+        val normalizedProjections = DataSourceStrategy.normalizeExprs(
+          projections,
+          sHolder.output.map { a =>
+            sHolder.pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).asInstanceOf[Seq[AttributeReference]]
+
+        normalizedProjections.map(_.name).toArray
+      }
+
+      def generateJoinOutputAlias(name: String): String =
+        s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
+
+      val leftSideRequiredColumnNames = 
getRequiredColumnNames(leftProjections, leftHolder)
+      val rightSideRequiredColumnNames = 
getRequiredColumnNames(rightProjections, rightHolder)
+
+      // Alias the duplicated columns from left side of the join.
+      val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map 
{ name =>
+        val aliasName = if (rightSideRequiredColumnNames.contains(name)) {
+          generateJoinOutputAlias(name)
+        } else {
+          null
+        }
+
+        new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+      }
+
+      // Aliasing of duplicated columns in right side of the join is not 
needed because the
+      // the conflicts are resolved by aliasing the left side.
+      val rightSideRequiredColumnsWithAliases = 
rightSideRequiredColumnNames.map { field =>
+        new SupportsPushDownJoin.ColumnWithAlias(field, null)
+      }
+
+      // Create the AttributeMap that holds (Attribute -> Attribute with up to 
date name) mapping.
+      val pushedJoinOutputMap = AttributeMap[Expression](
+        node.output.asInstanceOf[Seq[AttributeReference]]
+          .zip(leftSideRequiredColumnsWithAliases ++ 
rightSideRequiredColumnsWithAliases)
+          .collect {
+            case (attr, columnWithAlias) if columnWithAlias.getAlias != null =>
+              (attr, attr.withName(columnWithAlias.getAlias))
+          }
+          .toMap
+      )
+
+      // Reuse the previously calculated map to update the condition with 
attributes
+      // with up-to-date names
+      val normalizedCondition = condition.map { e =>
+        DataSourceStrategy.normalizeExprs(
+          Seq(e),
+          node.output.map { a =>
+            pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).head
+      }
+
+      val translatedCondition =
+        normalizedCondition.flatMap(DataSourceV2Strategy.translateFilterV2(_))
+      val translatedJoinType = DataSourceStrategy.translateJoinType(joinType)
+
+      if (translatedJoinType.isDefined &&
+        translatedCondition.isDefined &&
+        lBuilder.pushDownJoin(
+          rBuilder,
+          translatedJoinType.get,
+          leftSideRequiredColumnsWithAliases,
+          rightSideRequiredColumnsWithAliases,
+          translatedCondition.get
+        )) {
+        leftHolder.joinedRelations = leftHolder.joinedRelations ++ 
rightHolder.joinedRelations

Review Comment:
   Indention level is same here, it is not readable enough, can you indent if 
conidition for 2 more spaces?



##########
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##########
@@ -265,6 +266,288 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
     super.afterAll()
   }
 
+  test("Test 2-way join without condition - no join pushdown") {
+    val sqlQuery = "SELECT * FROM h2.test.employee a, h2.test.employee b"
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way join without condition - no join pushdown") {
+    val sqlQuery = """
+      |SELECT * FROM
+      |h2.test.employee a,
+      |h2.test.employee b,
+      |h2.test.employee c
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test self join with condition") {
+    val sqlQuery = "SELECT * FROM h2.test.employee a JOIN h2.test.employee b 
ON a.dept = b.dept + 1"
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way self join with conditions") {
+    val sqlQuery = """
+      |SELECT * FROM
+      |h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    assert(!rows.isEmpty)
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, 
h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test self join with column pruning") {
+    val sqlQuery = """
+      |SELECT a.dept + 2, b.dept, b.salary FROM
+      |h2.test.employee a JOIN h2.test.employee b
+      |ON a.dept = b.dept + 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test 2-way join with column pruning - different tables") {
+    val sqlQuery = """
+      |SELECT * FROM
+      |h2.test.employee a JOIN h2.test.people b
+      |ON a.dept = b.id
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.people]")
+      checkPushedInfo(df,
+        "PushedFilters: [DEPT IS NOT NULL, ID IS NOT NULL, DEPT = ID]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way self join with column pruning") {
+    val sqlQuery = """
+      |SELECT a.dept, b.*, c.dept, c.salary + a.salary
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, 
h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test aliases not supported in join pushdown") {
+    val sqlQuery = """
+      |SELECT a.dept, bc.*
+      |FROM h2.test.employee a
+      |JOIN (
+      |  SELECT b.*, c.dept AS c_dept, c.salary AS c_salary
+      |  FROM h2.test.employee b
+      |  JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |) bc ON bc.dept = a.dept + 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test aggregate on top of 2-way self join") {
+    val sqlQuery = """
+      |SELECT min(a.dept + b.dept), min(a.dept)
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON a.dept = b.dept + 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      val aggNodes = df.queryExecution.optimizedPlan.collect {
+        case a: Aggregate => a
+      }
+
+      assert(joinNodes.isEmpty)
+      assert(aggNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test aggregate on top of multi-way self join") {
+    val sqlQuery = """
+      |SELECT min(a.dept + b.dept), min(a.dept), min(c.dept - 2)
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      val aggNodes = df.queryExecution.optimizedPlan.collect {
+        case a: Aggregate => a
+      }
+
+      assert(joinNodes.isEmpty)
+      assert(aggNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, 
h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test sort limit on top of join is pushed down") {
+    val sqlQuery = """
+      |SELECT min(a.dept + b.dept), a.dept, b.dept
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |GROUP BY a.dept, b.dept
+      |ORDER BY a.dept
+      |LIMIT 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(
+      SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      val sortNodes = df.queryExecution.optimizedPlan.collect {
+        case s: Sort => s
+      }
+
+      val limitNodes = df.queryExecution.optimizedPlan.collect {
+        case l: GlobalLimit => l
+      }
+
+      assert(joinNodes.isEmpty)
+      assert(sortNodes.isEmpty)
+      assert(limitNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+

Review Comment:
   Do we maybe need seperate test suite here, since there are a lot of tests



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala:
##########
@@ -98,6 +100,132 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
       filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
   }
 
+  def pushDownJoin(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+    // Join can be attempted to be pushed down only if left and right side of 
join are
+    // compatible (same data source, for example). Also, another requirement 
is that if
+    // there are projections between Join and ScanBuilderHolder, these 
projections need to be
+    // AttributeReferences. We could probably support Alias as well, but this 
should be on
+    // TODO list.
+    // Alias can exist between Join and sHolder node because the query below 
is not valid:
+    // SELECT * FROM
+    // (SELECT * FROM tbl t1 JOIN tbl2 t2) p
+    // JOIN
+    // (SELECT * FROM tbl t3 JOIN tbl3 t4) q
+    // ON p.t1.col = q.t3.col (this is not possible)
+    // It's because there are 2 same tables in both sides of top level join 
and it not possible
+    // to fully qualified the column names in condition. Therefore, query 
should be rewritten so
+    // that each of the outputs of child joins are aliased, so there would be 
a projection
+    // with aliases between top level join and scanBuilderHolder (that has 
pushed child joins).
+    case node @ Join(
+      PhysicalOperation(
+        leftProjections,
+        Nil,
+        leftHolder @ ScanBuilderHolder(_, _, lBuilder: SupportsPushDownJoin)
+      ),
+      PhysicalOperation(
+        rightProjections,
+        Nil,
+        rightHolder @ ScanBuilderHolder(_, _, rBuilder: SupportsPushDownJoin)
+      ),
+      joinType,
+      condition,
+    _) if conf.dataSourceV2JoinPushdown &&
+      // We do not support pushing down anything besides AttributeReference.
+      leftProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      rightProjections.forall(_.isInstanceOf[AttributeReference]) &&
+      // Cross joins are not supported because they increase the amount of 
data.
+      condition.isDefined &&
+      // Joins on top of sampled tables are not supported
+      leftHolder.pushedSample.isEmpty &&
+      rightHolder.pushedSample.isEmpty &&
+      lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
+
+      // projections' names are maybe not up to date if the joins have been 
previously pushed down.
+      // For this reason, we need to use pushedJoinOutputMap to get up to date 
names.
+      def getRequiredColumnNames(
+          projections: Seq[NamedExpression],
+          sHolder: ScanBuilderHolder): Array[String] = {
+        val normalizedProjections = DataSourceStrategy.normalizeExprs(
+          projections,
+          sHolder.output.map { a =>
+            sHolder.pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).asInstanceOf[Seq[AttributeReference]]
+
+        normalizedProjections.map(_.name).toArray
+      }
+
+      def generateJoinOutputAlias(name: String): String =
+        s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
+
+      val leftSideRequiredColumnNames = 
getRequiredColumnNames(leftProjections, leftHolder)
+      val rightSideRequiredColumnNames = 
getRequiredColumnNames(rightProjections, rightHolder)
+
+      // Alias the duplicated columns from left side of the join.
+      val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map 
{ name =>
+        val aliasName = if (rightSideRequiredColumnNames.contains(name)) {
+          generateJoinOutputAlias(name)
+        } else {
+          null
+        }
+
+        new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
+      }
+
+      // Aliasing of duplicated columns in right side of the join is not 
needed because the
+      // the conflicts are resolved by aliasing the left side.
+      val rightSideRequiredColumnsWithAliases = 
rightSideRequiredColumnNames.map { field =>
+        new SupportsPushDownJoin.ColumnWithAlias(field, null)
+      }
+
+      // Create the AttributeMap that holds (Attribute -> Attribute with up to 
date name) mapping.
+      val pushedJoinOutputMap = AttributeMap[Expression](
+        node.output.asInstanceOf[Seq[AttributeReference]]
+          .zip(leftSideRequiredColumnsWithAliases ++ 
rightSideRequiredColumnsWithAliases)
+          .collect {
+            case (attr, columnWithAlias) if columnWithAlias.getAlias != null =>
+              (attr, attr.withName(columnWithAlias.getAlias))
+          }
+          .toMap
+      )
+
+      // Reuse the previously calculated map to update the condition with 
attributes
+      // with up-to-date names
+      val normalizedCondition = condition.map { e =>
+        DataSourceStrategy.normalizeExprs(
+          Seq(e),
+          node.output.map { a =>
+            pushedJoinOutputMap.getOrElse(a, 
a).asInstanceOf[AttributeReference]
+          }
+        ).head
+      }
+
+      val translatedCondition =
+        normalizedCondition.flatMap(DataSourceV2Strategy.translateFilterV2(_))
+      val translatedJoinType = DataSourceStrategy.translateJoinType(joinType)
+
+      if (translatedJoinType.isDefined &&
+        translatedCondition.isDefined &&
+        lBuilder.pushDownJoin(
+          rBuilder,
+          translatedJoinType.get,
+          leftSideRequiredColumnsWithAliases,
+          rightSideRequiredColumnsWithAliases,
+          translatedCondition.get
+        )) {
+        leftHolder.joinedRelations = leftHolder.joinedRelations ++ 
rightHolder.joinedRelations
+        leftHolder.pushedPredicates = leftHolder.pushedPredicates ++
+          rightHolder.pushedPredicates :+ translatedCondition.get

Review Comment:
   It is really good we merged these scan builder holders, so users would still 
see predicates are pushed down.
   But, it would be good if users can see which filters are pushed on which 
relation. Maybe we need to modify EXPLAIN a little bit to do recursive tree 
print of scan builder holder. That would be possible if we store scan builder 
holders.
   
   I am fine with doing that in some next PR as well with TODO now.



##########
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala:
##########
@@ -91,6 +91,17 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: 
JDBCOptions) {
     this
   }
 
+  def withAliasedColumns(
+      columns: Array[String],
+      aliases: Array[Option[String]]): JdbcSQLQueryBuilder = {
+    if (columns.nonEmpty) {
+      columnList = columns.zip(aliases).map {

Review Comment:
   Zip will not check for length of two arrays. It will take shorter one. Can 
you add assertion here input parameters have to have same length.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,114 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+
+  /**
+   * Helper method to calculate StructType based on the 
SupportsPushDownJoin.ColumnWithAlias and
+   * the given schema.
+   *
+   * If ColumnWithAlias object has defined alias, new field with new name 
being equal to alias
+   * should be returned. Otherwise, original field is returned.
+   */
+  private def calculateJoinOutputSchema(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      schema: StructType): StructType = {
+    var newSchema = StructType(Seq())
+    columnsWithAliases.foreach { columnWithAlias =>
+      val colName = columnWithAlias.getColName
+      val alias = columnWithAlias.getAlias
+      val field = schema(colName)
+
+      val newName = if (alias == null) colName else alias
+      newSchema = newSchema.add(newName, field.dataType, field.nullable, 
field.metadata)
+    }
+
+    newSchema
+  }
+
+  override def pushDownJoin(
+      other: SupportsPushDownJoin,
+      joinType: JoinType,
+      leftSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      rightSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      condition: Predicate ): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false

Review Comment:
   It would be more readable if we put return in next line
   ```suggestion
       if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin)
         return false
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -1698,6 +1698,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(!Utils.isTesting)
 
+  val DATA_SOURCE_V2_JOIN_PUSHDOWN =
+    buildConf("spark.sql.optimizer.datasourceV2JoinPushdown")

Review Comment:
   Shall we enable it by default?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -194,4 +304,13 @@ case class JDBCScanBuilder(
     JDBCScan(JDBCRelation(schema, parts, jdbcOptions)(session), finalSchema, 
pushedPredicate,
       pushedAggregateList, pushedGroupBys, tableSample, pushedLimit, 
sortOrders, pushedOffset)
   }
+
+}
+
+object JoinPushdownAliasGenerator {
+  private val subQueryId = new java.util.concurrent.atomic.AtomicLong()

Review Comment:
   Is this common to use shared Object to store counter on JVM level instead of 
query level? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,114 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+
+  /**
+   * Helper method to calculate StructType based on the 
SupportsPushDownJoin.ColumnWithAlias and
+   * the given schema.
+   *
+   * If ColumnWithAlias object has defined alias, new field with new name 
being equal to alias
+   * should be returned. Otherwise, original field is returned.
+   */
+  private def calculateJoinOutputSchema(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      schema: StructType): StructType = {
+    var newSchema = StructType(Seq())
+    columnsWithAliases.foreach { columnWithAlias =>
+      val colName = columnWithAlias.getColName
+      val alias = columnWithAlias.getAlias
+      val field = schema(colName)
+
+      val newName = if (alias == null) colName else alias
+      newSchema = newSchema.add(newName, field.dataType, field.nullable, 
field.metadata)
+    }
+
+    newSchema
+  }
+
+  override def pushDownJoin(
+      other: SupportsPushDownJoin,
+      joinType: JoinType,
+      leftSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      rightSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      condition: Predicate ): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+    val otherJdbcScanBuilder = other.asInstanceOf[JDBCScanBuilder]
+
+    // Get left side and right side of join sql queries. These will be used as 
subqueries in final
+    // join query.
+    val sqlQuery = 
buildSQLQueryUsedInJoinPushDown(leftSideRequiredColumnWithAliases)
+    val otherSideSqlQuery = otherJdbcScanBuilder
+      .buildSQLQueryUsedInJoinPushDown(rightSideRequiredColumnWithAliases)
+
+    // requiredSchema will become the finalSchema of this JDBCScanBuilder
+    var requiredSchema = StructType(Seq())
+    requiredSchema = 
calculateJoinOutputSchema(leftSideRequiredColumnWithAliases, finalSchema)
+    requiredSchema = requiredSchema.merge(
+      calculateJoinOutputSchema(
+        rightSideRequiredColumnWithAliases,
+        otherJdbcScanBuilder.finalSchema
+      )
+    )
+
+    val joinOutputColumnsString =
+      requiredSchema.fields.map(f => 
dialect.quoteIdentifier(f.name)).mkString(",")
+
+    val joinTypeStringOption = joinType match {
+      case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case _ => None
+    }
+
+    if (!joinTypeStringOption.isDefined) return false
+
+    val compiledCondition = dialect.compileExpression(condition)
+    if (!compiledCondition.isDefined) return false
+
+    val conditionString = compiledCondition.get
+
+    val joinQuery = s"""
+      |SELECT $joinOutputColumnsString FROM
+      |($sqlQuery) ${JoinPushdownAliasGenerator.getSubqueryQualifier}
+      |${joinTypeStringOption.get}
+      |($otherSideSqlQuery) ${JoinPushdownAliasGenerator.getSubqueryQualifier}
+      |ON $conditionString
+      |""".stripMargin
+
+    val newMap = jdbcOptions.parameters.originalMap +
+      (JDBCOptions.JDBC_QUERY_STRING -> joinQuery) - 
(JDBCOptions.JDBC_TABLE_NAME)

Review Comment:
   Do we need this parentheses?
   ```suggestion
         (JDBCOptions.JDBC_QUERY_STRING -> joinQuery) - 
JDBCOptions.JDBC_TABLE_NAME
   ```



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownJoin.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.connector.join.JoinType;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
+ * push down join operators.
+ *
+ * @since 4.1.0
+ */
+@Evolving
+public interface SupportsPushDownJoin extends ScanBuilder {
+  /**
+   * Returns true if the other side of the join is compatible with the
+   * current {@code SupportsPushDownJoin} for a join push down, meaning both 
sides can be
+   * processed together within the same underlying data source.
+   * <br>
+   * <br>
+   * For example, JDBC connectors are compatible if they use the same
+   * host, port, username, and password.
+   */
+  boolean isOtherSideCompatibleForJoin(SupportsPushDownJoin other);
+
+  /**
+   * Pushes down the join of the current {@code SupportsPushDownJoin} and the 
other side of join
+   * {@code SupportsPushDownJoin}.
+   *
+   * @param other {@code SupportsPushDownJoin} that this {@code 
SupportsPushDownJoin}
+   * gets joined with.
+   * @param joinType the type of join.
+   * @param leftSideRequiredColumnWithAliases required output of the left side 
{@code SupportsPushDownJoin}
+   * @param rightSideRequiredColumnWithAliases required output of the right 
side {@code SupportsPushDownJoin}
+   * @param condition join condition. Columns are named after the specified 
aliases in
+   * {@code leftSideRequiredColumnWithAliases} and {@code 
rightSideRequiredColumnWithAliases}
+   * @return True if join has been successfully pushed down.
+   */
+  boolean pushDownJoin(
+      SupportsPushDownJoin other,
+      JoinType joinType,
+      ColumnWithAlias[] leftSideRequiredColumnWithAliases,
+      ColumnWithAlias[] rightSideRequiredColumnWithAliases,
+      Predicate condition
+  );
+
+  /**
+   *  A helper class used when there are duplicated names coming from 2 sides 
of the join
+   *  operator.
+   *  <br>
+   *  Holds information of original output name and the alias of the new 
output.
+   */
+  class ColumnWithAlias {

Review Comment:
   Java `record` can be used here instead of `class`, it provides more concise 
implementation for structs.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,114 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+
+  /**
+   * Helper method to calculate StructType based on the 
SupportsPushDownJoin.ColumnWithAlias and
+   * the given schema.
+   *
+   * If ColumnWithAlias object has defined alias, new field with new name 
being equal to alias
+   * should be returned. Otherwise, original field is returned.
+   */
+  private def calculateJoinOutputSchema(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      schema: StructType): StructType = {
+    var newSchema = StructType(Seq())
+    columnsWithAliases.foreach { columnWithAlias =>
+      val colName = columnWithAlias.getColName
+      val alias = columnWithAlias.getAlias
+      val field = schema(colName)
+
+      val newName = if (alias == null) colName else alias
+      newSchema = newSchema.add(newName, field.dataType, field.nullable, 
field.metadata)
+    }
+
+    newSchema
+  }
+
+  override def pushDownJoin(
+      other: SupportsPushDownJoin,
+      joinType: JoinType,
+      leftSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      rightSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      condition: Predicate ): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+    val otherJdbcScanBuilder = other.asInstanceOf[JDBCScanBuilder]
+
+    // Get left side and right side of join sql queries. These will be used as 
subqueries in final
+    // join query.
+    val sqlQuery = 
buildSQLQueryUsedInJoinPushDown(leftSideRequiredColumnWithAliases)
+    val otherSideSqlQuery = otherJdbcScanBuilder
+      .buildSQLQueryUsedInJoinPushDown(rightSideRequiredColumnWithAliases)
+
+    // requiredSchema will become the finalSchema of this JDBCScanBuilder
+    var requiredSchema = StructType(Seq())
+    requiredSchema = 
calculateJoinOutputSchema(leftSideRequiredColumnWithAliases, finalSchema)
+    requiredSchema = requiredSchema.merge(
+      calculateJoinOutputSchema(
+        rightSideRequiredColumnWithAliases,
+        otherJdbcScanBuilder.finalSchema
+      )
+    )
+
+    val joinOutputColumnsString =
+      requiredSchema.fields.map(f => 
dialect.quoteIdentifier(f.name)).mkString(",")
+
+    val joinTypeStringOption = joinType match {
+      case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case _ => None
+    }
+
+    if (!joinTypeStringOption.isDefined) return false
+
+    val compiledCondition = dialect.compileExpression(condition)
+    if (!compiledCondition.isDefined) return false
+
+    val conditionString = compiledCondition.get
+
+    val joinQuery = s"""
+      |SELECT $joinOutputColumnsString FROM
+      |($sqlQuery) ${JoinPushdownAliasGenerator.getSubqueryQualifier}
+      |${joinTypeStringOption.get}
+      |($otherSideSqlQuery) ${JoinPushdownAliasGenerator.getSubqueryQualifier}
+      |ON $conditionString
+      |""".stripMargin
+
+    val newMap = jdbcOptions.parameters.originalMap +
+      (JDBCOptions.JDBC_QUERY_STRING -> joinQuery) - 
(JDBCOptions.JDBC_TABLE_NAME)
+
+    jdbcOptions = new JDBCOptions(newMap)
+    finalSchema = requiredSchema
+
+    // We need to reset the pushedPredicate because it has already been 
consumed in previously
+    // crafted SQL query.
+    pushedPredicate = Array.empty[Predicate]
+
+    true
+  }
+
+  def buildSQLQueryUsedInJoinPushDown(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias]): String 
= {

Review Comment:
   This method can return `JdbcSQLQueryBuilder` instead of string if we want to 
make new API for building query in `JdbcSQLQueryBuilder`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala:
##########
@@ -121,6 +123,114 @@ case class JDBCScanBuilder(
     }
   }
 
+  override def isOtherSideCompatibleForJoin(other: SupportsPushDownJoin): 
Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+
+    other.isInstanceOf[JDBCScanBuilder] &&
+      jdbcOptions.url == other.asInstanceOf[JDBCScanBuilder].jdbcOptions.url
+  };
+
+
+  /**
+   * Helper method to calculate StructType based on the 
SupportsPushDownJoin.ColumnWithAlias and
+   * the given schema.
+   *
+   * If ColumnWithAlias object has defined alias, new field with new name 
being equal to alias
+   * should be returned. Otherwise, original field is returned.
+   */
+  private def calculateJoinOutputSchema(
+      columnsWithAliases: Array[SupportsPushDownJoin.ColumnWithAlias],
+      schema: StructType): StructType = {
+    var newSchema = StructType(Seq())
+    columnsWithAliases.foreach { columnWithAlias =>
+      val colName = columnWithAlias.getColName
+      val alias = columnWithAlias.getAlias
+      val field = schema(colName)
+
+      val newName = if (alias == null) colName else alias
+      newSchema = newSchema.add(newName, field.dataType, field.nullable, 
field.metadata)
+    }
+
+    newSchema
+  }
+
+  override def pushDownJoin(
+      other: SupportsPushDownJoin,
+      joinType: JoinType,
+      leftSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      rightSideRequiredColumnWithAliases: 
Array[SupportsPushDownJoin.ColumnWithAlias],
+      condition: Predicate ): Boolean = {
+    if (!jdbcOptions.pushDownJoin || !dialect.supportsJoin) return false
+    val otherJdbcScanBuilder = other.asInstanceOf[JDBCScanBuilder]
+
+    // Get left side and right side of join sql queries. These will be used as 
subqueries in final
+    // join query.
+    val sqlQuery = 
buildSQLQueryUsedInJoinPushDown(leftSideRequiredColumnWithAliases)
+    val otherSideSqlQuery = otherJdbcScanBuilder
+      .buildSQLQueryUsedInJoinPushDown(rightSideRequiredColumnWithAliases)
+
+    // requiredSchema will become the finalSchema of this JDBCScanBuilder
+    var requiredSchema = StructType(Seq())
+    requiredSchema = 
calculateJoinOutputSchema(leftSideRequiredColumnWithAliases, finalSchema)
+    requiredSchema = requiredSchema.merge(
+      calculateJoinOutputSchema(
+        rightSideRequiredColumnWithAliases,
+        otherJdbcScanBuilder.finalSchema
+      )
+    )
+
+    val joinOutputColumnsString =
+      requiredSchema.fields.map(f => 
dialect.quoteIdentifier(f.name)).mkString(",")
+
+    val joinTypeStringOption = joinType match {
+      case JoinType.INNER_JOIN => Some("INNER JOIN")
+      case _ => None
+    }
+
+    if (!joinTypeStringOption.isDefined) return false
+
+    val compiledCondition = dialect.compileExpression(condition)
+    if (!compiledCondition.isDefined) return false
+
+    val conditionString = compiledCondition.get
+
+    val joinQuery = s"""

Review Comment:
   We should reuse JDBC Query builder class to build query (that class also 
have overrides for specific databases so we don't need another class hierarchy 
in the future)
   
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala
   
   Some API like:
   `withJoin(left: JdbcSQLQueryBuilder, right: JdbcSQLQueryBuilder): 
JdbcSQLQueryBuilder`
   
   
   



##########
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##########
@@ -265,6 +266,288 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
     super.afterAll()
   }
 
+  test("Test 2-way join without condition - no join pushdown") {
+    val sqlQuery = "SELECT * FROM h2.test.employee a, h2.test.employee b"
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way join without condition - no join pushdown") {
+    val sqlQuery = """
+      |SELECT * FROM
+      |h2.test.employee a,
+      |h2.test.employee b,
+      |h2.test.employee c
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test self join with condition") {
+    val sqlQuery = "SELECT * FROM h2.test.employee a JOIN h2.test.employee b 
ON a.dept = b.dept + 1"
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way self join with conditions") {
+    val sqlQuery = """
+      |SELECT * FROM
+      |h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    assert(!rows.isEmpty)
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, 
h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test self join with column pruning") {
+    val sqlQuery = """
+      |SELECT a.dept + 2, b.dept, b.salary FROM
+      |h2.test.employee a JOIN h2.test.employee b
+      |ON a.dept = b.dept + 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test 2-way join with column pruning - different tables") {
+    val sqlQuery = """
+      |SELECT * FROM
+      |h2.test.employee a JOIN h2.test.people b
+      |ON a.dept = b.id
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.people]")
+      checkPushedInfo(df,
+        "PushedFilters: [DEPT IS NOT NULL, ID IS NOT NULL, DEPT = ID]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way self join with column pruning") {
+    val sqlQuery = """
+      |SELECT a.dept, b.*, c.dept, c.salary + a.salary
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, 
h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test aliases not supported in join pushdown") {
+    val sqlQuery = """
+      |SELECT a.dept, bc.*
+      |FROM h2.test.employee a
+      |JOIN (
+      |  SELECT b.*, c.dept AS c_dept, c.salary AS c_salary
+      |  FROM h2.test.employee b
+      |  JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |) bc ON bc.dept = a.dept + 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test aggregate on top of 2-way self join") {
+    val sqlQuery = """
+      |SELECT min(a.dept + b.dept), min(a.dept)
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON a.dept = b.dept + 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      val aggNodes = df.queryExecution.optimizedPlan.collect {
+        case a: Aggregate => a
+      }
+
+      assert(joinNodes.isEmpty)
+      assert(aggNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test aggregate on top of multi-way self join") {
+    val sqlQuery = """
+      |SELECT min(a.dept + b.dept), min(a.dept), min(c.dept - 2)
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |JOIN h2.test.employee c ON c.dept = b.dept - 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      val aggNodes = df.queryExecution.optimizedPlan.collect {
+        case a: Aggregate => a
+      }
+
+      assert(joinNodes.isEmpty)
+      assert(aggNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee, 
h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test sort limit on top of join is pushed down") {
+    val sqlQuery = """
+      |SELECT min(a.dept + b.dept), a.dept, b.dept
+      |FROM h2.test.employee a
+      |JOIN h2.test.employee b ON b.dept = a.dept + 1
+      |GROUP BY a.dept, b.dept
+      |ORDER BY a.dept
+      |LIMIT 1
+      |""".stripMargin
+
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(
+      SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      val sortNodes = df.queryExecution.optimizedPlan.collect {
+        case s: Sort => s
+      }
+
+      val limitNodes = df.queryExecution.optimizedPlan.collect {
+        case l: GlobalLimit => l
+      }
+
+      assert(joinNodes.isEmpty)
+      assert(sortNodes.isEmpty)
+      assert(limitNodes.isEmpty)
+      checkPushedInfo(df, "PushedJoins: [h2.test.employee, h2.test.employee]")
+      checkAnswer(df, rows)
+    }
+  }

Review Comment:
   We have no test with another table options. I suggest making grid tests 
here, all of your tests should pass with different DSv2 table options. For 
example, we don't have test to verify table with `query` option works fine when 
JOIN pushdown is enabled.



##########
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##########
@@ -265,6 +266,288 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
     super.afterAll()
   }
 
+  test("Test 2-way join without condition - no join pushdown") {
+    val sqlQuery = "SELECT * FROM h2.test.employee a, h2.test.employee b"
+    val rows = withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> 
"false") {
+      sql(sqlQuery).collect().toSeq
+    }
+
+    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
+      val df = sql(sqlQuery)
+      val joinNodes = df.queryExecution.optimizedPlan.collect {
+        case j: Join => j
+      }
+
+      assert(joinNodes.nonEmpty)
+      checkAnswer(df, rows)
+    }
+  }
+
+  test("Test multi-way join without condition - no join pushdown") {

Review Comment:
   This one as well, something like that would be more approriate
   ```
     test("Compare multi-way join pushdown with no pushdown") {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to