[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r133002116

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
       }

    +  private def assertNoExceptions(c: Column): Unit = {
    --- End diff --

    Could you submit a follow-up PR to move this test case to `DataFrameAggregateSuite`? Thanks!

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18920
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132879849

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
       }

    +  private def assertNoExceptions(c: Column): Unit = {
    +    for ((wholeStage, useObjectHashAgg) <-
    +         Seq((true, true), (true, false), (false, true), (false, false))) {
    +      withSQLConf(
    +        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
    +        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
    +
    +        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
    +
    +        // HashAggregate test case
    +        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
    +        val hashAggPlan = hashAggDF.queryExecution.executedPlan
    +        if (wholeStage) {
    +          assert(hashAggPlan.find(p =>
    +            p.isInstanceOf[WholeStageCodegenExec] &&
    +              p.asInstanceOf[WholeStageCodegenExec].child
    +                .isInstanceOf[HashAggregateExec]).isDefined)
    +        } else {
    +          assert(hashAggPlan.isInstanceOf[HashAggregateExec])
    +        }
    +        hashAggDF.collect()
    +
    +        // ObjectHashAggregate and SortAggregate test cases
    +        val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y"))
    +        val objHashOrSort_Plan = objHashOrSort_AggDF.queryExecution.executedPlan
    --- End diff --

    `objHashOrSort_Plan` -> `objHashAggOrSortAggPlan`
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132879756

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
       }

    +  private def assertNoExceptions(c: Column): Unit = {
    +    for ((wholeStage, useObjectHashAgg) <-
    +         Seq((true, true), (true, false), (false, true), (false, false))) {
    +      withSQLConf(
    +        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
    +        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
    +
    +        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
    +
    +        // HashAggregate test case
    +        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
    +        val hashAggPlan = hashAggDF.queryExecution.executedPlan
    +        if (wholeStage) {
    +          assert(hashAggPlan.find(p =>
    +            p.isInstanceOf[WholeStageCodegenExec] &&
    +              p.asInstanceOf[WholeStageCodegenExec].child
    +                .isInstanceOf[HashAggregateExec]).isDefined)
    +        } else {
    +          assert(hashAggPlan.isInstanceOf[HashAggregateExec])
    +        }
    +        hashAggDF.collect()
    +
    +        // ObjectHashAggregate and SortAggregate test cases
    +        val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y"))
    --- End diff --

    `objHashOrSort_AggDF` -> `objHashAggOrSortAggDf`
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132879420

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -251,12 +253,14 @@ abstract class AggregationIterator(
               typedImperativeAggregates(i).serializeAggregateBufferInPlace(currentBuffer)
               i += 1
             }
    +        resultProjection.initialize(partIndex)
    --- End diff --

    Move it to line 240
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132878232

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
       }

    +  private def assertNoExceptions(c: Column): Unit = {
    +    for ((wholeStage, useObjectHashAgg) <-
    +         Seq((true, true), (true, false), (false, true), (false, false))) {
    +      withSQLConf(
    +        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
    +        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
    +
    +        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
    +
    +        // HashAggregate test case
    +        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
    +        val hashAggPlan = hashAggDF.queryExecution.executedPlan
    +        if (wholeStage) {
    +          assert(hashAggPlan.find(p =>
    +            p.isInstanceOf[WholeStageCodegenExec] &&
    +              p.asInstanceOf[WholeStageCodegenExec].child
    +                .isInstanceOf[HashAggregateExec]).isDefined)
    +        } else {
    +          assert(hashAggPlan.isInstanceOf[HashAggregateExec])
    +        }
    +        hashAggDF.collect()
    +
    +        // ObjectHashAggregate and SortAggregate test cases
    +        val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y"))
    +        val objHashOrSort_Plan = objHashOrSort_AggDF.queryExecution.executedPlan
    +        if (useObjectHashAgg) {
    +          assert(objHashOrSort_Plan.isInstanceOf[ObjectHashAggregateExec])
    +        } else {
    +          assert(objHashOrSort_Plan.isInstanceOf[SortAggregateExec])
    +        }
    +        objHashOrSort_AggDF.collect()
    +      }
    +    }
    +  }
    +
    +  test("SPARK-19471: AggregationIterator does not initialize the generated result projection" +
    +    " before using it") {
    +    Seq(
    +      monotonically_increasing_id(), spark_partition_id(),
    +      rand(Random.nextLong()), randn(Random.nextLong())
    +    ).foreach(assertNoExceptions(_))
    --- End diff --

    -> `).foreach(assertNoExceptions)`
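The suggestion above relies on Scala converting a method name into a function value when a function is expected, so the `(_)` placeholder is redundant. A minimal sketch of the equivalence follows; `describe` and the object name are made up for illustration, and `map` stands in for `foreach` only so the results can be compared.

```scala
object MethodValueSketch {
  // Hypothetical helper, standing in for a one-argument method such as assertNoExceptions.
  def describe(n: Int): String = s"value-$n"

  // Explicit placeholder syntax, as in the code under review.
  val withPlaceholder: Seq[String] = Seq(1, 2, 3).map(describe(_))

  // Passing the method name directly; the compiler turns it into a function value.
  val withMethodValue: Seq[String] = Seq(1, 2, 3).map(describe)
}
```

Both forms produce the same function, so the shorter one is purely a style preference.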
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132878196

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
       }

    +  private def assertNoExceptions(c: Column): Unit = {
    +    for ((wholeStage, useObjectHashAgg) <-
    +         Seq((true, true), (true, false), (false, true), (false, false))) {
    +      withSQLConf(
    +        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
    +        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
    +
    +        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
    +
    +        // HashAggregate test case
    +        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
    +        val hashAggPlan = hashAggDF.queryExecution.executedPlan
    +        if (wholeStage) {
    +          assert(hashAggPlan.find(p =>
    +            p.isInstanceOf[WholeStageCodegenExec] &&
    +              p.asInstanceOf[WholeStageCodegenExec].child
    +                .isInstanceOf[HashAggregateExec]).isDefined)
    --- End diff --

    ```Scala
    assert(hashAggPlan.find {
      case WholeStageCodegenExec(_: HashAggregateExec) => true
      case _ => false
    }.isDefined)
    ```
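To illustrate why the pattern-matching form reads more cleanly than the `isInstanceOf`/`asInstanceOf` chain, here is a self-contained sketch on a toy tree. The node types `Scan`, `HashAgg`, and `Codegen` and the `find` helper are hypothetical stand-ins, not Spark's operators or its `TreeNode.find`.

```scala
object PlanMatchSketch {
  // Hypothetical miniature plan nodes; real Spark operators carry many more fields.
  sealed trait Node { def children: Seq[Node] }
  final case class Scan(name: String) extends Node { val children: Seq[Node] = Nil }
  final case class HashAgg(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
  final case class Codegen(child: Node) extends Node { def children: Seq[Node] = Seq(child) }

  // Pre-order search returning the first node that satisfies the predicate.
  def find(root: Node)(p: Node => Boolean): Option[Node] =
    if (p(root)) Some(root)
    else root.children.foldLeft(Option.empty[Node])((acc, c) => acc.orElse(find(c)(p)))

  // Cast-based check, in the style of the code under review.
  def hasCodegenAggWithCasts(plan: Node): Boolean =
    find(plan)(n =>
      n.isInstanceOf[Codegen] && n.asInstanceOf[Codegen].child.isInstanceOf[HashAgg]).isDefined

  // Pattern-matching check, in the style the reviewer suggests.
  def hasCodegenAggWithMatch(plan: Node): Boolean =
    find(plan) {
      case Codegen(_: HashAgg) => true
      case _ => false
    }.isDefined
}
```

The two checks agree on every input; the second one states the expected shape of the subtree in a single pattern and needs no casts.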
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132879442

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -251,12 +253,14 @@ abstract class AggregationIterator(
               typedImperativeAggregates(i).serializeAggregateBufferInPlace(currentBuffer)
               i += 1
             }
    +        resultProjection.initialize(partIndex)
             resultProjection(joinedRow(currentGroupingKey, currentBuffer))
           }
         } else {
           // Grouping-only: we only output values based on grouping expressions.
           val resultProjection = UnsafeProjection.create(resultExpressions, groupingAttributes)
           (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
    +        resultProjection.initialize(partIndex)
    --- End diff --

    Move it to line 261
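The "move it" comments appear to ask that `initialize` be called once, next to where the projection is created, rather than inside the function that runs for every group. The sketch below shows why the placement can matter, under the assumption that `initialize` resets per-partition state. `ToyIncreasingId` is a hypothetical class loosely inspired by `monotonically_increasing_id()` and is not Spark code.

```scala
object InitializeOnceSketch {
  // Hypothetical stateful expression: initialize() resets the counter for a partition.
  final class ToyIncreasingId {
    private var count = 0L
    private var base = 0L

    def initialize(partitionIndex: Int): Unit = {
      count = 0L
      base = partitionIndex.toLong << 33
    }

    def next(): Long = { val id = base + count; count += 1; id }
  }

  // Initializing inside the per-group function resets the state for every group.
  def idsInitializedPerGroup(groups: Int): Seq[Long] = {
    val expr = new ToyIncreasingId
    val generateOutput = () => { expr.initialize(0); expr.next() }
    Seq.fill(groups)(generateOutput())
  }

  // Initializing once, before the per-group function is built, keeps state across groups.
  def idsInitializedOnce(groups: Int): Seq[Long] = {
    val expr = new ToyIncreasingId
    expr.initialize(0)
    val generateOutput = () => expr.next()
    Seq.fill(groups)(generateOutput())
  }
}
```

In this toy model the per-group variant yields the same id for every group, while the initialize-once variant yields increasing ids. Whether the real expressions behave the same way depends on their `initialize` implementations.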
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132879406

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---
    @@ -229,6 +230,7 @@ abstract class AggregationIterator(
                 allImperativeAggregateFunctions(i).eval(currentBuffer))
               i += 1
             }
    +        resultProjection.initialize(partIndex)
    --- End diff --

    Move it to line 221
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18920#discussion_r132847406

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
    @@ -449,6 +449,28 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
         ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
       }

    +  private def assertNoExceptions(c: Column): Unit = {
    +    for ((wholeStage, useObjectHashAgg) <- Seq((true, false), (false, false), (false, true))) {
    +      withSQLConf(
    +        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
    +        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
    +        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
    +        // HashAggregate
    --- End diff --

    We need to check/compare the plans to ensure they are HashAggregate, ObjectHashAggregate and SortAggregate.
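The loop in this revision covers three of the four flag combinations; `(true, true)` is absent. As a sketch of how the full matrix and the expected operator per setting could be written down, the following plain-Scala snippet enumerates the pairs and records which operator the later revision of the test asserts for the `collect_list` query. The object and method names are made up, and the operator names are plain strings here rather than Spark classes.

```scala
object AggregateConfigMatrixSketch {
  private val flags = Seq(true, false)

  // Every (wholeStage, useObjectHashAgg) pair, generated instead of listed by hand.
  val combos: Seq[(Boolean, Boolean)] =
    for (wholeStage <- flags; useObjectHashAgg <- flags) yield (wholeStage, useObjectHashAgg)

  // Operator the later test revision expects for the collect_list aggregation.
  def expectedObjectAggOperator(useObjectHashAgg: Boolean): String =
    if (useObjectHashAgg) "ObjectHashAggregateExec" else "SortAggregateExec"
}
```

Generating the pairs keeps the test from silently missing a combination when another flag is added.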
[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...
GitHub user DonnyZone opened a pull request:

    https://github.com/apache/spark/pull/18920

    [SPARK-19471][SQL]AggregationIterator does not initialize the generated result projection before using it

    ## What changes were proposed in this pull request?

    We recently hit the NPE described in https://issues.apache.org/jira/browse/SPARK-19471 in our production environment.

    The issue can be reproduced with the following examples:

        val df = spark.createDataFrame(Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4))).toDF("x", "y")

        // HashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
        df.groupBy("x").agg(rand(), sum("y")).show()

        // ObjectHashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
        df.groupBy("x").agg(rand(), collect_list("y")).show()

        // SortAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false && SQLConf.USE_OBJECT_HASH_AGG.key=false
        df.groupBy("x").agg(rand(), collect_list("y")).show()

    This PR is based on PR #16820 (https://github.com/apache/spark/pull/16820) and adds test cases for all aggregation paths. We want to push it forward.

    > When AggregationIterator generates result projection, it does not call the initialize method of the Projection class. This will cause a runtime NullPointerException when the projection involves nondeterministic expressions.

    ## How was this patch tested?

    Unit tests; also verified in a production environment.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DonnyZone/spark Branch-spark-19471

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18920.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18920

commit b932d2f3a6741a8ef052cbd8087f4b0836c617d6
Author: donnyzone
Date:   2017-08-11T13:00:00Z

    spark-19471
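The failure mode quoted in the description can be sketched without Spark. In the toy model below, a nondeterministic expression creates its state only in `initialize`, and a projection that is evaluated without being initialized hits a `NullPointerException`. `ToyRand` and `ToyProjection` are hypothetical stand-ins and do not mirror Spark's actual classes.

```scala
object UninitializedProjectionSketch {
  // Hypothetical nondeterministic expression: its state is created only in initialize().
  final class ToyRand(seed: Long) {
    private var rng: java.util.Random = null // stays null until initialize() runs

    def initialize(partitionIndex: Int): Unit =
      rng = new java.util.Random(seed + partitionIndex)

    def eval(): Double = rng.nextDouble() // throws NullPointerException if never initialized
  }

  // Hypothetical projection that forwards initialize() to the expressions it wraps.
  final class ToyProjection(exprs: Seq[ToyRand]) {
    def initialize(partitionIndex: Int): Unit = exprs.foreach(_.initialize(partitionIndex))
    def apply(): Seq[Double] = exprs.map(_.eval())
  }

  // Returns true if evaluating the projection throws a NullPointerException.
  def failsWithNpe(initializeFirst: Boolean): Boolean = {
    val projection = new ToyProjection(Seq(new ToyRand(42L)))
    if (initializeFirst) projection.initialize(0)
    scala.util.Try(projection()).failed.toOption.exists(_.isInstanceOf[NullPointerException])
  }
}
```

Calling `failsWithNpe(initializeFirst = false)` reproduces the error in miniature, and passing `true` avoids it, which is the shape of the fix proposed in this pull request.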