Repository: spark
Updated Branches:
  refs/heads/branch-2.0 9fc053c30 -> 4c2065d0a


[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured 
Streaming plan

## What changes were proposed in this pull request?

Sort in a streaming plan should be allowed only after an aggregation in Complete 
output mode. Currently it is incorrectly allowed when present anywhere in the 
plan, which gives unpredictable and potentially incorrect results.

## How was this patch tested?
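Added a new test case to UnsupportedOperationsSuite asserting that a sort placed 
before an aggregation is not supported in Complete output mode.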

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #16662 from tdas/SPARK-19314.

(cherry picked from commit 552e5f08841828e55f5924f1686825626da8bcd0)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c2065d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c2065d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c2065d0

Branch: refs/heads/branch-2.0
Commit: 4c2065d0ab3009b95e67a92af7ab0ba52905e7e0
Parents: 9fc053c
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Fri Jan 20 14:04:51 2017 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Jan 20 14:05:23 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2 +-
 .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala  | 9 +++++++--
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c2065d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f6e32e2..e6ac198 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
      * data.
      */
     def containsCompleteData(subplan: LogicalPlan): Boolean = {
-      val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => 
a }
+      val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming 
=> a }
       // Either the subplan has no streaming source, or it has aggregation 
with Complete mode
       !subplan.isStreaming || (aggs.nonEmpty && outputMode == 
InternalOutputModes.Complete)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4c2065d0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index ff1bb12..4a156ac 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -186,13 +186,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)
 
-  // Sort: supported only on batch subplans and on aggregation + complete 
output mode
+  // Sort: supported only on batch subplans and after aggregation on streaming 
plan + complete mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
   assertSupportedInStreamingPlan(
-    "sort - sort over aggregated data in Complete output mode",
+    "sort - sort after aggregation in Complete output mode",
     streamRelation.groupBy()(Count("*")).sortBy(),
     Complete)
   assertNotSupportedInStreamingPlan(
+    "sort - sort before aggregation in Complete output mode",
+    streamRelation.sortBy().groupBy()(Count("*")),
+    Complete,
+    Seq("sort", "aggregat", "complete"))
+  assertNotSupportedInStreamingPlan(
     "sort - sort over aggregated data in Update output mode",
     streamRelation.groupBy()(Count("*")).sortBy(),
     Update,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to