Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139259901
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
    @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
         )
       }
     
    +  test("watermark with 2 streams") {
    +    val first = MemoryStream[Int]
    +
    +    val firstDf = first.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val second = MemoryStream[Int]
    +
    +    val secondDf = second.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "5 seconds")
    +      .select('value)
    +
    +    val union = firstDf.union(secondDf)
    +      .writeStream
    +      .format("memory")
    +      .queryName("test")
    +      .start()
    +
    +    def getWatermarkAfterData(
    +        firstData: Seq[Int] = Seq.empty,
    +        secondData: Seq[Int] = Seq.empty): Long = {
    +      if (firstData.nonEmpty) first.addData(firstData)
    +      if (secondData.nonEmpty) second.addData(secondData)
    +      union.processAllAvailable()
    +      // add a dummy batch so lastExecution has the new watermark
    +      first.addData(0)
    +      union.processAllAvailable()
    +      // get last watermark
    +      val lastExecution = 
union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
    +      lastExecution.offsetSeqMetadata.batchWatermarkMs
    +    }
    +
    +    // Global watermark starts at 0 until we get data from both sides
    +    assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
    +    assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
    +    // Global watermark stays at left watermark 1 when right watermark 
moves to 3
    +    assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
    +    // Global watermark switches to right side value 3 when left watermark 
goes higher
    +    assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
    +    // Global watermark goes back to left
    +    assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
    +    // Global watermark stays on left as long as it's below right
    +    assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
    +    assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
    +    // Global watermark switches back to right again
    +    assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
    +
    +    // Global watermark is updated correctly with simultaneous data from 
both sides
    +    assert(getWatermarkAfterData(firstData = Seq(100), secondData = 
Seq(100)) == 90000)
    +    assert(getWatermarkAfterData(firstData = Seq(120), secondData = 
Seq(110)) == 105000)
    +    assert(getWatermarkAfterData(firstData = Seq(130), secondData = 
Seq(125)) == 120000)
    +
    +    // Global watermark doesn't decrement with simultaneous data
    +    assert(getWatermarkAfterData(firstData = Seq(100), secondData = 
Seq(100)) == 120000)
    +    assert(getWatermarkAfterData(firstData = Seq(140), secondData = 
Seq(100)) == 120000)
    +    assert(getWatermarkAfterData(firstData = Seq(100), secondData = 
Seq(135)) == 130000)
    --- End diff --
    
    Please also add a test that the global (minimum) watermark is recovered correctly after the query is restarted.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to