dilipbiswal commented on a change in pull request #24344: [SPARK-27440][SQL] Optimize uncorrelated predicate subquery
URL: https://github.com/apache/spark/pull/24344#discussion_r285940070
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
 ##########
 @@ -55,6 +55,112 @@ object ExecSubqueryExpression {
   }
 }
 
+/**
+ * Exists is used to test for the existence of any record in a subquery.
+ *
+ * This is the physical copy of Exists to be used inside SparkPlan.
+ */
+case class Exists(
+    plan: BaseSubqueryExec,
+    exprId: ExprId)
+  extends ExecSubqueryExpression {
+
+  override def dataType: DataType = BooleanType
+  override def children: Seq[Expression] = Nil
+  override def nullable: Boolean = false
+  override def toString: String = 
plan.simpleString(SQLConf.get.maxToStringFields)
+  override def withNewPlan(plan: BaseSubqueryExec): Exists = copy(plan = plan)
+
+  // Whether the subquery returns one or more records
+  @volatile private var result: Boolean = _
+  @volatile private var updated: Boolean = false
+
+  def updateResult(): Unit = {
+    val rows = plan.executeCollect()
+    result = rows.nonEmpty
+    updated = true
+  }
+
+  override def eval(input: InternalRow): Boolean = {
+    require(updated, s"$this has not finished")
+    result
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+    require(updated, s"$this has not finished")
+    Literal.create(result, BooleanType).doGenCode(ctx, ev)
+  }
+}
+
+/**
+ * Evaluates to `true` if `values` are returned in the subquery's result set.
+ * If `values` are not found in the subquery's result set, and there are nulls 
in
+ * `values` or the result set, it should return null.
+ * This is the physical copy of InSubquery to be used inside SparkPlan.
+ */
+case class InSubquery(
+    values: Seq[Literal],
+    plan: BaseSubqueryExec,
+    exprId: ExprId)
+  extends ExecSubqueryExpression {
+  override def dataType: DataType = BooleanType
+  override def children: Seq[Expression] = Nil
+  override def nullable: Boolean = true
+  override def toString: String = 
plan.simpleString(SQLConf.get.maxToStringFields)
+  override def withNewPlan(plan: BaseSubqueryExec): InSubquery = copy(plan = 
plan)
+
+  @volatile private var result: Boolean = _
+  @volatile private var isNull: Boolean = false
+  @volatile private var updated: Boolean = false
+
+  def updateResult(): Unit = {
+    val rows = plan.executeCollect()
+    // The semantic of '(a,b) in ((x1, y1), (x2, y2), ...)' is
+    // '(a = x1 and b = y1) or (a = x2 and b = y2) or ...'
+    val expression = rows.map(row => {
 
 Review comment:
   @francis0407 To the best of my knowledge, it should only use broadcast
if the data size is within the broadcast threshold.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to