wangyum commented on a change in pull request #31567:
URL: https://github.com/apache/spark/pull/31567#discussion_r577372969



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -539,17 +539,22 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below OUTER JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
-    // the left and right sides, respectively. It's not safe to push limits below FULL OUTER
-    // JOIN in the general case without a more invasive rewrite.
+    // Add extra limits below JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
+    // the left and right sides, respectively. For INNER and CROSS JOIN we push limits to
+    // both the left and right sides if join condition is empty. It's not safe to push limits
+    // below FULL OUTER JOIN in the general case without a more invasive rewrite.
     // We also need to ensure that this limit pushdown rule will not eventually introduce limits
     // on both sides if it is applied multiple times. Therefore:
     //   - If one side is already limited, stack another limit on top if the new limit is smaller.
     //     The redundant limit will be collapsed by the CombineLimits rule.
-    case LocalLimit(exp, join @ Join(left, right, joinType, _, _)) =>
+    case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
       val newJoin = joinType match {
         case RightOuter => join.copy(right = maybePushLocalLimit(exp, right))
         case LeftOuter => join.copy(left = maybePushLocalLimit(exp, left))

Review comment:
       It seems we cannot push the limit down below `LEFT SEMI JOIN`, for example:
   ```scala
   spark.range(20).selectExpr("id % 10 as id").repartition(1).write.saveAsTable("t1")
   spark.range(5, 9, 1).repartition(1).write.saveAsTable("t2")
   val df = spark.sql("select * from t1 LEFT SEMI JOIN t2 on t1.id = t2.id limit 3")
   df.explain()
   df.show
   ```
   
   Current (the limit stays above the semi join):
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- CollectLimit 3
      +- BroadcastHashJoin [id#10L], [id#11L], LeftSemi, BuildRight, false
         :- Filter isnotnull(id#10L)
         :  +- FileScan parquet default.t1[id#10L] Batched: true, DataFilters: [isnotnull(id#10L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#69]
            +- Filter isnotnull(id#11L)
               +- FileScan parquet default.t2[id#11L] Batched: true, DataFilters: [isnotnull(id#11L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   
   
   +---+
   | id|
   +---+
   |  5|
   |  6|
   |  7|
   +---+
   ```
   
   Pushdown (wrong result: the rows kept by the left-side `LocalLimit` need not match any row in `t2`, so the semi join returns nothing here):
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- CollectLimit 3
      +- BroadcastHashJoin [id#10L], [id#11L], LeftSemi, BuildRight, false
         :- LocalLimit 3
         :  +- Filter isnotnull(id#10L)
         :     +- LocalLimit 3
         :        +- FileScan parquet default.t1[id#10L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#77]
            +- Filter isnotnull(id#11L)
               +- FileScan parquet default.t2[id#11L] Batched: true, DataFilters: [isnotnull(id#11L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-28169/spark-warehouse/org.apache.spark..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   
   
   +---+
   | id|
   +---+
   +---+
   ```
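
   For contrast, the case the new comment targets (INNER/CROSS JOIN with an empty join condition) looks safe: pushing the limit only trims each input, and the cross product of the trimmed sides still has at least `limit` rows for the final `CollectLimit` to pick from. A rough check along the same lines as above, reusing the `t1`/`t2` tables created in the first snippet (the exact rows may differ from run to run, since `LIMIT` without `ORDER BY` is not deterministic):
   ```scala
   // Rough sketch, reusing the t1/t2 tables created above.
   // A CROSS JOIN has no join condition, so a LocalLimit pushed to both sides
   // only shrinks the inputs: 3 x 3 rows would still remain before CollectLimit 3,
   // and LIMIT without ORDER BY may legitimately return any 3 of them.
   val crossDf = spark.sql("select * from t1 CROSS JOIN t2 limit 3")
   crossDf.explain()
   crossDf.show()
   assert(crossDf.count() == 3)  // same row count with or without the pushdown
   ```
   So it seems the new case should be scoped to something like `InnerLike` joins with `conditionOpt.isEmpty`, letting `LeftSemi` fall through to the default branch untouched.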



