Repository: spark Updated Branches: refs/heads/branch-2.0 9bfb16a6b -> 24ea16598
[SPARK-15428][SQL] Disable multiple streaming aggregations ## What changes were proposed in this pull request? Incrementalizing plans with multiple streaming aggregations is tricky, and we don't have the necessary support for "delta" to implement it correctly. So this patch disables support for multiple streaming aggregations. ## How was this patch tested? Additional unit tests Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #13210 from tdas/SPARK-15428. (cherry picked from commit 1ffa608ba5a849739a56047bda8b157b86b08650) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24ea1659 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24ea1659 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24ea1659 Branch: refs/heads/branch-2.0 Commit: 24ea16598d4d912ac6a4e7961be1b66e82c2c23f Parents: 9bfb16a Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Sun May 22 02:08:18 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Sun May 22 02:08:35 2016 -0700 ---------------------------------------------------------------------- .../analysis/UnsupportedOperationChecker.scala | 18 +++++++-- .../analysis/UnsupportedOperationsSuite.scala | 39 ++++++++++++++------ .../streaming/StreamingAggregationSuite.scala | 19 ---------- 3 files changed, 41 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/24ea1659/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index aadc1d3..0e08bf0 100644 
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -55,10 +55,20 @@ object UnsupportedOperationChecker { case _: InsertIntoTable => throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") - case Aggregate(_, _, child) if child.isStreaming && outputMode == Append => - throwError( - "Aggregations are not supported on streaming DataFrames/Datasets in " + - "Append output mode. Consider changing output mode to Update.") + case Aggregate(_, _, child) if child.isStreaming => + if (outputMode == Append) { + throwError( + "Aggregations are not supported on streaming DataFrames/Datasets in " + + "Append output mode. Consider changing output mode to Update.") + } + val moreStreamingAggregates = child.find { + case Aggregate(_, _, grandchild) if grandchild.isStreaming => true + case _ => false + } + if (moreStreamingAggregates.nonEmpty) { + throwError("Multiple streaming aggregations are not supported with " + + "streaming DataFrames/Datasets") + } case Join(left, right, joinType, _) => http://git-wip-us.apache.org/repos/asf/spark/blob/24ea1659/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index 50baebe..674277b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ 
import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.IntegerType @@ -95,6 +96,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite { outputMode = Append, Seq("aggregation", "append output mode")) + // Multiple streaming aggregations not supported + def aggExprs(name: String): Seq[NamedExpression] = Seq(Count("*").as(name)) + + assertSupportedInStreamingPlan( + "aggregate - multiple batch aggregations", + Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), batchRelation)), + Update) + + assertSupportedInStreamingPlan( + "aggregate - multiple aggregations but only one streaming aggregation", + Aggregate(Nil, aggExprs("c"), batchRelation).join( + Aggregate(Nil, aggExprs("d"), streamRelation), joinType = Inner), + Update) + + assertNotSupportedInStreamingPlan( + "aggregate - multiple streaming aggregations", + Aggregate(Nil, aggExprs("c"), Aggregate(Nil, aggExprs("d"), streamRelation)), + outputMode = Update, + expectedMsgs = Seq("multiple streaming aggregations")) + // Inner joins: Stream-stream not supported testBinaryOperationInStreamingPlan( "inner join", @@ -354,17 +375,11 @@ class UnsupportedOperationsSuite extends SparkFunSuite { val e = intercept[AnalysisException] { testBody } - - if (!expectedMsgs.map(_.toLowerCase).forall(e.getMessage.toLowerCase.contains)) { - fail( - s"""Exception message should contain the following substrings: - | - | ${expectedMsgs.mkString("\n ")} - | - |Actual exception message: - | - | ${e.getMessage} - """.stripMargin) + expectedMsgs.foreach { m => + if 
(!e.getMessage.toLowerCase.contains(m.toLowerCase)) { + fail(s"Exception message should contain: '$m', " + + s"actual exception message:\n\t'${e.getMessage}'") + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/24ea1659/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 0f5fc9c..7104d01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -84,25 +84,6 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext with Be ) } - test("multiple aggregations") { - val inputData = MemoryStream[Int] - - val aggregated = - inputData.toDF() - .groupBy($"value") - .agg(count("*") as 'count) - .groupBy($"value" % 2) - .agg(sum($"count")) - .as[(Int, Long)] - - testStream(aggregated)( - AddData(inputData, 1, 2, 3, 4), - CheckLastBatch((0, 2), (1, 2)), - AddData(inputData, 1, 3, 5), - CheckLastBatch((1, 5)) - ) - } - testQuietly("midbatch failure") { val inputData = MemoryStream[Int] FailureSinglton.firstTime = true --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org