[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r133002116
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
--- End diff --

Could you submit a follow-up PR to move this test case to 
`DataFrameAggregateSuite`? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18920





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132879849
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
+for ((wholeStage, useObjectHashAgg) <-
+ Seq((true, true), (true, false), (false, true), (false, false))) {
+  withSQLConf(
+(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+
+val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+
+// HashAggregate test case
+val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+val hashAggPlan = hashAggDF.queryExecution.executedPlan
+if (wholeStage) {
+  assert(hashAggPlan.find(p =>
+p.isInstanceOf[WholeStageCodegenExec] &&
+  p.asInstanceOf[WholeStageCodegenExec].child
+.isInstanceOf[HashAggregateExec]).isDefined)
+} else {
+  assert(hashAggPlan.isInstanceOf[HashAggregateExec])
+}
+hashAggDF.collect()
+
+// ObjectHashAggregate and SortAggregate test cases
+val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y"))
+val objHashOrSort_Plan = 
objHashOrSort_AggDF.queryExecution.executedPlan
--- End diff --

`objHashOrSort_Plan` -> `objHashAggOrSortAggPlan`





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132879756
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
+for ((wholeStage, useObjectHashAgg) <-
+ Seq((true, true), (true, false), (false, true), (false, false))) {
+  withSQLConf(
+(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+
+val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+
+// HashAggregate test case
+val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+val hashAggPlan = hashAggDF.queryExecution.executedPlan
+if (wholeStage) {
+  assert(hashAggPlan.find(p =>
+p.isInstanceOf[WholeStageCodegenExec] &&
+  p.asInstanceOf[WholeStageCodegenExec].child
+.isInstanceOf[HashAggregateExec]).isDefined)
+} else {
+  assert(hashAggPlan.isInstanceOf[HashAggregateExec])
+}
+hashAggDF.collect()
+
+// ObjectHashAggregate and SortAggregate test cases
+val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y"))
--- End diff --

`objHashOrSort_AggDF` -> `objHashAggOrSortAggDf`





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132879420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
 ---
@@ -251,12 +253,14 @@ abstract class AggregationIterator(
   
typedImperativeAggregates(i).serializeAggregateBufferInPlace(currentBuffer)
   i += 1
 }
+resultProjection.initialize(partIndex)
--- End diff --

Move it to line 240





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132878232
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
+for ((wholeStage, useObjectHashAgg) <-
+ Seq((true, true), (true, false), (false, true), (false, false))) {
+  withSQLConf(
+(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+
+val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+
+// HashAggregate test case
+val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+val hashAggPlan = hashAggDF.queryExecution.executedPlan
+if (wholeStage) {
+  assert(hashAggPlan.find(p =>
+p.isInstanceOf[WholeStageCodegenExec] &&
+  p.asInstanceOf[WholeStageCodegenExec].child
+.isInstanceOf[HashAggregateExec]).isDefined)
+} else {
+  assert(hashAggPlan.isInstanceOf[HashAggregateExec])
+}
+hashAggDF.collect()
+
+// ObjectHashAggregate and SortAggregate test cases
+val objHashOrSort_AggDF = df.groupBy("x").agg(c, collect_list("y"))
+val objHashOrSort_Plan = 
objHashOrSort_AggDF.queryExecution.executedPlan
+if (useObjectHashAgg) {
+  assert(objHashOrSort_Plan.isInstanceOf[ObjectHashAggregateExec])
+} else {
+  assert(objHashOrSort_Plan.isInstanceOf[SortAggregateExec])
+}
+objHashOrSort_AggDF.collect()
+  }
+}
+  }
+
+  test("SPARK-19471: AggregationIterator does not initialize the generated 
result projection" +
+" before using it") {
+Seq(
+  monotonically_increasing_id(), spark_partition_id(),
+  rand(Random.nextLong()), randn(Random.nextLong())
+).foreach(assertNoExceptions(_))
--- End diff --

-> `).foreach(assertNoExceptions)`
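
For context on the suggestion above: when a method already has the shape `A => Unit`, Scala accepts it directly as a method value, so the `(_)` placeholder is redundant. The sketch below is self-contained and uses hypothetical names (`MethodValueSketch`, `visit`, `record`) that are not part of the PR; it only illustrates that both spellings behave the same.

```scala
import scala.collection.mutable.ArrayBuffer

object MethodValueSketch {
  // Applies the local method `record` to every element, either as a
  // method value (`foreach(record)`) or via a placeholder lambda
  // (`foreach(record(_))`), and returns the elements in visit order.
  def visit(items: Seq[Int], useMethodValue: Boolean): Seq[Int] = {
    val seen = ArrayBuffer.empty[Int]
    def record(x: Int): Unit = { seen += x; () }
    if (useMethodValue) items.foreach(record)
    else items.foreach(record(_))
    seen.toList
  }

  def main(args: Array[String]): Unit = {
    val a = visit(Seq(1, 2, 3), useMethodValue = true)
    val b = visit(Seq(1, 2, 3), useMethodValue = false)
    println(a == b)
  }
}
```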





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132878196
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -449,6 +451,49 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
+for ((wholeStage, useObjectHashAgg) <-
+ Seq((true, true), (true, false), (false, true), (false, false))) {
+  withSQLConf(
+(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+
+val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+
+// HashAggregate test case
+val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+val hashAggPlan = hashAggDF.queryExecution.executedPlan
+if (wholeStage) {
+  assert(hashAggPlan.find(p =>
+p.isInstanceOf[WholeStageCodegenExec] &&
+  p.asInstanceOf[WholeStageCodegenExec].child
+.isInstanceOf[HashAggregateExec]).isDefined)
--- End diff --

```Scala
  assert(hashAggPlan.find {
case WholeStageCodegenExec(_: HashAggregateExec) => true
case _ => false
  }.isDefined)
```
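
To show why the pattern-matching form is equivalent to the `isInstanceOf`/`asInstanceOf` chain it replaces, here is a minimal, self-contained sketch. The `Plan`, `Scan`, `HashAgg` and `Codegen` types are hypothetical stand-ins for Spark's physical plan nodes, not the real classes, and `find` only imitates a depth-first search over the tree.

```scala
// Hypothetical stand-ins for physical plan nodes; not Spark's classes.
sealed trait Plan {
  def children: Seq[Plan]

  // Depth-first search for the first node that satisfies the predicate.
  def find(f: Plan => Boolean): Option[Plan] =
    if (f(this)) Some(this)
    else children.foldLeft(Option.empty[Plan])((acc, c) => acc.orElse(c.find(f)))
}
case class Scan(name: String) extends Plan { def children: Seq[Plan] = Nil }
case class HashAgg(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
case class Codegen(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

object PlanMatchSketch {
  // Cast-based check, in the style of the original test code.
  def viaCasts(plan: Plan): Boolean = plan.find(p =>
    p.isInstanceOf[Codegen] &&
      p.asInstanceOf[Codegen].child.isInstanceOf[HashAgg]).isDefined

  // Pattern-matching check, in the style the reviewer suggests.
  def viaPattern(plan: Plan): Boolean = plan.find {
    case Codegen(_: HashAgg) => true
    case _ => false
  }.isDefined

  def main(args: Array[String]): Unit = {
    val wrapped = Codegen(HashAgg(Scan("t")))
    val bare = HashAgg(Scan("t"))
    println(viaCasts(wrapped) == viaPattern(wrapped))
    println(viaCasts(bare) == viaPattern(bare))
  }
}
```

The pattern form states the expected node shape in one place and avoids the explicit cast.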





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132879442
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
 ---
@@ -251,12 +253,14 @@ abstract class AggregationIterator(
   
typedImperativeAggregates(i).serializeAggregateBufferInPlace(currentBuffer)
   i += 1
 }
+resultProjection.initialize(partIndex)
 resultProjection(joinedRow(currentGroupingKey, currentBuffer))
   }
 } else {
   // Grouping-only: we only output values based on grouping 
expressions.
   val resultProjection = UnsafeProjection.create(resultExpressions, 
groupingAttributes)
   (currentGroupingKey: UnsafeRow, currentBuffer: InternalRow) => {
+resultProjection.initialize(partIndex)
--- End diff --

Move it to line 261





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132879406
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
 ---
@@ -229,6 +230,7 @@ abstract class AggregationIterator(
 allImperativeAggregateFunctions(i).eval(currentBuffer))
   i += 1
 }
+resultProjection.initialize(partIndex)
--- End diff --

Move it to line 221
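
One reading of this request is that `initialize` belongs next to the creation of the projection rather than inside the per-row function, so it runs once per iterator instead of once per output row. The sketch below illustrates that placement with hypothetical types (`SketchProjection`, `InitializeOnceSketch`); it is not the `AggregationIterator` code, and the seeding scheme is an assumption made only for the example.

```scala
// Hypothetical projection with partition-dependent state; not Spark's class.
final class SketchProjection {
  private var rng: java.util.Random = null
  var initCalls: Int = 0

  def initialize(partitionIndex: Int): Unit = {
    initCalls += 1
    rng = new java.util.Random(42L + partitionIndex)
  }

  def apply(row: Int): Double = row + rng.nextDouble()
}

object InitializeOnceSketch {
  // Builds the per-row result function. `initialize` is called once here,
  // when the function is created, not inside the returned closure.
  def makeResultFunction(partIndex: Int): (SketchProjection, Int => Double) = {
    val projection = new SketchProjection
    projection.initialize(partIndex)
    (projection, (row: Int) => projection(row))
  }

  def main(args: Array[String]): Unit = {
    val (projection, generateOutput) = makeResultFunction(partIndex = 0)
    val out = Seq(1, 2, 3).map(generateOutput)
    println(s"rows=${out.size}, initialize calls=${projection.initCalls}")
  }
}
```

In this sketch, calling `initialize` inside the closure would also reseed the generator for every row, which is a second reason to keep it outside.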





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-13 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18920#discussion_r132847406
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -449,6 +449,28 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 ).foreach(assertValuesDoNotChangeAfterCoalesceOrUnion(_))
   }
 
+  private def assertNoExceptions(c: Column): Unit = {
+for ((wholeStage, useObjectHashAgg) <- Seq((true, false), (false, 
false), (false, true))) {
+  withSQLConf(
+(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+// HashAggregate
--- End diff --

We need to check/compare the plans to ensure they are HashAggregate, 
ObjectHashAggregate and SortAggregate. 





[GitHub] spark pull request #18920: [SPARK-19471][SQL]AggregationIterator does not in...

2017-08-11 Thread DonnyZone
GitHub user DonnyZone opened a pull request:

https://github.com/apache/spark/pull/18920

[SPARK-19471][SQL]AggregationIterator does not initialize the generated 
result projection before using it

## What changes were proposed in this pull request?

We recently hit the same NullPointerException in our production 
environment. The issue is described in:
https://issues.apache.org/jira/browse/SPARK-19471

This issue can be reproduced with the following examples:

    val df = spark.createDataFrame(Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4))).toDF("x", "y")

    // HashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
    df.groupBy("x").agg(rand(), sum("y")).show()

    // ObjectHashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
    df.groupBy("x").agg(rand(), collect_list("y")).show()

    // SortAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false && SQLConf.USE_OBJECT_HASH_AGG.key=false
    df.groupBy("x").agg(rand(), collect_list("y")).show()

This PR is based on PR-16820 (https://github.com/apache/spark/pull/16820) 
and adds test cases for all aggregation paths. We want to push it forward. 

> When AggregationIterator generates result projection, it does not call 
the initialize method of the Projection class. This will cause a runtime 
NullPointerException when the projection involves nondeterministic expressions.
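
The failure mode quoted above can be illustrated with a small, self-contained sketch. It assumes a nondeterministic expression whose random generator is created only in `initialize`; the names `UninitializedRand` and `MissingInitializeSketch` are hypothetical and do not come from Spark.

```scala
import scala.util.Try

// Hypothetical nondeterministic expression: its state exists only after
// initialize has been called. Not Spark's class.
final class UninitializedRand {
  private var rng: java.util.Random = null

  def initialize(partitionIndex: Int): Unit = {
    rng = new java.util.Random(partitionIndex.toLong)
  }

  // Dereferences rng, so it throws NullPointerException if initialize was skipped.
  def eval(): Double = rng.nextDouble()
}

object MissingInitializeSketch {
  def evalWithoutInit(): Try[Double] = Try(new UninitializedRand().eval())

  def evalWithInit(): Try[Double] = Try {
    val r = new UninitializedRand
    r.initialize(0)
    r.eval()
  }

  def main(args: Array[String]): Unit = {
    println(s"without initialize: ${evalWithoutInit()}")
    println(s"with initialize succeeded: ${evalWithInit().isSuccess}")
  }
}
```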

## How was this patch tested?

Unit tests; also verified in our production environment.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/DonnyZone/spark Branch-spark-19471

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18920.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18920


commit b932d2f3a6741a8ef052cbd8087f4b0836c617d6
Author: donnyzone 
Date:   2017-08-11T13:00:00Z

spark-19471



