[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20859

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175585873

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
         case _: InsertIntoDir =>
           throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
+        case e: EventTimeWatermark =>
+          val statefulChildren = e.collect {
+            case a: Aggregate if a.isStreaming => a
+            case d: Deduplicate if d.isStreaming => d
+            case f: FlatMapGroupsWithState if f.isStreaming => f
+          }
+          statefulChildren.foreach { statefulNode =>
+            if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) {
+              throwError("Watermarks both before and after a stateful operator in a streaming " +
--- End diff --

WDYT of something like "watermarks may not be present..."? Talking about "well-defined" seems a bit confusing to me.
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175580404

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
         case _: InsertIntoDir =>
           throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
+        case e: EventTimeWatermark =>
+          val statefulChildren = e.collect {
+            case a: Aggregate if a.isStreaming => a
+            case d: Deduplicate if d.isStreaming => d
+            case f: FlatMapGroupsWithState if f.isStreaming => f
+          }
+          statefulChildren.foreach { statefulNode =>
+            if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) {
+              throwError("Watermarks both before and after a stateful operator in a streaming " +
--- End diff --

This gives the impression that it makes sense but we don't support it. In fact, it's just ill-defined. Maybe change this to something like: "Multiple watermarks before and after stateful operators are not well-defined in a streaming query."
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175580451

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -140,6 +140,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Complete,
     expectedMsgs = Seq("distinct aggregation"))
+  assertNotSupportedInStreamingPlan(
+    "aggregate on both sides of stateful op",
+    EventTimeWatermark(
+      attribute,
+      CalendarInterval.fromString("interval 1 second"),
+      Aggregate(
+        attributeWithWatermark :: Nil,
+        aggExprs("a"),
+        EventTimeWatermark(
+          attribute,
+          CalendarInterval.fromString("interval 2 seconds"),
+          streamRelation))),
+    outputMode = Append,
+    expectedMsgs = Seq("both before and after"))
--- End diff --

Add tests for the other stateful operators as well.
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175579879

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
         case _: InsertIntoDir =>
           throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
+        case e: EventTimeWatermark =>
+          val statefulChildren = e.collect {
+            case a: Aggregate if a.isStreaming => a
+            case d: Deduplicate if d.isStreaming => d
+            case f: FlatMapGroupsWithState if f.isStreaming => f
--- End diff --

This should apply to joins as well.
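As a rough illustration of the check in the diff above, here is a minimal, self-contained Scala sketch that also treats joins as stateful, per the review suggestion. It is an assumption-laden model, not Spark code: `Plan`, `Relation`, `Watermark`, `Join`, the other node classes, `isStateful`, and `checkWatermarks` are hypothetical stand-ins for Catalyst's `LogicalPlan`, `EventTimeWatermark`, and `TreeNode.collect`/`collectFirst`, and the error text simply borrows the wording proposed in review.

```scala
// Hypothetical, minimal model of a logical plan tree. These classes are NOT
// Spark's Catalyst classes; they only mimic the shape needed for the rule.
sealed trait Plan {
  def children: Seq[Plan]

  // Pre-order traversal that gathers every node (including this one)
  // matched by `pf`, similar in spirit to TreeNode.collect.
  def collect[B](pf: PartialFunction[Plan, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))

  // First match in a pre-order traversal (including this node).
  def collectFirst[B](pf: PartialFunction[Plan, B]): Option[B] =
    pf.lift(this).orElse {
      children.iterator.map(_.collectFirst(pf)).collectFirst { case Some(b) => b }
    }
}

case class Relation(name: String) extends Plan {
  def children: Seq[Plan] = Nil
}
case class Watermark(delaySeconds: Long, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
case class Aggregate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
case class Deduplicate(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
case class FlatMapGroupsWithState(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

// Joins are treated as stateful here, following the review suggestion.
def isStateful(p: Plan): Boolean = p match {
  case _: Aggregate | _: Deduplicate | _: FlatMapGroupsWithState | _: Join => true
  case _ => false
}

// Returns Some(error) if any watermark has a stateful descendant that itself
// has a watermark beneath it; returns None if the plan passes the check.
def checkWatermarks(plan: Plan): Option[String] = {
  val watermarks = plan.collect { case w: Watermark => w }
  val violation = watermarks.exists { w =>
    val statefulChildren = w.collect { case p if isStateful(p) => p }
    statefulChildren.exists(_.collectFirst { case inner: Watermark => inner }.isDefined)
  }
  if (violation)
    Some("Multiple watermarks before and after stateful operators are not " +
      "well-defined in a streaming query")
  else None
}
```

In this model a join with watermarks on both inputs passes (watermarks in parallel), while adding another watermark above that join is rejected, as is a watermark on both sides of an aggregate, deduplication, or `FlatMapGroupsWithState` node.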
[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20859

[SPARK-23702][SS] Forbid watermarks on both sides of stateful streaming operators.

## What changes were proposed in this pull request?

Forbid watermarks on both sides of stateful streaming operators.

Multiple sequential watermarks are generally not supported by the execution engine; it supports multiple watermarks only in parallel, e.g. on both sides of a join. We can normally resolve sequential watermarks by simply picking the topmost watermark operator and ignoring the rest, but this is not semantically valid when there's a stateful operator in between.

## How was this patch tested?

New unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark triggerDisallow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20859.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #20859

commit 051c85afe727f39ba9d505e00e162620f69c808f
Author: Jose Torres
Date: 2018-03-19T18:48:11Z

    disallow watermarks on both sides of stateful
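The resolution described in the PR (keep the topmost watermark and ignore the rest, but reject a stateful operator sandwiched between watermarks) can be sketched for a simple linear pipeline. This is a hypothetical model: `Op`, `WatermarkOp`, `StatefulOp`, `StatelessOp`, and `resolveWatermark` are illustrative names, not Spark classes, and the real engine operates on full plan trees rather than a list.

```scala
// Hypothetical linear pipeline, listed from the sink (top) down to the source.
sealed trait Op
case class WatermarkOp(delaySeconds: Long) extends Op
case object StatefulOp extends Op
case object StatelessOp extends Op

// Keep the topmost watermark and ignore the rest, unless a stateful operator
// sits between two watermarks, in which case the pipeline is rejected.
def resolveWatermark(chain: List[Op]): Either[String, Option[Long]] = {
  // Positions of watermark operators, ordered top to bottom.
  val positions = chain.zipWithIndex.collect { case (WatermarkOp(_), i) => i }
  // A stateful operator with a watermark both above and below it must lie
  // between some pair of consecutive watermarks.
  val statefulBetween = positions.zip(positions.drop(1)).exists { case (above, below) =>
    chain.slice(above + 1, below).contains(StatefulOp)
  }
  if (statefulBetween) Left("watermarks both before and after a stateful operator")
  else Right(chain.collectFirst { case WatermarkOp(d) => d })
}
```

For example, `List(WatermarkOp(10), StatelessOp, WatermarkOp(5))` resolves to the 10-second watermark, while replacing `StatelessOp` with `StatefulOp` makes the function return a `Left`.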