Repository: spark
Updated Branches:
  refs/heads/branch-2.1 cbc37007a -> 3b648a626
[SPARK-19859][SS] The new watermark should override the old one

## What changes were proposed in this pull request?

The new watermark should override the old one. Otherwise, we would just pick up the first column that has a watermark, which may be unexpected.

## How was this patch tested?

The new test.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #17199 from zsxwing/SPARK-19859.

(cherry picked from commit d8830c5039d9c7c5ef03631904c32873ab558e22)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b648a62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b648a62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b648a62

Branch: refs/heads/branch-2.1
Commit: 3b648a62626850470f8cceea3f0ec5dfd46e4e33
Parents: cbc3700
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Mar 7 20:34:55 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Mar 7 20:35:08 2017 -0800

----------------------------------------------------------------------
 .../catalyst/plans/logical/EventTimeWatermark.scala   |  7 +++++++
 .../spark/sql/streaming/EventTimeWatermarkSuite.scala | 14 ++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b648a62/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 4224a79..c919cdb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -42,6 +42,13 @@ case class EventTimeWatermark(
         .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
         .build()
       a.withMetadata(updatedMetadata)
+    } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+      // Remove existing watermark
+      val updatedMetadata = new MetadataBuilder()
+        .withMetadata(a.metadata)
+        .remove(EventTimeWatermark.delayKey)
+        .build()
+      a.withMetadata(updatedMetadata)
     } else {
       a
     }
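As context for the hunk above, here is a minimal, self-contained Scala sketch of the metadata pattern it relies on: withWatermark records the delay in the column's metadata under EventTimeWatermark.delayKey, and the new `else if` branch uses MetadataBuilder.remove to strip that key from any column that still carries an older watermark. The standalone object wrapper and the hard-coded one-minute delay are illustrative assumptions, not code from this commit.

import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.types.MetadataBuilder

object WatermarkMetadataSketch extends App {
  // Tag metadata with a watermark delay, as the first branch of
  // EventTimeWatermark.output does for the newly watermarked column.
  val tagged = new MetadataBuilder()
    .putLong(EventTimeWatermark.delayKey, 60 * 1000L) // 1 minute, in milliseconds
    .build()
  assert(tagged.contains(EventTimeWatermark.delayKey))

  // Strip the key again, as the new branch does for a column carrying an
  // older watermark, so only the latest event-time column keeps it.
  val cleared = new MetadataBuilder()
    .withMetadata(tagged)
    .remove(EventTimeWatermark.delayKey)
    .build()
  assert(!cleared.contains(EventTimeWatermark.delayKey))
}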
http://git-wip-us.apache.org/repos/asf/spark/blob/3b648a62/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c34d119..c768525 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.OutputMode._
@@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging
     )
   }
 
+  test("the new watermark should override the old one") {
+    val df = MemoryStream[(Long, Long)].toDF()
+      .withColumn("first", $"_1".cast("timestamp"))
+      .withColumn("second", $"_2".cast("timestamp"))
+      .withWatermark("first", "1 minute")
+      .withWatermark("second", "2 minutes")
+
+    val eventTimeColumns = df.logicalPlan.output
+      .filter(_.metadata.contains(EventTimeWatermark.delayKey))
+    assert(eventTimeColumns.size === 1)
+    assert(eventTimeColumns(0).name === "second")
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
     val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
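For readers who want to try the behavior outside the suite, the following is a hedged, standalone adaptation of the new test. The SparkSession bootstrap, the object wrapper, and the use of queryExecution.analyzed in place of the package-private Dataset.logicalPlan are this sketch's own assumptions, not part of the commit. With the patch applied, only "second" should carry the event-time metadata; without it, "first" would keep it as well and downstream code would pick that column up.

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.streaming.MemoryStream

object WatermarkOverrideSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("watermark-override")
    .getOrCreate()
  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  // Same shape as the new test: watermark "first", then override it with "second".
  val df = MemoryStream[(Long, Long)].toDF()
    .withColumn("first", $"_1".cast("timestamp"))
    .withColumn("second", $"_2".cast("timestamp"))
    .withWatermark("first", "1 minute")
    .withWatermark("second", "2 minutes")

  // Inspect the analyzed plan's output attributes for the watermark delay key.
  val eventTimeColumns = df.queryExecution.analyzed.output
    .filter(_.metadata.contains(EventTimeWatermark.delayKey))
  println(eventTimeColumns.map(_.name).mkString(", ")) // expected with this patch: second

  spark.stop()
}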