Repository: spark
Updated Branches:
  refs/heads/branch-2.1 cbc37007a -> 3b648a626


[SPARK-19859][SS] The new watermark should override the old one

## What changes were proposed in this pull request?

The new watermark should override the old one. Otherwise, we just pick up the
first column that has a watermark, which may be unexpected.

## How was this patch tested?

The new test in `EventTimeWatermarkSuite`.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #17199 from zsxwing/SPARK-19859.

(cherry picked from commit d8830c5039d9c7c5ef03631904c32873ab558e22)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b648a62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b648a62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b648a62

Branch: refs/heads/branch-2.1
Commit: 3b648a62626850470f8cceea3f0ec5dfd46e4e33
Parents: cbc3700
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Mar 7 20:34:55 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Mar 7 20:35:08 2017 -0800

----------------------------------------------------------------------
 .../catalyst/plans/logical/EventTimeWatermark.scala   |  7 +++++++
 .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 14 ++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b648a62/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 4224a79..c919cdb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -42,6 +42,13 @@ case class EventTimeWatermark(
         .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
         .build()
       a.withMetadata(updatedMetadata)
+    } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+      // Remove existing watermark
+      val updatedMetadata = new MetadataBuilder()
+        .withMetadata(a.metadata)
+        .remove(EventTimeWatermark.delayKey)
+        .build()
+      a.withMetadata(updatedMetadata)
     } else {
       a
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b648a62/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c34d119..c768525 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.OutputMode._
@@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
     )
   }
 
+  test("the new watermark should override the old one") { // two watermarked columns: the later withWatermark call must win
+    val df = MemoryStream[(Long, Long)].toDF() // two Long columns (_1, _2), cast to timestamps below
+      .withColumn("first", $"_1".cast("timestamp"))
+      .withColumn("second", $"_2".cast("timestamp"))
+      .withWatermark("first", "1 minute") // first watermark, expected to be dropped by the next call
+      .withWatermark("second", "2 minutes") // most recent watermark, expected to be the only one left
+
+    val eventTimeColumns = df.logicalPlan.output
+      .filter(_.metadata.contains(EventTimeWatermark.delayKey)) // keep only columns carrying watermark delay metadata
+    assert(eventTimeColumns.size === 1) // exactly one watermark may remain on the plan output
+    assert(eventTimeColumns(0).name === "second") // and it must be the most recently declared one
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = 
AssertOnQuery { q =>
     val progressWithData = q.recentProgress.filter(_.numInputRows > 
0).lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to