This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new a2dd949d2bf [SPARK-42937][SQL] `PlanSubqueries` should set 
`InSubqueryExec#shouldBroadcast` to true
a2dd949d2bf is described below

commit a2dd949d2bf6056fc9d3a1725d35ebfd117865a8
Author: Bruce Robbins <[email protected]>
AuthorDate: Tue Mar 28 05:32:06 2023 -0700

    [SPARK-42937][SQL] `PlanSubqueries` should set 
`InSubqueryExec#shouldBroadcast` to true
    
    ### What changes were proposed in this pull request?
    
    Change `PlanSubqueries` to set `shouldBroadcast` to true when instantiating 
an `InSubqueryExec` instance.
    
    ### Why are the changes needed?
    
    The below left outer join gets an error:
    ```
    create or replace temp view v1 as
    select * from values
    (1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1),
    (2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2),
    (3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
    as v1(key, value1, value2, value3, value4, value5, value6, value7, value8, 
value9, value10);
    
    create or replace temp view v2 as
    select * from values
    (1, 2),
    (3, 8),
    (7, 9)
    as v2(a, b);
    
    create or replace temp view v3 as
    select * from values
    (3),
    (8)
    as v3(col1);
    
    set spark.sql.codegen.maxFields=10; -- let's make maxFields 10 instead of 
100
    set spark.sql.adaptive.enabled=false;
    
    select *
    from v1
    left outer join v2
    on key = a
    and key in (select col1 from v3);
    ```
    The join fails during predicate codegen:
    ```
    23/03/27 12:24:12 WARN Predicate: Expr codegen error and falling back to 
interpreter mode
    java.lang.IllegalArgumentException: requirement failed: input[0, int, 
false] IN subquery#34 has not finished
            at scala.Predef$.require(Predef.scala:281)
            at 
org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
            at 
org.apache.spark.sql.execution.InSubqueryExec.doGenCode(subquery.scala:156)
            at 
org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
            at scala.Option.getOrElse(Option.scala:189)
            at 
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
            at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$generateExpressions$2(CodeGenerator.scala:1278)
            at scala.collection.immutable.List.map(List.scala:293)
            at 
org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1278)
            at 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.create(GeneratePredicate.scala:41)
            at 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$.generate(GeneratePredicate.scala:33)
            at 
org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:73)
            at 
org.apache.spark.sql.catalyst.expressions.Predicate$.createCodeGeneratedObject(predicates.scala:70)
            at 
org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
            at 
org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:86)
            at 
org.apache.spark.sql.execution.joins.HashJoin.boundCondition(HashJoin.scala:146)
            at 
org.apache.spark.sql.execution.joins.HashJoin.boundCondition$(HashJoin.scala:140)
            at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition$lzycompute(BroadcastHashJoinExec.scala:40)
            at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.boundCondition(BroadcastHashJoinExec.scala:40)
    ```
    It fails again after fallback to interpreter mode:
    ```
    23/03/27 12:24:12 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7)
    java.lang.IllegalArgumentException: requirement failed: input[0, int, 
false] IN subquery#34 has not finished
            at scala.Predef$.require(Predef.scala:281)
            at 
org.apache.spark.sql.execution.InSubqueryExec.prepareResult(subquery.scala:144)
            at 
org.apache.spark.sql.execution.InSubqueryExec.eval(subquery.scala:151)
            at 
org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
            at 
org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2(HashJoin.scala:146)
            at 
org.apache.spark.sql.execution.joins.HashJoin.$anonfun$boundCondition$2$adapted(HashJoin.scala:146)
            at 
org.apache.spark.sql.execution.joins.HashJoin.$anonfun$outerJoin$1(HashJoin.scala:205)
    ```
    Both the predicate codegen and the evaluation fail for the same reason: 
`PlanSubqueries` creates `InSubqueryExec` with `shouldBroadcast=false`. The 
driver waits for the subquery to finish, but it's the executor that uses the 
results of the subquery (for predicate codegen or evaluation). Because 
`shouldBroadcast` is set to false, the result is stored in a transient field 
(`InSubqueryExec#result`), so the result of the subquery is not serialized when 
the `InSubqueryExec` instance is sent [...]
    
    The issue occurs, as far as I can tell, only when both whole stage codegen 
is disabled and adaptive execution is disabled. When whole stage codegen is 
enabled, the predicate codegen happens on the driver, so the subquery's result 
is available. When adaptive execution is enabled, `PlanAdaptiveSubqueries` 
always sets `shouldBroadcast=true`, so the subquery's result is available on 
the executor, if needed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #40569 from bersprockets/join_subquery_issue.
    
    Authored-by: Bruce Robbins <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 5b20f3d94095f54017be3d31d11305e597334d8b)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../org/apache/spark/sql/execution/subquery.scala  |  3 ++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 22 ++++++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index aaebe99eeee..656c50c8232 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -193,7 +193,8 @@ case class PlanSubqueries(sparkSession: SparkSession) 
extends Rule[SparkPlan] {
           )
         }
         val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, 
query)
-        InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", 
executedPlan), exprId)
+        InSubqueryExec(expr, SubqueryExec(s"subquery#${exprId.id}", 
executedPlan),
+          exprId, shouldBroadcast = true)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 18405977b2c..3c7ce1edcd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2639,4 +2639,26 @@ class SubquerySuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-42937: Outer join with subquery in condition") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      withTempView("t2") {
+        // this is the same as the view t created in beforeAll, but that gets 
dropped by
+        // one of the tests above
+        r.filter($"c".isNotNull && 
$"d".isNotNull).createOrReplaceTempView("t2")
+        val expected = Row(1, 2.0d, null, null) :: Row(1, 2.0d, null, null) ::
+          Row(3, 3.0d, 3, 2.0d) :: Row(null, 5.0d, null, null) :: Nil
+        checkAnswer(sql(
+          """
+            |select *
+            |from l
+            |left outer join r
+            |on a = c
+            |and a in (select c from t2 where d in (1.0, 2.0))
+            |where b > 1.0""".stripMargin),
+          expected)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to