This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8a481d8  [SPARK-32659][SQL][FOLLOWUP][3.0] Broadcast Array instead of 
Set in InSubqueryExec
8a481d8 is described below

commit 8a481d8c336360bc5dfa518af70590251e5b61bc
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Sep 22 10:58:33 2020 -0700

    [SPARK-32659][SQL][FOLLOWUP][3.0] Broadcast Array instead of Set in 
InSubqueryExec
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/29475.
    
    This PR updates the code to broadcast the Array instead of Set, which was 
the behavior before #29475
    
    ### Why are the changes needed?
    
    The size of Set can be much bigger than Array. It's safer to keep the 
behavior the same as before and build the set at the executor side.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #29840 from cloud-fan/backport.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/sql/execution/subquery.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 9d15c76..48d6210 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, 
CreateNamedStruct, Expression, ExprId, InSet, ListQuery, Literal, 
PlanExpression}
+import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, 
Expression, ExprId, InSet, ListQuery, Literal, PlanExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -114,10 +114,10 @@ case class InSubqueryExec(
     child: Expression,
     plan: BaseSubqueryExec,
     exprId: ExprId,
-    private var resultBroadcast: Broadcast[Set[Any]] = null) extends 
ExecSubqueryExpression {
+    private var resultBroadcast: Broadcast[Array[Any]] = null) extends 
ExecSubqueryExpression {
 
-  @transient private var result: Set[Any] = _
-  @transient private lazy val inSet = InSet(child, result)
+  @transient private var result: Array[Any] = _
+  @transient private lazy val inSet = InSet(child, result.toSet)
 
   override def dataType: DataType = BooleanType
   override def children: Seq[Expression] = child :: Nil
@@ -132,11 +132,11 @@ case class InSubqueryExec(
 
   def updateResult(): Unit = {
     val rows = plan.executeCollect()
-    result = rows.map(_.get(0, child.dataType)).toSet
+    result = rows.map(_.get(0, child.dataType))
     resultBroadcast = plan.sqlContext.sparkContext.broadcast(result)
   }
 
-  def values(): Option[Set[Any]] = Option(resultBroadcast).map(_.value)
+  def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)
 
   private def prepareResult(): Unit = {
     require(resultBroadcast != null, s"$this has not finished")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to