Repository: spark
Updated Branches:
  refs/heads/branch-2.0 54b4763d2 -> 47c2a265f


[SPARK-15812][SQL][STREAMING] Added support for sorting after streaming aggregation with complete mode

## What changes were proposed in this pull request?

When the output mode is Complete, the output of a streaming aggregation contains the complete aggregates every time. So it is effectively no different from a batch dataset within an incremental execution, and other non-streaming operations should be supported on this dataset. In this PR, I am adding support only for sorting, as it is a common and useful operation. Support for other operations will come later.
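
For illustration, here is a minimal sketch of the pattern this enables, mirroring the new test added in StreamingAggregationSuite below (MemoryStream is the in-memory source used by the streaming test suites):

    val inputData = MemoryStream[Int]

    // Allowed after this change, but only when the query runs in Complete
    // output mode: a sort applied on top of a streaming aggregation.
    val aggregated =
      inputData.toDF()
        .groupBy($"value")
        .agg(count("*"))
        .toDF("value", "count")
        .orderBy($"count".desc)

    // Still rejected by the analyzer: the same sort in Append or Update mode,
    // or any sort without a streaming aggregation upstream of it.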

## How was this patch tested?
Additional unit tests.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #13549 from tdas/SPARK-15812.

(cherry picked from commit abdb5d42c5802c8f60876aa1285c803d02881258)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47c2a265
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47c2a265
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47c2a265

Branch: refs/heads/branch-2.0
Commit: 47c2a265fbdb91cf5684f0d6342869ca08cb2d27
Parents: 54b4763
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Jun 10 10:48:28 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Jun 10 10:48:38 2016 -0700

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala  | 61 ++++++++++++--------
 .../analysis/UnsupportedOperationsSuite.scala   | 17 +++++-
 .../apache/spark/sql/streaming/StreamTest.scala | 24 +++++---
 .../streaming/StreamingAggregationSuite.scala   | 25 ++++++++
 4 files changed, 95 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47c2a265/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 8373fa3..689e016 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -43,6 +43,41 @@ object UnsupportedOperationChecker {
         "Queries without streaming sources cannot be executed with 
write.startStream()")(plan)
     }
 
+    // Disallow multiple streaming aggregations
+    val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming 
=> a }
+
+    if (aggregates.size > 1) {
+      throwError(
+        "Multiple streaming aggregations are not supported with " +
+          "streaming DataFrames/Datasets")(plan)
+    }
+
+    // Disallow some output mode
+    outputMode match {
+      case InternalOutputModes.Append if aggregates.nonEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are streaming 
aggregations on " +
+            s"streaming DataFrames/DataSets")(plan)
+
+      case InternalOutputModes.Complete | InternalOutputModes.Update if 
aggregates.isEmpty =>
+        throwError(
+          s"$outputMode output mode not supported when there are no streaming 
aggregations on " +
+            s"streaming DataFrames/Datasets")(plan)
+
+      case _ =>
+    }
+
+    /**
+     * Whether the subplan will contain complete data or incremental data in 
every incremental
+     * execution. Some operations may be allowed only when the child logical 
plan gives complete
+     * data.
+     */
+    def containsCompleteData(subplan: LogicalPlan): Boolean = {
+      val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => 
a }
+      // Either the subplan has no streaming source, or it has aggregation 
with Complete mode
+      !subplan.isStreaming || (aggs.nonEmpty && outputMode == 
InternalOutputModes.Complete)
+    }
+
     plan.foreachUp { implicit subPlan =>
 
       // Operations that cannot exist anywhere in a streaming plan
@@ -107,8 +142,9 @@ object UnsupportedOperationChecker {
         case GlobalLimit(_, _) | LocalLimit(_, _) if subPlan.children.forall(_.isStreaming) =>
           throwError("Limits are not supported on streaming DataFrames/Datasets")
 
-        case Sort(_, _, _) | SortPartitions(_, _) if subPlan.children.forall(_.isStreaming) =>
-          throwError("Sorting is not supported on streaming DataFrames/Datasets")
+        case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
+          throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
+            "aggregated DataFrame/Dataset in Complete mode")
 
         case Sample(_, _, _, _, child) if child.isStreaming =>
           throwError("Sampling is not supported on streaming DataFrames/Datasets")
@@ -123,27 +159,6 @@ object UnsupportedOperationChecker {
         case _ =>
       }
     }
-
-    // Checks related to aggregations
-    val aggregates = plan.collect { case a @ Aggregate(_, _, _) if a.isStreaming => a }
-    outputMode match {
-      case InternalOutputModes.Append if aggregates.nonEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are streaming aggregations on " +
-            s"streaming DataFrames/DataSets")(plan)
-
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
-        throwError(
-          s"$outputMode output mode not supported when there are no streaming aggregations on " +
-            s"streaming DataFrames/Datasets")(plan)
-
-      case _ =>
-    }
-    if (aggregates.size > 1) {
-      throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
-    }
   }
 
   private def throwErrorIf(
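
Note: the check added above reduces to a single predicate. A Sort is allowed if and only if its subplan contains no streaming source at all, or the subplan aggregates a stream and the query runs in Complete output mode. A boiled-down sketch of that rule (the names here are illustrative, not Spark's actual API):

    // Sorting a subplan is legal when the subplan is not streaming, or when
    // it contains a streaming aggregate and the output mode is Complete.
    def sortAllowed(
        subplanIsStreaming: Boolean,
        hasStreamingAggregate: Boolean,
        isCompleteMode: Boolean): Boolean =
      !subplanIsStreaming || (hasStreamingAggregate && isCompleteMode)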

http://git-wip-us.apache.org/repos/asf/spark/blob/47c2a265/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 378cca3..c21ad5e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -81,7 +81,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append,
     expectedMsgs = "commands" :: Nil)
 
-  // Multiple streaming aggregations not supported
+  // Aggregation: Multiple streaming aggregations not supported
   def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name))
 
   assertSupportedInStreamingPlan(
@@ -189,8 +189,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)
 
-  // Unary operations
+  // Sort: supported only on batch subplans and on aggregation + complete output mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
+  assertSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Complete output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Complete)
+  assertNotSupportedInStreamingPlan(
+    "sort - sort over aggregated data in Update output mode",
+    streamRelation.groupBy()(Count("*")).sortBy(),
+    Update,
+    Seq("sort", "aggregat", "complete")) // sort on aggregations is supported in Complete mode only
+
+
+  // Other unary operations
   testUnaryOperatorInStreamingPlan("sort partitions", SortPartitions(Nil, _), expectedMsg = "sort")
   testUnaryOperatorInStreamingPlan(
     "sample", Sample(0.1, 1, true, 1L, _)(), expectedMsg = "sampling")
@@ -299,6 +311,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
       outputMode)
   }
 
+  /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
       shouldSupportAggregation: Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/47c2a265/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 194c3e7..7f1e5fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -111,10 +111,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), false)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = false,
+        isSorted = false)
     }
 
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, false, false)
   }
 
   /**
@@ -123,15 +126,22 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
    */
   object CheckLastBatch {
     def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      apply(isSorted = false, data: _*)
+    }
+
+    def apply[A: Encoder](isSorted: Boolean, data: A*): CheckAnswerRows = {
       val encoder = encoderFor[A]
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
-      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))), true)
+      CheckAnswerRows(
+        data.map(d => toExternalRow.fromRow(encoder.toRow(d))),
+        lastOnly = true,
+        isSorted = isSorted)
     }
 
-    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true)
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows, true, false)
   }
 
-  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean)
+  case class CheckAnswerRows(expectedAnswer: Seq[Row], lastOnly: Boolean, isSorted: Boolean)
       extends StreamAction with StreamMustBeRunning {
     override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
@@ -414,7 +424,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                 failTest("Error adding data", e)
             }
 
-          case CheckAnswerRows(expectedAnswer, lastOnly) =>
+          case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
             verify(currentStream != null, "stream not running")
             // Get the map of source index to the current source objects
             val indexToSource = currentStream
@@ -436,7 +446,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                 failTest("Exception while getting data from sink", e)
 
-            QueryTest.sameRows(expectedAnswer, sparkAnswer).foreach {
+            QueryTest.sameRows(expectedAnswer, sparkAnswer, isSorted).foreach {
               error => failTest(error)
             }
         }
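
The new isSorted flag is threaded through to QueryTest.sameRows: when it is true, the expected rows must match the sink's output in order; when it is false (the default), rows are compared ignoring order. Usage, as in the new test below:

    // Expect the last batch to be exactly (3, 2), (2, 1), in this order.
    CheckLastBatch(isSorted = true, (3, 2), (2, 1))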

http://git-wip-us.apache.org/repos/asf/spark/blob/47c2a265/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 1f174ae..8681199 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -104,6 +104,31 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
     }
   }
 
+  test("sort after aggregate in complete mode") {
+    val inputData = MemoryStream[Int]
+
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .toDF("value", "count")
+        .orderBy($"count".desc)
+        .as[(Int, Long)]
+
+    testStream(aggregated, Complete)(
+      AddData(inputData, 3),
+      CheckLastBatch(isSorted = true, (3, 1)),
+      AddData(inputData, 2, 3),
+      CheckLastBatch(isSorted = true, (3, 2), (2, 1)),
+      StopStream,
+      StartStream(),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch(isSorted = true, (3, 3), (2, 2), (1, 1)),
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch(isSorted = true, (4, 4), (3, 3), (2, 2), (1, 1))
+    )
+  }
+
   test("multiple keys") {
     val inputData = MemoryStream[Int]
 

