unigof commented on code in PR #37630:
URL: https://github.com/apache/spark/pull/37630#discussion_r1427878186


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -223,87 +224,383 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
     }
   }
 
-  // Recursively traverse down and try merging 2 plans. If merge is possible then return the merged
-  // plan with the attribute mapping from the new to the merged version.
-  // Please note that merging arbitrary plans can be complicated, the current version supports only
-  // some of the most important nodes.
+  /**
+   * Recursively traverse down and try merging 2 plans.
+   *
+   * Please note that merging arbitrary plans can be complicated, the current version supports only
+   * some of the most important nodes.
+   *
+   * @param newPlan a new plan that we want to merge to an already processed plan
+   * @param cachedPlan a plan that we already processed, it can be either an original plan or a
+   *                   merged version of 2 or more plans
+   * @param filterPropagationSupported a boolean flag that we propagate down to signal we have seen
+   *                                   an `Aggregate` node where propagated filters can be merged
+   * @return A tuple of:
+   *         - the merged plan,
+   *         - the attribute mapping from the new to the merged version,
+   *         - the 2 optional filters of both plans that we need to propagate up and merge in
+   *           an ancestor `Aggregate` node if possible,
+   *         - the optional accumulated extra cost of merge that we need to propagate up and
+   *           check in the ancestor `Aggregate` node.
+   *         The cost is optional to signal if the cost needs to be taken into account up in the
+   *         `Aggregate` node to decide about merge.
+   */
   private def tryMergePlans(
       newPlan: LogicalPlan,
-      cachedPlan: LogicalPlan): Option[(LogicalPlan, AttributeMap[Attribute])] = {
-    checkIdenticalPlans(newPlan, cachedPlan).map(cachedPlan -> _).orElse(
+      cachedPlan: LogicalPlan,
+      filterPropagationSupported: Boolean):
+    Option[(LogicalPlan, AttributeMap[Attribute], Option[Expression], Option[Expression],
+        Option[Double])] = {
+    checkIdenticalPlans(newPlan, cachedPlan).map { outputMap =>
+      // Currently the cost is always propagated up when `filterPropagationSupported` is true but
+      // later we can address cases when we don't need to take cost into account. Please find the
+      // details at the `Filter` node handling.
+      val mergeCost = if (filterPropagationSupported) Some(0d) else None
+
+      (cachedPlan, outputMap, None, None, mergeCost)
+    }.orElse(
       (newPlan, cachedPlan) match {

Review Comment:
   Could you add DSv2 support (especially Parquet) to this PR?
   I can test its performance in our production environment. Thank you very much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

