This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 9c0b803ba12 [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec` 9c0b803ba12 is described below commit 9c0b803ba124a6e70762aec1e5559b0d66529f4d Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Fri Sep 15 13:22:40 2023 +0900 [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec` ### What changes were proposed in this pull request? Before evaluating the generator function in `GenerateExec`, initialize non-deterministic expressions. ### Why are the changes needed? The following query fails: ``` select * from explode( transform(sequence(0, cast(rand()*1000 as int) + 1), x -> x * 22) ); 23/09/14 09:27:25 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalArgumentException: requirement failed: Nondeterministic expression org.apache.spark.sql.catalyst.expressions.Rand should be initialized before eval. at scala.Predef$.require(Predef.scala:281) at org.apache.spark.sql.catalyst.expressions.Nondeterministic.eval(Expression.scala:497) at org.apache.spark.sql.catalyst.expressions.Nondeterministic.eval$(Expression.scala:495) at org.apache.spark.sql.catalyst.expressions.RDG.eval(randomExpressions.scala:35) at org.apache.spark.sql.catalyst.expressions.BinaryArithmetic.eval(arithmetic.scala:384) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:543) at org.apache.spark.sql.catalyst.expressions.BinaryArithmetic.eval(arithmetic.scala:384) at org.apache.spark.sql.catalyst.expressions.Sequence.eval(collectionOperations.scala:3062) at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:275) at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:274) at org.apache.spark.sql.catalyst.expressions.ArrayTransform.eval(higherOrderFunctions.scala:308) at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:375) at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108) ... ``` However, this query succeeds: ``` select * from explode( sequence(0, cast(rand()*1000 as int) + 1) ); 0 1 2 3 ... 801 802 803 ``` The difference is that `transform` turns off whole-stage codegen, which exposes a bug in `GenerateExec` in which the non-deterministic expression passed to the generator function is not initialized before being used. This PR fixes the bug in `GenerateExec`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42933 from bersprockets/nondeterm_issue. Lead-authored-by: Bruce Robbins <bersprock...@gmail.com> Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit e097f916a2769dfe82bfd216fedcd6962e8280c8) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../main/scala/org/apache/spark/sql/execution/GenerateExec.scala | 4 ++++ .../test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 7 +++++++ 2 files changed, 11 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index f6dbf5fda18..b99361437e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -78,6 +78,10 @@ case class GenerateExec( // boundGenerator.terminate() should be triggered after all of the rows in the partition val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithIndexInternal { (index, iter) => + boundGenerator.foreach { + case n: Nondeterministic => n.initialize(index) + case _ => + } val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) val rows = if (requiredChildOutput.nonEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala index abec582d43a..0746a4b92af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala @@ -536,6 +536,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession { checkAnswer(df, Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil) } + + test("SPARK-45171: Handle evaluated nondeterministic expression") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val df = sql("select explode(array(rand(0)))") + checkAnswer(df, Row(0.7604953758285915d)) + } + } } case class EmptyGenerator() extends Generator with LeafLike[Expression] { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org