Repository: spark Updated Branches: refs/heads/branch-2.0 9fc053c30 -> 4c2065d0a
[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan ## What changes were proposed in this pull request? Sort in a streaming plan should be allowed only after an aggregation in complete mode. Currently it is incorrectly allowed when present anywhere in the plan. It gives unpredictable, potentially incorrect results. ## How was this patch tested? New test Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #16662 from tdas/SPARK-19314. (cherry picked from commit 552e5f08841828e55f5924f1686825626da8bcd0) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c2065d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c2065d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c2065d0 Branch: refs/heads/branch-2.0 Commit: 4c2065d0ab3009b95e67a92af7ab0ba52905e7e0 Parents: 9fc053c Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Fri Jan 20 14:04:51 2017 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Fri Jan 20 14:05:23 2017 -0800 ---------------------------------------------------------------------- .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2 +- .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4c2065d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index f6e32e2..e6ac198 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -73,7 +73,7 @@ object UnsupportedOperationChecker { * data. */ def containsCompleteData(subplan: LogicalPlan): Boolean = { - val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } + val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a } // Either the subplan has no streaming source, or it has aggregation with Complete mode !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete) } http://git-wip-us.apache.org/repos/asf/spark/blob/4c2065d0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index ff1bb12..4a156ac 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -186,13 +186,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.intersect(_), streamStreamSupported = false) - // Sort: supported only on batch subplans and on aggregation + complete output mode + // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _)) assertSupportedInStreamingPlan( - "sort - sort over aggregated data in Complete output mode", + "sort - sort after aggregation in Complete output mode", streamRelation.groupBy()(Count("*")).sortBy(), Complete) assertNotSupportedInStreamingPlan( + "sort - sort before 
aggregation in Complete output mode", + streamRelation.sortBy().groupBy()(Count("*")), + Complete, + Seq("sort", "aggregat", "complete")) + assertNotSupportedInStreamingPlan( "sort - sort over aggregated data in Update output mode", streamRelation.groupBy()(Count("*")).sortBy(), Update, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org