spark git commit: [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

2016-12-15 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2a8de2e11 -> e430915fa


[SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

## What changes were proposed in this pull request?

Check whether Aggregate operators on a streaming subplan contain aggregate
expressions with isDistinct = true, and reject such plans as unsupported.

## How was this patch tested?

Added unit test

Author: Tathagata Das 

Closes #16289 from tdas/SPARK-18870.

(cherry picked from commit 4f7292c87512a7da3542998d0e5aa21c27a511e9)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e430915f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e430915f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e430915f

Branch: refs/heads/branch-2.1
Commit: e430915fad7ffb9397a96f0ef16e741c6b4f158b
Parents: 2a8de2e
Author: Tathagata Das 
Authored: Thu Dec 15 11:54:35 2016 -0800
Committer: Tathagata Das 
Committed: Thu Dec 15 11:54:47 2016 -0800

--
 .../analysis/UnsupportedOperationChecker.scala   | 15 +++++++++++++--
 .../analysis/UnsupportedOperationsSuite.scala    | 13 +++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e430915f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c054fcb..c4a78f9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.streaming.OutputMode
@@ -95,6 +96,16 @@ object UnsupportedOperationChecker {
   // Operations that cannot exists anywhere in a streaming plan
   subPlan match {
 
+case Aggregate(_, aggregateExpressions, child) =>
+  val distinctAggExprs = aggregateExpressions.flatMap { expr =>
+expr.collect { case ae: AggregateExpression if ae.isDistinct => ae 
}
+  }
+  throwErrorIf(
+child.isStreaming && distinctAggExprs.nonEmpty,
+"Distinct aggregations are not supported on streaming 
DataFrames/Datasets, unless " +
+  "it is on aggregated DataFrame/Dataset in Complete output mode. 
Consider using " +
+  "approximate distinct aggregation (e.g. approx_count_distinct() 
instead of count()).")
+
 case _: Command =>
   throwError("Commands like CreateTable*, AlterTable*, Show* are not 
supported with " +
 "streaming DataFrames/Datasets")
@@ -143,7 +154,7 @@ object UnsupportedOperationChecker {
   throwError("Union between streaming and batch DataFrames/Datasets is 
not supported")
 
 case Except(left, right) if right.isStreaming =>
-  throwError("Except with a streaming DataFrame/Dataset on the right 
is not supported")
+  throwError("Except on a streaming DataFrame/Dataset on the right is 
not supported")
 
 case Intersect(left, right) if left.isStreaming && right.isStreaming =>
   throwError("Intersect between two streaming DataFrames/Datasets is 
not supported")
@@ -156,7 +167,7 @@ object UnsupportedOperationChecker {
 
 case Sort(_, _, _) | SortPartitions(_, _) if 
!containsCompleteData(subPlan) =>
   throwError("Sorting is not supported on streaming 
DataFrames/Datasets, unless it is on" +
-"aggregated DataFrame/Dataset in Complete mode")
+"aggregated DataFrame/Dataset in Complete output mode")
 
 case Sample(_, _, _, _, child) if child.isStreaming =>
   throwError("Sampling is not supported on streaming 
DataFrames/Datasets")

http://git-wip-us.apache.org/repos/asf/spark/blob/e430915f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 

spark git commit: [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

2016-12-15 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 01e14bf30 -> 4f7292c87


[SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

## What changes were proposed in this pull request?

Check whether Aggregate operators on a streaming subplan contain aggregate
expressions with isDistinct = true, and reject such plans as unsupported.

## How was this patch tested?

Added unit test

Author: Tathagata Das 

Closes #16289 from tdas/SPARK-18870.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7292c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7292c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7292c8

Branch: refs/heads/master
Commit: 4f7292c87512a7da3542998d0e5aa21c27a511e9
Parents: 01e14bf
Author: Tathagata Das 
Authored: Thu Dec 15 11:54:35 2016 -0800
Committer: Tathagata Das 
Committed: Thu Dec 15 11:54:35 2016 -0800

--
 .../analysis/UnsupportedOperationChecker.scala   | 15 +++++++++++++--
 .../analysis/UnsupportedOperationsSuite.scala    | 13 +++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f7292c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c054fcb..c4a78f9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.streaming.OutputMode
@@ -95,6 +96,16 @@ object UnsupportedOperationChecker {
   // Operations that cannot exists anywhere in a streaming plan
   subPlan match {
 
+case Aggregate(_, aggregateExpressions, child) =>
+  val distinctAggExprs = aggregateExpressions.flatMap { expr =>
+expr.collect { case ae: AggregateExpression if ae.isDistinct => ae 
}
+  }
+  throwErrorIf(
+child.isStreaming && distinctAggExprs.nonEmpty,
+"Distinct aggregations are not supported on streaming 
DataFrames/Datasets, unless " +
+  "it is on aggregated DataFrame/Dataset in Complete output mode. 
Consider using " +
+  "approximate distinct aggregation (e.g. approx_count_distinct() 
instead of count()).")
+
 case _: Command =>
   throwError("Commands like CreateTable*, AlterTable*, Show* are not 
supported with " +
 "streaming DataFrames/Datasets")
@@ -143,7 +154,7 @@ object UnsupportedOperationChecker {
   throwError("Union between streaming and batch DataFrames/Datasets is 
not supported")
 
 case Except(left, right) if right.isStreaming =>
-  throwError("Except with a streaming DataFrame/Dataset on the right 
is not supported")
+  throwError("Except on a streaming DataFrame/Dataset on the right is 
not supported")
 
 case Intersect(left, right) if left.isStreaming && right.isStreaming =>
   throwError("Intersect between two streaming DataFrames/Datasets is 
not supported")
@@ -156,7 +167,7 @@ object UnsupportedOperationChecker {
 
 case Sort(_, _, _) | SortPartitions(_, _) if 
!containsCompleteData(subPlan) =>
   throwError("Sorting is not supported on streaming 
DataFrames/Datasets, unless it is on" +
-"aggregated DataFrame/Dataset in Complete mode")
+"aggregated DataFrame/Dataset in Complete output mode")
 
 case Sample(_, _, _, _, child) if child.isStreaming =>
   throwError("Sampling is not supported on streaming 
DataFrames/Datasets")

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7292c8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index ff1bb12..34e94c7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/Unsupporte