This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2011ab5  [SPARK-32567][SQL][FOLLOWUP] Fix CompileException when code 
generate for FULL OUTER shuffled hash join
2011ab5 is described below

commit 2011ab5cfb9a878714763296149c6b6f3b46c584
Author: RoryQi <1242949...@qq.com>
AuthorDate: Sun Nov 14 20:54:54 2021 -0800

    [SPARK-32567][SQL][FOLLOWUP] Fix CompileException when code generate for 
FULL OUTER shuffled hash join
    
    ### What changes were proposed in this pull request?
    Add `throws IOException` to `consumeFullOuterJoinRow` in 
ShuffledHashJoinExec
    
    ### Why are the changes needed?
    pr #34444 add code-gen for full outer shuffled hash join. If we don't have 
this patch,   when the dataframes are `fullter outer` shuffled hash joined, and 
then aggregate the results.  the dataframes will throw a `CompileException`.
    For example:
    ```scala
        val df1 = spark.range(5).select($"id".as("k1"))
        val df2 = spark.range(10).select($"id".as("k2"))
        df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", 
"full_outer").count()
    ```
    The dataframe will throw an Exception which is as follows:
    ```
    23:28:19.079 ERROR 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to 
compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', 
Line 175, Column 1: Thrown exception of type "java.io.IOException" is neither 
caught by a "try...catch" block nor declared in the "throws" clause of the 
declaring function
    org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 
175, Column 1: Thrown exception of type "java.io.IOException" is neither caught 
by a "try...catch" block nor declared in the "throws" clause of the declaring 
function
        at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12021)
        at 
org.codehaus.janino.UnitCompiler.checkThrownException(UnitCompiler.java:9801)
        at 
org.codehaus.janino.UnitCompiler.checkThrownExceptions(UnitCompiler.java:9720)
        at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9163)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5055)
    ```
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add a new UT
    
    Closes #34589 from jerqi/SPARK-32567.
    
    Authored-by: RoryQi <1242949...@qq.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala     | 2 +-
 .../scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala   | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 7136229..9e81a6f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -376,7 +376,7 @@ case class ShuffledHashJoinExec(
     val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
     ctx.addNewFunction(consumeFullOuterJoinRow,
       s"""
-         |private void $consumeFullOuterJoinRow() {
+         |private void $consumeFullOuterJoinRow() throws java.io.IOException {
          |  ${metricTerm(ctx, "numOutputRows")}.add(1);
          |  ${consume(ctx, resultVars)}
          |}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 7da813c..f483971 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -183,6 +183,7 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSparkSession
     }.size === 1)
     checkAnswer(joinUniqueDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), 
Row(4, 4),
       Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9)))
+    assert(joinUniqueDF.count() === 10)
 
     // test one join with non-unique key from build side
     val joinNonUniqueDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2" % 
3, "full_outer")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to