Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139259901 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala --- @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche ) } + test("watermark with 2 streams") { + val first = MemoryStream[Int] + + val firstDf = first.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select('value) + + val second = MemoryStream[Int] + + val secondDf = second.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "5 seconds") + .select('value) + + val union = firstDf.union(secondDf) + .writeStream + .format("memory") + .queryName("test") + .start() + + def getWatermarkAfterData( + firstData: Seq[Int] = Seq.empty, + secondData: Seq[Int] = Seq.empty): Long = { + if (firstData.nonEmpty) first.addData(firstData) + if (secondData.nonEmpty) second.addData(secondData) + union.processAllAvailable() + // add a dummy batch so lastExecution has the new watermark + first.addData(0) + union.processAllAvailable() + // get last watermark + val lastExecution = union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution + lastExecution.offsetSeqMetadata.batchWatermarkMs + } + + // Global watermark starts at 0 until we get data from both sides + assert(getWatermarkAfterData(firstData = Seq(11)) == 0) + assert(getWatermarkAfterData(secondData = Seq(6)) == 1000) + // Global watermark stays at left watermark 1 when right watermark moves to 2 + assert(getWatermarkAfterData(secondData = Seq(8)) == 1000) + // Global watermark switches to right side value 2 when left watermark goes higher + assert(getWatermarkAfterData(firstData = Seq(21)) == 3000) + // Global watermark goes back to left + assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000) + // Global watermark stays on left as long as it's below right + assert(getWatermarkAfterData(firstData = Seq(31)) == 21000) + assert(getWatermarkAfterData(firstData = Seq(41)) == 31000) + // Global watermark switches back to right again + assert(getWatermarkAfterData(firstData = Seq(51)) == 34000) + + // Global watermark is updated correctly with simultaneous data from both sides + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 90000) + assert(getWatermarkAfterData(firstData = Seq(120), secondData = Seq(110)) == 105000) + assert(getWatermarkAfterData(firstData = Seq(130), secondData = Seq(125)) == 120000) + + // Global watermark doesn't decrement with simultaneous data + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(140), secondData = Seq(100)) == 120000) + assert(getWatermarkAfterData(firstData = Seq(100), secondData = Seq(135)) == 130000) --- End diff -- test recovery of the minimum after a restart.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org