This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new a5d22da  [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly
a5d22da is described below

commit a5d22da1888b8110b490d52d2c36b3fc907254f6
Author:     Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Fri Feb 1 11:15:05 2019 -0800

    [SPARK-26806][SS] EventTimeStats.merge should handle zeros correctly

    ## What changes were proposed in this pull request?

    Right now, `EventTimeStats.merge` doesn't handle `zero.merge(zero)` correctly: it makes `avg` become `NaN`, and whatever subsequently gets merged with that result keeps `avg` as `NaN`. We then call `NaN.toLong`, get `0`, and the user sees the following incorrect report:

    ```
    "eventTime" : {
      "avg" : "1970-01-01T00:00:00.000Z",
      "max" : "2019-01-31T12:57:00.000Z",
      "min" : "2019-01-30T18:44:04.000Z",
      "watermark" : "1970-01-01T00:00:00.000Z"
    }
    ```

    This issue was reported by liancheng. This PR fixes it.

    ## How was this patch tested?

    The new unit tests.

    Closes #23718 from zsxwing/merge-zero.

    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
    (cherry picked from commit 03a928cbecaf38bbbab3e6b957fcbb542771cfbd)
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
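To make the failure mode concrete, here is a minimal sketch of the pre-fix update, extracted for illustration from the lines the diff below removes (the local variables stand in for the `avg`/`count` fields of `EventTimeStats`; this is not the Spark source itself):

```
// Pre-fix merge() arithmetic applied to two empty (zero) stats.
object MergeZeroDemo extends App {
  var avg = 0.0      // this.avg of an empty EventTimeStats
  var count = 0L     // this.count
  val thatAvg = 0.0  // that.avg, also an empty stats
  val thatCount = 0L // that.count

  count += thatCount                          // still 0
  avg += (thatAvg - avg) * thatCount / count  // 0.0 * 0 / 0 => Double.NaN

  println(avg)        // NaN, which then poisons every subsequent merge
  println(avg.toLong) // 0, rendered as 1970-01-01T00:00:00.000Z
}
```

Since arithmetic with `NaN` stays `NaN`, a single empty-with-empty merge corrupts the average for the rest of the query, which is exactly the report shown above.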
---
 .../streaming/EventTimeWatermarkExec.scala      | 17 +++++++++---
 .../sql/streaming/EventTimeWatermarkSuite.scala | 32 ++++++++++++++++++++--
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index b161651..6fa7ee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var cou
   }
 
   def merge(that: EventTimeStats): Unit = {
-    this.max = math.max(this.max, that.max)
-    this.min = math.min(this.min, that.min)
-    this.count += that.count
-    this.avg += (that.avg - this.avg) * that.count / this.count
+    if (that.count == 0) {
+      // no-op
+    } else if (this.count == 0) {
+      this.max = that.max
+      this.min = that.min
+      this.count = that.count
+      this.avg = that.avg
+    } else {
+      this.max = math.max(this.max, that.max)
+      this.min = math.min(this.min, that.min)
+      this.count += that.count
+      this.avg += (that.avg - this.avg) * that.count / this.count
+    }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index d6bef9c..a51f086 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -38,9 +38,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("EventTimeStats") {
-    val epsilon = 10E-6
+  private val epsilon = 10E-6
 
+  test("EventTimeStats") {
     val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
     stats.add(80L)
     stats.max should be (100)
@@ -57,7 +57,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
   }
 
   test("EventTimeStats: avg on large values") {
-    val epsilon = 10E-6
     val largeValue = 10000000000L // 10B
     // Make sure `largeValue` will cause overflow if we use a Long sum to calc avg.
     assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
@@ -75,6 +74,33 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     stats.avg should be ((largeValue + 0.5) +- epsilon)
   }
 
+  test("EventTimeStats: zero merge zero") {
+    val stats = EventTimeStats.zero
+    val stats2 = EventTimeStats.zero
+    stats.merge(stats2)
+    stats should be (EventTimeStats.zero)
+  }
+
+  test("EventTimeStats: non-zero merge zero") {
+    val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+    val stats2 = EventTimeStats.zero
+    stats.merge(stats2)
+    stats.max should be (10L)
+    stats.min should be (1L)
+    stats.avg should be (5.0 +- epsilon)
+    stats.count should be (3L)
+  }
+
+  test("EventTimeStats: zero merge non-zero") {
+    val stats = EventTimeStats.zero
+    val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
+    stats.merge(stats2)
+    stats.max should be (10L)
+    stats.min should be (1L)
+    stats.avg should be (5.0 +- epsilon)
+    stats.count should be (3L)
+  }
+
   test("error on bad column") {
     val inputData = MemoryStream[Int].toDF()
     val e = intercept[AnalysisException] {
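As a side note, the fix only needs to special-case empty inputs because the incremental form retained in the final `else` branch is algebraically the direct weighted mean of the two groups. A small illustrative check (made-up values, not part of the patch):

```
// Check that avgA + (avgB - avgA) * countB / (countA + countB)
// equals (avgA * countA + avgB * countB) / (countA + countB).
object WeightedMeanCheck extends App {
  val (avgA, countA) = (20.0, 5L) // hypothetical non-empty stats
  val (avgB, countB) = (50.0, 3L)

  val mergedCount = countA + countB
  val incremental = avgA + (avgB - avgA) * countB / mergedCount
  val weighted    = (avgA * countA + avgB * countB) / mergedCount

  assert(math.abs(incremental - weighted) < 1e-9) // both are 31.25
  println(s"$incremental == $weighted")
}
```

Once both counts are non-zero, the original formula was already correct; it only misbehaved when `this.count` stayed at zero after the merge.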