dilipbiswal commented on a change in pull request #24344: [SPARK-27440][SQL] 
Optimize uncorrelated predicate subquery
URL: https://github.com/apache/spark/pull/24344#discussion_r285929566
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
 ##########
 @@ -55,6 +55,112 @@ object ExecSubqueryExpression {
   }
 }
 
+/**
+ * Exists is used to test for the existence of any record in a subquery.
+ *
+ * This is the physical copy of Exists to be used inside SparkPlan.
+ */
+case class Exists(
+    plan: BaseSubqueryExec,
+    exprId: ExprId)
+  extends ExecSubqueryExpression {
+
+  override def dataType: DataType = BooleanType
+  override def children: Seq[Expression] = Nil
+  override def nullable: Boolean = false
+  override def toString: String = 
plan.simpleString(SQLConf.get.maxToStringFields)
+  override def withNewPlan(plan: BaseSubqueryExec): Exists = copy(plan = plan)
+
+  // Whether the subquery returns one or more records
+  @volatile private var result: Boolean = _
+  @volatile private var updated: Boolean = false
+
+  def updateResult(): Unit = {
+    val rows = plan.executeCollect()
+    result = rows.nonEmpty
+    updated = true
+  }
+
+  override def eval(input: InternalRow): Boolean = {
+    require(updated, s"$this has not finished")
+    result
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
+    require(updated, s"$this has not finished")
+    Literal.create(result, BooleanType).doGenCode(ctx, ev)
+  }
+}
+
+/**
+ * Evaluates to `true` if `values` are returned in the subquery's result set.
+ * If `values` are not found in the subquery's result set, and there are nulls 
in
+ * `values` or the result set, it should return null.
+ * This is the physical copy of InSubquery to be used inside SparkPlan.
+ */
+case class InSubquery(
+    values: Seq[Literal],
+    plan: BaseSubqueryExec,
+    exprId: ExprId)
+  extends ExecSubqueryExpression {
+  override def dataType: DataType = BooleanType
+  override def children: Seq[Expression] = Nil
+  override def nullable: Boolean = true
+  override def toString: String = 
plan.simpleString(SQLConf.get.maxToStringFields)
+  override def withNewPlan(plan: BaseSubqueryExec): InSubquery = copy(plan = 
plan)
+
+  @volatile private var result: Boolean = _
+  @volatile private var isNull: Boolean = false
+  @volatile private var updated: Boolean = false
+
+  def updateResult(): Unit = {
+    val rows = plan.executeCollect()
+    // The semantic of '(a,b) in ((x1, y1), (x2, y2), ...)' is
+    // '(a = x1 and b = y1) or (a = x2 and b = y2) or ...'
+    val expression = rows.map(row => {
 
 Review comment:
   @francis0407 I am really sorry... as i raised this earlier. I am just very 
afraid of this exploding on a large result set. Are we able to fix this in a 
follow-up in 3.0 ? If not, we should perhaps keep this under a config ? What do 
you think @cloud-fan ?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to