This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2011ab5 [SPARK-32567][SQL][FOLLOWUP] Fix CompileException when code generate for FULL OUTER shuffled hash join 2011ab5 is described below commit 2011ab5cfb9a878714763296149c6b6f3b46c584 Author: RoryQi <1242949...@qq.com> AuthorDate: Sun Nov 14 20:54:54 2021 -0800 [SPARK-32567][SQL][FOLLOWUP] Fix CompileException when code generate for FULL OUTER shuffled hash join ### What changes were proposed in this pull request? Add `throws IOException` to `consumeFullOuterJoinRow` in ShuffledHashJoinExec ### Why are the changes needed? PR #34444 added code-gen for full outer shuffled hash join. If we don't have this patch, when the dataframes are `full outer` shuffled hash joined and the results are then aggregated, the dataframes will throw a `CompileException`. For example: ```scala val df1 = spark.range(5).select($"id".as("k1")) val df2 = spark.range(10).select($"id".as("k2")) df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", "full_outer").count() ``` The dataframe will throw an exception which is as follows: ``` 23:28:19.079 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 175, Column 1: Thrown exception of type "java.io.IOException" is neither caught by a "try...catch" block nor declared in the "throws" clause of the declaring function org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 175, Column 1: Thrown exception of type "java.io.IOException" is neither caught by a "try...catch" block nor declared in the "throws" clause of the declaring function at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12021) at org.codehaus.janino.UnitCompiler.checkThrownException(UnitCompiler.java:9801) at org.codehaus.janino.UnitCompiler.checkThrownExceptions(UnitCompiler.java:9720) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9163) at 
org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5055) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add a new UT Closes #34589 from jerqi/SPARK-32567. Authored-by: RoryQi <1242949...@qq.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala | 2 +- .../scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 7136229..9e81a6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -376,7 +376,7 @@ case class ShuffledHashJoinExec( val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow") ctx.addNewFunction(consumeFullOuterJoinRow, s""" - |private void $consumeFullOuterJoinRow() { + |private void $consumeFullOuterJoinRow() throws java.io.IOException { | ${metricTerm(ctx, "numOutputRows")}.add(1); | ${consume(ctx, resultVars)} |} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 7da813c..f483971 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -183,6 +183,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession }.size === 1) checkAnswer(joinUniqueDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4), Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9))) + assert(joinUniqueDF.count() === 10) // test one join with 
non-unique key from build side val joinNonUniqueDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2" % 3, "full_outer") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org