cloud-fan commented on code in PR #36438:
URL: https://github.com/apache/spark/pull/36438#discussion_r866839375


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyPartitions.scala:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import org.apache.spark.sql.execution.joins._
+
+/**
+ * A rule to propagate empty partitions, so that some unnecessary shuffle read can be skipped.
+ *
+ * The general idea is to utilize the shuffled join to skip some partitions.
+ *
+ * For example, assume the shuffled join has 4 partitions, and L2 and R3 are empty:
+ * left:  [L1, L2, L3, L4]
+ * right: [R1, R2, R3, R4]
+ *
+ * Suppose the join type is Inner. Then this rule will skip reading partitions: L2, R2, L3, R3.
+ *
+ * Suppose the join type is LeftOuter. Then this rule will skip reading partitions: L2, R2, R3.
+ *
+ * Suppose the join type is RightOuter. Then this rule will skip reading partitions: L2, L3, R3.
+ *
+ * Suppose the join type is FullOuter. Then this rule will skip reading partitions: L2, R3.
+ */
+object PropagateEmptyPartitions extends AQEShuffleReadRule {
+
+  override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
+    Seq(ENSURE_REQUIREMENTS, REPARTITION_BY_NUM, REPARTITION_BY_COL,
+      REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.propagateEmptyPartitionsEnabled) {
+      return plan
+    }
+
+    // If there is no ShuffledJoin, no need to continue.
+    if (plan.collectFirst { case j: ShuffledJoin => j }.isEmpty) {
+      return plan
+    }
+
+    val stages = plan.collect { case s: ShuffleQueryStageExec => s }
+    // currently, empty information is only extracted from and propagated to shuffle data.
+    // TODO: support DataScan in the future.
+    if (stages.size < 2 || !stages.forall(_.isMaterialized)) {
+      return plan
+    }
+
+    val (_, emptyGroupInfos) = collectEmptyGroups(plan)

Review Comment:
   The code looks a bit confusing to me. I was expecting something similar to
`OptimizeSkewedJoin`: match a shuffled join with 2 shuffle stages, and optimize
the join node directly.
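
For reference, the per-join-type behavior described in the Scaladoc of the quoted diff can be written as one small function over a single join's two sides. This is only a standalone sketch with no Spark dependency: `JoinKind`, `SkippablePartitions`, and `skippable` are hypothetical names made up for illustration, not part of the PR or of Spark, and partition indices are 0-based (so L2 is index 1).

```scala
// Hypothetical stand-ins for the join types named in the Scaladoc (not Spark's JoinType).
sealed trait JoinKind
case object Inner extends JoinKind
case object LeftOuter extends JoinKind
case object RightOuter extends JoinKind
case object FullOuter extends JoinKind

object SkippablePartitions {
  /**
   * Given per-partition emptiness flags for both sides of one shuffled join,
   * returns (left indices to skip, right indices to skip), 0-based.
   */
  def skippable(
      kind: JoinKind,
      leftEmpty: Seq[Boolean],
      rightEmpty: Seq[Boolean]): (Set[Int], Set[Int]) = {
    require(leftEmpty.length == rightEmpty.length,
      "both sides must have the same number of partitions")
    val l = leftEmpty.indices.filter(i => leftEmpty(i)).toSet
    val r = rightEmpty.indices.filter(i => rightEmpty(i)).toSet
    kind match {
      // No output if either side is empty, so skip both sides of such a pair.
      case Inner => (l ++ r, l ++ r)
      // Left rows are always preserved: a left partition is skipped only if it is empty itself.
      case LeftOuter => (l, l ++ r)
      // Mirror image of LeftOuter.
      case RightOuter => (l ++ r, r)
      // Both sides are preserved: only the empty partitions themselves are skipped.
      case FullOuter => (l, r)
    }
  }
}
```

With the example from the Scaladoc (L2 and R3 empty), `skippable(LeftOuter, Seq(false, true, false, false), Seq(false, false, true, false))` yields left `Set(1)` and right `Set(1, 2)`, i.e. L2, R2, R3.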



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

