Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9bfb16a6b -> 24ea16598


[SPARK-15428][SQL] Disable multiple streaming aggregations

## What changes were proposed in this pull request?

Incrementalizing plans of with multiple streaming aggregation is tricky and we 
dont have the necessary support for "delta" to implement correctly. So 
disabling the support for multiple streaming aggregations.

## How was this patch tested?
Additional unit tests

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #13210 from tdas/SPARK-15428.

(cherry picked from commit 1ffa608ba5a849739a56047bda8b157b86b08650)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24ea1659
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24ea1659
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24ea1659

Branch: refs/heads/branch-2.0
Commit: 24ea16598d4d912ac6a4e7961be1b66e82c2c23f
Parents: 9bfb16a
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Sun May 22 02:08:18 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sun May 22 02:08:35 2016 -0700

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala  | 18 +++++++--
 .../analysis/UnsupportedOperationsSuite.scala   | 39 ++++++++++++++------
 .../streaming/StreamingAggregationSuite.scala   | 19 ----------
 3 files changed, 41 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24ea1659/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index aadc1d3..0e08bf0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -55,10 +55,20 @@ object UnsupportedOperationChecker {
         case _: InsertIntoTable =>
           throwError("InsertIntoTable is not supported with streaming 
DataFrames/Datasets")
 
-        case Aggregate(_, _, child) if child.isStreaming && outputMode == 
Append =>
-          throwError(
-            "Aggregations are not supported on streaming DataFrames/Datasets 
in " +
-              "Append output mode. Consider changing output mode to Update.")
+        case Aggregate(_, _, child) if child.isStreaming =>
+          if (outputMode == Append) {
+            throwError(
+              "Aggregations are not supported on streaming DataFrames/Datasets 
in " +
+                "Append output mode. Consider changing output mode to Update.")
+          }
+          val moreStreamingAggregates = child.find {
+            case Aggregate(_, _, grandchild) if grandchild.isStreaming => true
+            case _ => false
+          }
+          if (moreStreamingAggregates.nonEmpty) {
+            throwError("Multiple streaming aggregations are not supported with 
" +
+              "streaming DataFrames/Datasets")
+          }
 
         case Join(left, right, joinType, _) =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24ea1659/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 50baebe..674277b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType
@@ -95,6 +96,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     Seq("aggregation", "append output mode"))
 
+  // Multiple streaming aggregations not supported
+  def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
+
+  assertSupportedInStreamingPlan(
+    "aggregate - multiple batch aggregations",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), 
batchRelation)),
+    Update)
+
+  assertSupportedInStreamingPlan(
+    "aggregate - multiple aggregations but only one streaming aggregation",
+    Aggregate(Nil, aggExprs("c"), batchRelation).join(
+      Aggregate(Nil, aggExprs("d"), streamRelation), joinType = Inner),
+    Update)
+
+  assertNotSupportedInStreamingPlan(
+    "aggregate - multiple streaming aggregations",
+    Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), 
streamRelation)),
+    outputMode = Update,
+    expectedMsgs = Seq("multiple streaming aggregations"))
+
   // Inner joins: Stream-stream not supported
   testBinaryOperationInStreamingPlan(
     "inner join",
@@ -354,17 +375,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       val e = intercept[AnalysisException] {
         testBody
       }
-
-      if 
(!expectedMsgs.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains)) {
-        fail(
-          s"""Exception message should contain the following substrings:
-              |
-          |  ${expectedMsgs.mkString("\n  ")}
-              |
-          |Actual exception message:
-              |
-          |  ${e.getMessage}
-          """.stripMargin)
+      expectedMsgs.foreach { m =>
+        if (!e.getMessage.toLowerCase.contains(m.toLowerCase)) {
+          fail(s"Exception message should contain: '$m', " +
+            s"actual exception message:\n\t'${e.getMessage}'")
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/24ea1659/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 0f5fc9c..7104d01 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -84,25 +84,6 @@ class StreamingAggregationSuite extends StreamTest with 
SharedSQLContext with Be
     )
   }
 
-  test("multiple aggregations") {
-    val inputData = MemoryStream[Int]
-
-    val aggregated =
-      inputData.toDF()
-        .groupBy($"value")
-        .agg(count("*") as 'count)
-        .groupBy($"value" % 2)
-        .agg(sum($"count"))
-        .as[(Int, Long)]
-
-    testStream(aggregated)(
-      AddData(inputData, 1, 2, 3, 4),
-      CheckLastBatch((0, 2), (1, 2)),
-      AddData(inputData, 1, 3, 5),
-      CheckLastBatch((1, 5))
-    )
-  }
-
   testQuietly("midbatch failure") {
     val inputData = MemoryStream[Int]
     FailureSinglton.firstTime = true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to