[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2017-01-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16404





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2017-01-03 Thread cloud-fan
Github user cloud-fan closed the pull request at:

https://github.com/apache/spark/pull/16404





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2017-01-03 Thread cloud-fan
GitHub user cloud-fan reopened a pull request:

https://github.com/apache/spark/pull/16404

[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently, nondeterministic expressions are allowed in `Aggregate` (see the 
[comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)),
but the `PullOutNondeterministic` analyzer rule fails to handle `Aggregate`. This PR fixes that.

Closes https://github.com/apache/spark/pull/16379
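
For illustration (this mirrors the existing test query discussed in the review below; it is a sketch, not part of this patch): grouping by ordinal 2 means grouping by `rand(0)`, which `PullOutNondeterministic` must first project below the `Aggregate` and then replace with the resulting attribute.

```scala
// Illustrative sketch only (assumes a SparkSession named `spark`, e.g. in spark-shell).
// Ordinal 2 in the GROUP BY clause refers to rand(0), a nondeterministic expression.
import spark.implicits._

val df = Seq((1, 1), (1, 2), (2, 3)).toDF("a", "b")
df.createOrReplaceTempView("data")
spark.sql("select a, rand(0), sum(b) from data group by a, 2").show()
```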

## How was this patch tested?

A new test suite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark groupby

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16404


commit f1451883df9077ecbf31f3a86d2427b60262f863
Author: Wenchen Fan 
Date:   2016-12-26T10:24:07Z

Support grouping by nondeterministic expressions







[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r94229396
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

This problem was already there; let's send a new PR to fix it.





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r94229355
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

It seems we need to add a new API: here we want to get non-deterministic leaf nodes, and the `Nondeterministic` trait is not suitable for that.
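
A minimal sketch of what such an API might look like (the name and placement are hypothetical, not what this PR or Spark actually adds): collect non-deterministic leaves by the `deterministic` flag rather than by the `Nondeterministic` trait, so stateful UDF wrappers are caught as well.

```scala
// Hypothetical helper, for illustration only: a "leaf" here is a
// non-deterministic expression all of whose children are deterministic.
private def collectNondeterministicLeaves(expr: Expression): Seq[Expression] = {
  if (expr.deterministic) {
    Nil
  } else if (expr.children.forall(_.deterministic)) {
    Seq(expr)
  } else {
    expr.children.flatMap(collectNondeterministicLeaves)
  }
}
```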





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93998144
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

`statefulUDF()` is a stateful/non-deterministic UDF which does not extend `Nondeterministic`, but its `deterministic` is `false`.
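
For reference, a minimal sketch of such a stateful Hive UDF (the actual `StatefulUDF` used in Spark's Hive test suite may differ): the `@UDFType(stateful = true)` annotation makes the wrapping Catalyst expression report `deterministic == false`, yet that expression does not mix in the `Nondeterministic` trait.

```scala
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.udf.UDFType
import org.apache.hadoop.io.LongWritable

// Sketch only: a stateful (hence non-deterministic) Hive UDF that simply
// counts how many times it has been invoked.
@UDFType(stateful = true)
class StatefulUDF extends UDF {
  private val count = new LongWritable(0)
  def evaluate(): LongWritable = {
    count.set(count.get() + 1)
    count
  }
}
```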





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93997721
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --


https://github.com/apache/spark/pull/16404/files#diff-a0f2e45a5da747e9ec483f3557aa1b8bR138





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93997700
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

Is it the same as the existing test `select a, rand(0), sum(b) from data group by a, 2`?





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93942716
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
--- End diff --

Thanks! I see





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93942649
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

This rule runs only once, so it should be safe; otherwise, it might generate many useless `Project`s when some expressions do not extend `Nondeterministic` but their `deterministic` flag is false.

Maybe add a negative test case for [this check](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L248-L254):

```scala
sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS 
'${classOf[StatefulUDF].getName}'")
val df = Seq((1, 1)).toDF("a", "b")
df.createOrReplaceTempView("data")
sql("select a, statefulUDF(), sum(b) from data group by a, 2").show()
```





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93921258
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
--- End diff --

To narrow down the scope of the affected operators; ideally, though, we should use a whitelist.
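
As a purely illustrative sketch of the whitelist idea (not code from this PR), the guard could enumerate the operators known to be safe instead of matching any `UnaryNode` whose output matches its child's:

```scala
// Hypothetical whitelist-style guard, for illustration only.
private def canPullOutNondeterministic(plan: LogicalPlan): Boolean = plan match {
  case _: Project | _: Filter | _: Sort => true  // operators explicitly known to be safe
  case _ => false
}
```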





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93921170
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

1. This is the same as the previous behavior.
2. Given the variable name `leafNondeterministic`, it seems reasonable to collect `Nondeterministic` here.





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93903360
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
--- End diff --

Just a question: why do we need the condition `p.output == p.child.output`?





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16404#discussion_r93901178
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f
 
+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
--- End diff --

Not all non-deterministic expressions extend `Nondeterministic`, so this might not cover all the cases.





[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...

2016-12-26 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/16404

[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently, nondeterministic expressions are allowed in `Aggregate` (see the 
[comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)),
but the `PullOutNondeterministic` analyzer rule fails to handle `Aggregate`. This PR fixes that.

## How was this patch tested?

A new test suite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark groupby

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16404


commit 67f032b6ab7b23aecf432b13a6f455582754fc98
Author: Wenchen Fan 
Date:   2016-12-26T10:24:07Z

Support grouping by nondeterministic expressions



