Repository: spark Updated Branches: refs/heads/master 9456176da -> 0d26b3aa5
[SPARK-21546][SS] dropDuplicates should ignore watermark when it's not a key ## What changes were proposed in this pull request? When the watermark column is not one of the `dropDuplicates` columns, the query currently crashes. This PR fixes this issue. ## How was this patch tested? The new unit test. Author: Shixiong Zhu <shixi...@databricks.com> Closes #18822 from zsxwing/SPARK-21546. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d26b3aa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d26b3aa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d26b3aa Branch: refs/heads/master Commit: 0d26b3aa55f9cc75096b0e2b309f64fe3270b9a5 Parents: 9456176 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed Aug 2 14:02:13 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Aug 2 14:02:13 2017 -0700 ---------------------------------------------------------------------- .../sql/execution/streaming/statefulOperators.scala | 9 +++++++-- .../apache/spark/sql/streaming/DeduplicateSuite.scala | 13 +++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0d26b3aa/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 6addab6..e463563 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -156,8 +156,13 @@ trait WatermarkSupport extends UnaryExecNode { } /** Predicate based on keys that matches data older than the 
watermark */ - lazy val watermarkPredicateForKeys: Option[Predicate] = - watermarkExpression.map(newPredicate(_, keyExpressions)) + lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e => + if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) { + Some(newPredicate(e, keyExpressions)) + } else { + None + } + } /** Predicate based on the child output that matches data older than the watermark. */ lazy val watermarkPredicateForData: Option[Predicate] = http://git-wip-us.apache.org/repos/asf/spark/blob/0d26b3aa/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala index a15c2cf..e858b7d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DeduplicateSuite.scala @@ -268,4 +268,17 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll { CheckLastBatch(7) ) } + + test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") { + val input = MemoryStream[(Int, Int)] + val df = input.toDS.toDF("id", "time") + .withColumn("time", $"time".cast("timestamp")) + .withWatermark("time", "1 second") + .dropDuplicates("id") + .select($"id", $"time".cast("long")) + testStream(df)( + AddData(input, 1 -> 1, 1 -> 2, 2 -> 2), + CheckLastBatch(1 -> 1, 2 -> 2) + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org