[GitHub] [spark] HyukjinKwon commented on a change in pull request #34051: [SPARK-36809][SQL] Remove broadcast for InSubqueryExec used in DPP
HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r716132533

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala ##
@@ -172,8 +172,7 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
   // We can't reuse the broadcast because the join type doesn't support broadcast,
   // and doing DPP means running an extra query that may have significant overhead.
   // We need to make sure the pruning side is very big so that DPP is still worthy.
-  canBroadcastBySize(otherPlan, conf) &&

Review comment:
cc @maryannxue FYI

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r715289655

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -157,7 +161,8 @@ case class InSubqueryExec(
       child = child.canonicalized,
       plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec],
       exprId = ExprId(0),
-      resultBroadcast = null)
+      resultBroadcast = null,
+      result = null)

Review comment:
I see, okie. that's fine.

HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r715282461

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -157,7 +161,8 @@ case class InSubqueryExec(
       child = child.canonicalized,
       plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec],
       exprId = ExprId(0),
-      resultBroadcast = null)
+      resultBroadcast = null,
+      result = null)

Review comment:
Ah, yeah. It only copies the params in the constructor (I was confused w/ https://github.com/databricks/scala-style-guide#case-classes-and-immutability):

```scala
scala> case class A() {
     |   @transient var a: String = _
     | }
defined class A

scala> val a = A()
a: A = A()

scala> a.a
res1: String = null

scala> a.a = "b"
a.a: String = b

scala> a.copy().a
res2: String = null

scala> a.a
res3: String = b
```

In any event, we won't bother moving `result` to constructor I guess.

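To complement the REPL session above, here is a minimal sketch of the other half of the behaviour: a `var` declared as a constructor parameter is carried over by `copy`, while a `var` declared in the class body is not. The names `CopyDemo`, `Node`, `cachedInCtor`, and `cachedInBody` are hypothetical and only illustrate the language rule; this is not Spark's `InSubqueryExec`.

```scala
object CopyDemo {
  // Hypothetical stand-in for a plan node that caches a computed result.
  case class Node(
      name: String,
      // Constructor parameter: `copy` passes the current value along by default.
      @transient var cachedInCtor: Array[Int] = null) {
    // Body member: not a constructor parameter, so `copy` re-initializes it.
    @transient var cachedInBody: Array[Int] = _
  }

  def main(args: Array[String]): Unit = {
    val original = Node("n", cachedInCtor = Array(1, 2))
    original.cachedInBody = Array(3, 4)

    val copied = original.copy(name = "m")
    println(copied.cachedInCtor.mkString(","))  // the constructor state survives the copy
    println(copied.cachedInBody == null)        // the body state does not

    // A copy can also reset constructor state explicitly, which is the shape of
    // the `result = null` line in the diff above.
    val reset = original.copy(cachedInCtor = null)
    println(reset.cachedInCtor == null)
  }
}
```

Under this reading, moving a field into the constructor only matters when its value needs to survive (or be explicitly reset in) a `copy`.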
HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r715279903

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -157,7 +161,8 @@ case class InSubqueryExec(
       child = child.canonicalized,
       plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec],
       exprId = ExprId(0),
-      resultBroadcast = null)
+      resultBroadcast = null,
+      result = null)

Review comment:
I thought it only works for `val`s. let me double check.

HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r714443615

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -157,7 +161,8 @@ case class InSubqueryExec(
       child = child.canonicalized,
       plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec],
       exprId = ExprId(0),
-      resultBroadcast = null)
+      resultBroadcast = null,
+      result = null)

Review comment:
hm, IIRC when it copies, it won't copy `@transient private var result: Array[Any] = _` .. I think we won't have to move `result` into the constructor (?).

HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r714442193

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -104,17 +104,18 @@ case class ScalarSubquery(
 }
 /**
- * The physical node of in-subquery. This is for Dynamic Partition Pruning only, as in-subquery
- * coming from the original query will always be converted to joins.
+ * The physical node of in-subquery. When this is used for Dynamic Partition Pruning, as the pruning
+ * happens at the driver side, we don't broadcast subquery result.
  */
 case class InSubqueryExec(
     child: Expression,
     plan: BaseSubqueryExec,
     exprId: ExprId,
-    private var resultBroadcast: Broadcast[Array[Any]] = null)
+    needBroadcast: Boolean = false,
+    private var resultBroadcast: Broadcast[Array[Any]] = null,
+    @transient private var result: Array[Any] = null)

Review comment:
qq: why should we move this to constructor?

HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r714441608

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -104,17 +104,18 @@ case class ScalarSubquery(
 }
 /**
- * The physical node of in-subquery. This is for Dynamic Partition Pruning only, as in-subquery
- * coming from the original query will always be converted to joins.
+ * The physical node of in-subquery. When this is used for Dynamic Partition Pruning, as the pruning
+ * happens at the driver side, we don't broadcast subquery result.
  */
 case class InSubqueryExec(
     child: Expression,
     plan: BaseSubqueryExec,
     exprId: ExprId,
-    private var resultBroadcast: Broadcast[Array[Any]] = null)
+    needBroadcast: Boolean = false,

Review comment:
nit but I would name it `shouldBroadcast`

HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r714439033

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -130,14 +131,17 @@ case class InSubqueryExec(
     } else {
       rows.map(_.get(0, child.dataType))
     }
-    resultBroadcast = plan.session.sparkContext.broadcast(result)
+    if (needBroadcast) {
+      resultBroadcast = plan.session.sparkContext.broadcast(result)
+    }
   }
-  def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)
+  // This is used only by DPP where we don't need broadcast the result.
+  def values(): Option[Array[Any]] = Option(result)
   private def prepareResult(): Unit = {

Review comment:
or combine w/ the `require` below

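The new `values()` in the hunk above relies on `Option(...)` turning a `null` reference into `None`. A minimal sketch of that behaviour follows; `ValuesDemo`, `ResultHolder`, and `setResult` are hypothetical names used only for illustration and are not part of Spark.

```scala
object ValuesDemo {
  // Hypothetical holder: `result` stays null until something computes it.
  class ResultHolder {
    private var result: Array[Any] = _

    def setResult(values: Array[Any]): Unit = { result = values }

    // Option.apply maps null to None, so callers get an empty Option
    // instead of a null array when the result is not ready yet.
    def values(): Option[Array[Any]] = Option(result)
  }

  def main(args: Array[String]): Unit = {
    val holder = new ResultHolder
    println(holder.values().isEmpty)    // nothing computed yet

    holder.setResult(Array[Any](1, "a"))
    println(holder.values().isDefined)  // the array is now visible to callers
  }
}
```

Because the accessor reads the local array directly, it does not depend on whether a broadcast was ever created.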
HyukjinKwon commented on a change in pull request #34051:
URL: https://github.com/apache/spark/pull/34051#discussion_r714438751

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ##
@@ -130,14 +131,17 @@ case class InSubqueryExec(
     } else {
       rows.map(_.get(0, child.dataType))
     }
-    resultBroadcast = plan.session.sparkContext.broadcast(result)
+    if (needBroadcast) {
+      resultBroadcast = plan.session.sparkContext.broadcast(result)
+    }
   }
-  def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)
+  // This is used only by DPP where we don't need broadcast the result.
+  def values(): Option[Array[Any]] = Option(result)
   private def prepareResult(): Unit = {

Review comment:
maybe just adding an assert for sanity check would be fine?
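
For readers weighing `assert` against `require` for this kind of sanity check, a rough sketch of how the two differ in plain Scala is below. `SanityCheckDemo`, `Holder`, and both accessor names are hypothetical; the sketch does not reproduce the actual check in `prepareResult()`.

```scala
object SanityCheckDemo {
  // Hypothetical holder whose result may not have been computed yet.
  class Holder {
    private var result: Array[Any] = _

    def setResult(values: Array[Any]): Unit = { result = values }

    // `require` signals a violated precondition with IllegalArgumentException.
    def valuesWithRequire(): Array[Any] = {
      require(result != null, "result has not been computed")
      result
    }

    // `assert` signals an internal invariant failure with AssertionError.
    def valuesWithAssert(): Array[Any] = {
      assert(result != null, "result has not been computed")
      result
    }
  }

  def main(args: Array[String]): Unit = {
    val holder = new Holder
    try holder.valuesWithRequire()
    catch { case e: IllegalArgumentException => println(s"require: ${e.getMessage}") }
    try holder.valuesWithAssert()
    catch { case e: AssertionError => println(s"assert: ${e.getMessage}") }
  }
}
```

One practical difference, as far as Scala 2 goes: `assert` is elidable and can be compiled out (for example with `-Xdisable-assertions`), whereas `require` always runs, so folding the check into an existing `require` keeps it active in every build.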