spark git commit: [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets
Repository: spark Updated Branches: refs/heads/branch-2.1 2a8de2e11 -> e430915fa [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets ## What changes were proposed in this pull request? Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true. ## How was this patch tested? Added unit test Author: Tathagata Das Closes #16289 from tdas/SPARK-18870. (cherry picked from commit 4f7292c87512a7da3542998d0e5aa21c27a511e9) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e430915f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e430915f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e430915f Branch: refs/heads/branch-2.1 Commit: e430915fad7ffb9397a96f0ef16e741c6b4f158b Parents: 2a8de2e Author: Tathagata Das Authored: Thu Dec 15 11:54:35 2016 -0800 Committer: Tathagata Das Committed: Thu Dec 15 11:54:47 2016 -0800 -- .../analysis/UnsupportedOperationChecker.scala | 15 +-- .../analysis/UnsupportedOperationsSuite.scala| 13 + 2 files changed, 26 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e430915f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index c054fcb..c4a78f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.{AnalysisException, InternalOutputModes} import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.OutputMode @@ -95,6 +96,16 @@ object UnsupportedOperationChecker { // Operations that cannot exists anywhere in a streaming plan subPlan match { +case Aggregate(_, aggregateExpressions, child) => + val distinctAggExprs = aggregateExpressions.flatMap { expr => +expr.collect { case ae: AggregateExpression if ae.isDistinct => ae } + } + throwErrorIf( +child.isStreaming && distinctAggExprs.nonEmpty, +"Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " + + "it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " + + "approximate distinct aggregation (e.g. approx_count_distinct() instead of count()).") + case _: Command => throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") @@ -143,7 +154,7 @@ object UnsupportedOperationChecker { throwError("Union between streaming and batch DataFrames/Datasets is not supported") case Except(left, right) if right.isStreaming => - throwError("Except with a streaming DataFrame/Dataset on the right is not supported") + throwError("Except on a streaming DataFrame/Dataset on the right is not supported") case Intersect(left, right) if left.isStreaming && right.isStreaming => throwError("Intersect between two streaming DataFrames/Datasets is not supported") @@ -156,7 +167,7 @@ object UnsupportedOperationChecker { case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + -"aggregated DataFrame/Dataset in Complete mode") +"aggregated DataFrame/Dataset in Complete output mode") case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") http://git-wip-us.apache.org/repos/asf/spark/blob/e430915f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index
spark git commit: [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets
Repository: spark Updated Branches: refs/heads/master 01e14bf30 -> 4f7292c87 [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets ## What changes were proposed in this pull request? Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true. ## How was this patch tested? Added unit test Author: Tathagata Das Closes #16289 from tdas/SPARK-18870. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7292c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7292c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7292c8 Branch: refs/heads/master Commit: 4f7292c87512a7da3542998d0e5aa21c27a511e9 Parents: 01e14bf Author: Tathagata Das Authored: Thu Dec 15 11:54:35 2016 -0800 Committer: Tathagata Das Committed: Thu Dec 15 11:54:35 2016 -0800 -- .../analysis/UnsupportedOperationChecker.scala | 15 +-- .../analysis/UnsupportedOperationsSuite.scala| 13 + 2 files changed, 26 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f7292c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index c054fcb..c4a78f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.{AnalysisException, InternalOutputModes} import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.streaming.OutputMode @@ -95,6 +96,16 @@ object UnsupportedOperationChecker { // Operations that cannot exists anywhere in a streaming plan subPlan match { +case Aggregate(_, aggregateExpressions, child) => + val distinctAggExprs = aggregateExpressions.flatMap { expr => +expr.collect { case ae: AggregateExpression if ae.isDistinct => ae } + } + throwErrorIf( +child.isStreaming && distinctAggExprs.nonEmpty, +"Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " + + "it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " + + "approximate distinct aggregation (e.g. approx_count_distinct() instead of count()).") + case _: Command => throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " + "streaming DataFrames/Datasets") @@ -143,7 +154,7 @@ object UnsupportedOperationChecker { throwError("Union between streaming and batch DataFrames/Datasets is not supported") case Except(left, right) if right.isStreaming => - throwError("Except with a streaming DataFrame/Dataset on the right is not supported") + throwError("Except on a streaming DataFrame/Dataset on the right is not supported") case Intersect(left, right) if left.isStreaming && right.isStreaming => throwError("Intersect between two streaming DataFrames/Datasets is not supported") @@ -156,7 +167,7 @@ object UnsupportedOperationChecker { case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) => throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" + -"aggregated DataFrame/Dataset in Complete mode") +"aggregated DataFrame/Dataset in Complete output mode") case Sample(_, _, _, _, child) if child.isStreaming => throwError("Sampling is not supported on streaming DataFrames/Datasets") http://git-wip-us.apache.org/repos/asf/spark/blob/4f7292c8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index ff1bb12..34e94c7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/Unsupporte