[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-11-29 Thread jose-torres
Github user jose-torres closed the pull request at:

https://github.com/apache/spark/pull/20859


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175585873
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
 case _: InsertIntoDir =>
   throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
 
+case e: EventTimeWatermark =>
+  val statefulChildren = e.collect {
+    case a: Aggregate if a.isStreaming => a
+    case d: Deduplicate if d.isStreaming => d
+    case f: FlatMapGroupsWithState if f.isStreaming => f
+  }
+  statefulChildren.foreach { statefulNode =>
+    if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) {
+      throwError("Watermarks both before and after a stateful operator in a streaming " +
--- End diff --

WDYT of something like "watermarks may not be present..."? Talking about "well-defined" seems a bit confusing to me.


---




[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175580404
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
 case _: InsertIntoDir =>
   throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
 
+case e: EventTimeWatermark =>
+  val statefulChildren = e.collect {
+    case a: Aggregate if a.isStreaming => a
+    case d: Deduplicate if d.isStreaming => d
+    case f: FlatMapGroupsWithState if f.isStreaming => f
+  }
+  statefulChildren.foreach { statefulNode =>
+    if (statefulNode.collectFirst{ case e: EventTimeWatermark => e }.isDefined) {
+      throwError("Watermarks both before and after a stateful operator in a streaming " +
--- End diff --

This gives the impression that it makes sense but we don't support it. In fact, it's just ill-defined. Maybe change this to something like: "Multiple watermarks before and after stateful operators are not well-defined in a streaming query."


---




[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175580451
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -140,6 +140,21 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Complete,
     expectedMsgs = Seq("distinct aggregation"))
 
+  assertNotSupportedInStreamingPlan(
+    "aggregate on both sides of stateful op",
+    EventTimeWatermark(
+      attribute,
+      CalendarInterval.fromString("interval 1 second"),
+      Aggregate(
+        attributeWithWatermark :: Nil,
+        aggExprs("a"),
+        EventTimeWatermark(
+          attribute,
+          CalendarInterval.fromString("interval 2 seconds"),
+          streamRelation))),
+    outputMode = Append,
+    expectedMsgs = Seq("both before and after"))
--- End diff --

Add tests for the other stateful operators as well.


---




[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20859#discussion_r175579879
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -160,6 +160,19 @@ object UnsupportedOperationChecker {
 case _: InsertIntoDir =>
   throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
 
+case e: EventTimeWatermark =>
+  val statefulChildren = e.collect {
+    case a: Aggregate if a.isStreaming => a
+    case d: Deduplicate if d.isStreaming => d
+    case f: FlatMapGroupsWithState if f.isStreaming => f
--- End diff --

This should apply to joins as well.
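To make the join case concrete, here is a minimal sketch on a hypothetical toy plan tree. The `Plan`, `SourceNode`, `WatermarkNode`, `AggregateNode`, and `JoinNode` types and the `WatermarkChecker` object are illustrative stand-ins, not Spark's Catalyst classes, and every join in the toy model is assumed to be a stateful streaming join. It follows the shape of the check in the diff above (collect the stateful nodes under each watermark, then look for another watermark beneath each of them) and counts a join as stateful, as suggested in this comment.

```scala
// Hypothetical toy plan tree: these node types are illustrative only and are
// not Spark's Catalyst classes.
sealed trait Plan { def children: Seq[Plan] }
final case class SourceNode(name: String) extends Plan {
  def children: Seq[Plan] = Seq.empty
}
final case class WatermarkNode(delayMs: Long, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class AggregateNode(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
// Assumed to be a streaming (stateful) join in this toy model.
final case class JoinNode(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object WatermarkChecker {
  // Pre-order traversal that includes the root node itself,
  // similar in spirit to a tree-wide collect.
  def collect[A](p: Plan)(pf: PartialFunction[Plan, A]): Seq[A] =
    pf.lift(p).toList ++ p.children.flatMap(c => collect(c)(pf))

  // Joins count as stateful alongside aggregations, per the review comment.
  private def isStateful(p: Plan): Boolean = p match {
    case _: AggregateNode | _: JoinNode => true
    case _                              => false
  }

  // True if some watermark has a stateful descendant that itself
  // has another watermark somewhere below it.
  def hasWatermarksAroundStatefulOp(plan: Plan): Boolean =
    collect(plan) { case w: WatermarkNode => w }.exists { w =>
      collect(w.child) { case s if isStateful(s) => s }.exists { s =>
        collect(s) { case inner: WatermarkNode => inner }.nonEmpty
      }
    }
}
```

In this sketch, `JoinNode(WatermarkNode(1000, SourceNode("a")), WatermarkNode(2000, SourceNode("b")))` passes because its two watermarks are in parallel, whereas wrapping that same join in another `WatermarkNode` makes `hasWatermarksAroundStatefulOp` return `true`.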


---




[GitHub] spark pull request #20859: [SPARK-23702][SS] Forbid watermarks on both sides...

2018-03-19 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20859

[SPARK-23702][SS] Forbid watermarks on both sides of stateful streaming operators.

## What changes were proposed in this pull request?

Forbid watermarks on both sides of stateful streaming operators.

Multiple sequential watermarks are generally not supported by the execution engine; it only supports multiple watermarks in parallel, e.g. on both sides of a join. We can normally resolve sequential watermarks by simply picking the topmost watermark operator and ignoring the rest, but this is not semantically valid when there's a stateful operator in between.
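The resolution strategy described above can be illustrated with a small sketch. It models a query as a hypothetical linear chain of steps, listed from the top of the plan down to the source; `Step`, `WatermarkStep`, `StatefulStep`, `StatelessStep`, and `SequentialWatermarks` are illustrative names, not Spark APIs. `topmostWatermark` stands for the "pick the topmost watermark and ignore the rest" fallback, and `statefulBetweenWatermarks` flags the shape this PR forbids.

```scala
// Hypothetical linear plan, listed from the top (sink side) down to the source.
// These types are illustrative only; they are not Spark's logical operators.
sealed trait Step
final case class WatermarkStep(column: String, delayMs: Long) extends Step
final case class StatefulStep(name: String) extends Step  // e.g. an aggregation
final case class StatelessStep(name: String) extends Step // e.g. a filter

object SequentialWatermarks {
  // Fallback for sequential watermarks: keep the topmost one, ignore the rest.
  def topmostWatermark(chain: List[Step]): Option[WatermarkStep] =
    chain.collectFirst { case w: WatermarkStep => w }

  // The forbidden shape: a stateful step with a watermark somewhere above it
  // and another watermark somewhere below it.
  def statefulBetweenWatermarks(chain: List[Step]): Boolean =
    chain.zipWithIndex.exists {
      case (_: StatefulStep, i) =>
        chain.take(i).exists(_.isInstanceOf[WatermarkStep]) &&
          chain.drop(i + 1).exists(_.isInstanceOf[WatermarkStep])
      case _ => false
    }
}
```

For `List(WatermarkStep("ts", 1000), StatelessStep("filter"), WatermarkStep("ts", 2000))`, `topmostWatermark` returns the 1-second watermark and the chain is allowed; replacing the filter with `StatefulStep("aggregate")` makes `statefulBetweenWatermarks` return `true`, since ignoring the lower watermark would silently change what the stateful step sees.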

## How was this patch tested?

New unit test in UnsupportedOperationsSuite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark triggerDisallow

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20859.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20859


commit 051c85afe727f39ba9d505e00e162620f69c808f
Author: Jose Torres 
Date:   2018-03-19T18:48:11Z

disallow watermarks on both sides of stateful




---
