[GitHub] spark pull request #16404: [SPARK-18969][SQL] Support grouping by nondetermi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16404 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/16404
GitHub user cloud-fan reopened a pull request: https://github.com/apache/spark/pull/16404

[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently nondeterministic expressions are allowed in `Aggregate` (see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule fails to handle `Aggregate`. This PR fixes it.

Closes https://github.com/apache/spark/pull/16379

## How was this patch tested?

A new test suite.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark groupby

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16404.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16404

commit f1451883df9077ecbf31f3a86d2427b60262f863
Author: Wenchen Fan
Date: 2016-12-26T10:24:07Z

    Support grouping by nondeterministic expressions
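The core of the fix can be sketched on a toy plan. Everything below (`Expr`, `Plan`, `Attr`, `Rand`, `Aliased`, `pullOutNondeterministic`) is invented for illustration and is not Spark's actual catalyst API: each nondeterministic grouping expression is aliased in a `Project` inserted below the `Aggregate`, and the grouping list is rewritten to reference the projected attribute instead.

```scala
// Toy model of the PullOutNondeterministic rewrite for Aggregate.
// All types here are invented for illustration; they are NOT Spark's
// catalyst classes.
sealed trait Expr { def deterministic: Boolean }
case class Attr(name: String) extends Expr { val deterministic = true }
case class Rand(seed: Long) extends Expr { val deterministic = false }
case class Aliased(child: Expr, name: String) extends Expr {
  def deterministic: Boolean = child.deterministic
  def toAttr: Attr = Attr(name)
}

sealed trait Plan
case class Relation(output: Seq[Attr]) extends Plan
case class Project(exprs: Seq[Expr], child: Plan) extends Plan
case class Aggregate(grouping: Seq[Expr], child: Plan) extends Plan

// Alias each nondeterministic grouping expression in a Project below the
// Aggregate, and group by the resulting attribute instead.
def pullOutNondeterministic(agg: Aggregate, childOutput: Seq[Attr]): Aggregate = {
  val nondeterToAttr: Map[Expr, Aliased] =
    agg.grouping.filterNot(_.deterministic)
      .map(e => e -> Aliased(e, "_nondeterministic")).toMap
  val newChild = Project(childOutput ++ nondeterToAttr.values, agg.child)
  val newGrouping = agg.grouping.map(e => nondeterToAttr.get(e).map(_.toAttr).getOrElse(e))
  Aggregate(newGrouping, newChild)
}
```

Run against `Aggregate(Seq(Attr("a"), Rand(0)), rel)`, the grouping becomes `Seq(Attr("a"), Attr("_nondeterministic"))`, with the `rand` evaluated once per row in the child `Project`.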
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r94229396

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---

@@ -1918,28 +1918,37 @@ class Analyzer(
       case p: Project => p
       case f: Filter => f

+      case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
+        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
+        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
+        a.transformExpressions { case e =>
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
+        }.copy(child = newChild)
+
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
       case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
-        val nondeterministicExprs = p.expressions.filterNot(_.deterministic).flatMap { expr =>
-          val leafNondeterministic = expr.collect {
-            case n: Nondeterministic => n
-          }
-          leafNondeterministic.map { e =>
-            val ne = e match {
-              case n: NamedExpression => n
-              case _ => Alias(e, "_nondeterministic")(isGenerated = true)
-            }
-            new TreeNodeRef(e) -> ne
-          }
-        }.toMap
+        val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
-          nondeterministicExprs.get(new TreeNodeRef(e)).map(_.toAttribute).getOrElse(e)
+          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
         }
-        val newChild = Project(p.child.output ++ nondeterministicExprs.values, p.child)
+        val newChild = Project(p.child.output ++ nondeterToAttr.values, p.child)
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
+
+    private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
+      exprs.filterNot(_.deterministic).flatMap { expr =>
+        val leafNondeterministic = expr.collect { case n: Nondeterministic => n }

--- End diff --

This problem was already there; let's send a new PR to fix it.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r94229355

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

It seems we need to add a new API: here we want to get the nondeterministic leaf nodes, and the trait `Nondeterministic` is not suitable for that.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93998144

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

`statefulUDF()` is a stateful/non-deterministic UDF which does not extend `Nondeterministic`, but its `deterministic` is `false`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93997721

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

https://github.com/apache/spark/pull/16404/files#diff-a0f2e45a5da747e9ec483f3557aa1b8bR138
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93997700

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

Is it the same as the existing test? `select a, rand(0), sum(b) from data group by a, 2`
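For readers unfamiliar with the ordinal form used in that test: the literal `2` in `group by a, 2` refers to the second item of the SELECT list, i.e. `rand(0)`, so the grouping list becomes `a, rand(0)`. A minimal sketch of that resolution step (the helper below is invented for illustration, not Spark's actual resolver):

```scala
// Toy sketch of GROUP BY ordinal resolution (invented helper, not Spark's):
// an integer literal in GROUP BY stands for the select-list item at that
// 1-based position, so `group by a, 2` groups by `a` and `rand(0)` here.
def resolveGroupByOrdinals(selectList: Seq[String], grouping: Seq[Any]): Seq[String] =
  grouping.map {
    case ordinal: Int => selectList(ordinal - 1) // 1-based index into SELECT list
    case expr: String => expr                    // already an expression
    case other        => sys.error(s"unexpected grouping item: $other")
  }
```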
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93942716

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>`) --- End diff --

Thanks! I see.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93942649

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

This rule is run once, thus it should be safe; otherwise, it might generate many useless `Project` nodes when some expressions do not extend `Nondeterministic` but their `deterministic` is false. Maybe add a negative test case for [this check](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L248-L254):

```scala
sql(s"CREATE TEMPORARY FUNCTION statefulUDF AS '${classOf[StatefulUDF].getName}'")
val df = Seq((1, 1)).toDF("a", "b")
df.createOrReplaceTempView("data")
sql("select a, statefulUDF(), sum(b) from data group by a, 2").show()
```
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93921258

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>`) --- End diff --

To narrow down the scope of the affected operators; ideally, though, we should use a whitelist.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93921170

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

1. This is the same as the previous behavior. 2. Given the variable name `leafNondeterministic`, it seems reasonable to collect `Nondeterministic` here.
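The disagreement here can be made concrete with a toy sketch. All types below are invented for illustration (not Spark's catalyst classes): an expression can report `deterministic = false` without mixing in the `Nondeterministic` marker trait, in which case a `collect` on that trait finds no leaves to pull out. This is the stateful-UDF gap gatorsmile describes above.

```scala
// Toy sketch (invented types, not Spark's): a non-deterministic expression
// that does NOT extend the marker trait is invisible to a collect on it.
trait Expr {
  def deterministic: Boolean
  def children: Seq[Expr]
  // Pre-order collect over the expression tree, like TreeNode.collect.
  def collect[T](pf: PartialFunction[Expr, T]): Seq[T] =
    (if (pf.isDefinedAt(this)) Seq(pf(this)) else Seq.empty) ++
      children.flatMap(_.collect(pf))
}
trait Nondeterministic extends Expr
case class Rand(seed: Long) extends Expr with Nondeterministic {
  val deterministic = false
  val children: Seq[Expr] = Seq.empty
}
// Models e.g. a Hive stateful UDF call: non-deterministic, but no marker trait.
case class StatefulUdfCall(name: String) extends Expr {
  val deterministic = false
  val children: Seq[Expr] = Seq.empty
}

// Same shape as the rule's leaf collection.
def leafNondeterministic(e: Expr): Seq[Expr] =
  e.collect { case n: Nondeterministic => n }
```

`leafNondeterministic(Rand(0))` finds the leaf, while `leafNondeterministic(StatefulUdfCall("statefulUDF"))` returns nothing, even though the expression is non-deterministic.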
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93903360

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>`) --- End diff --

Just a question: what is the reason we need the condition `p.output == p.child.output`?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16404#discussion_r93901178

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (same hunk as above; comment on the line `val leafNondeterministic = expr.collect { case n: Nondeterministic => n }`) --- End diff --

Not all non-deterministic expressions extend `Nondeterministic`, so this might not cover all the cases.
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/16404

[SPARK-18969][SQL] Support grouping by nondeterministic expressions

## What changes were proposed in this pull request?

Currently nondeterministic expressions are allowed in `Aggregate` (see the [comment](https://github.com/apache/spark/blob/v2.0.2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L249-L251)), but the `PullOutNondeterministic` analyzer rule fails to handle `Aggregate`. This PR fixes it.

## How was this patch tested?

A new test suite.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark groupby

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16404.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16404

commit 67f032b6ab7b23aecf432b13a6f455582754fc98
Author: Wenchen Fan
Date: 2016-12-26T10:24:07Z

    Support grouping by nondeterministic expressions