Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r160550392
  
    --- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
    @@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
         }
       }
     
    -  test("(de)serialization of initial offsets") {
    +  test("KafkaSource with watermark") {
    +    val now = System.currentTimeMillis()
         val topic = newTopic()
    -    testUtils.createTopic(topic, partitions = 64)
    +    testUtils.createTopic(newTopic(), partitions = 1)
    +    testUtils.sendMessages(topic, Array(1).map(_.toString))
     
    -    val reader = spark
    +    val kafka = spark
           .readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("startingOffsets", s"earliest")
           .option("subscribe", topic)
    +      .load()
     
    -    testStream(reader.load)(
    -      makeSureGetOffsetCalled,
    -      StopStream,
    -      StartStream(),
    -      StopStream)
    +    val windowedAggregation = kafka
    +      .withWatermark("timestamp", "10 seconds")
    +      .groupBy(window($"timestamp", "5 seconds") as 'window)
    +      .agg(count("*") as 'count)
    +      .select($"window".getField("start") as 'window, $"count")
    +
    +    val query = windowedAggregation
    +      .writeStream
    +      .format("memory")
    +      .outputMode("complete")
    +      .queryName("kafkaWatermark")
    +      .start()
    +    query.processAllAvailable()
    +    val rows = spark.table("kafkaWatermark").collect()
    +    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
    +    val row = rows(0)
    +    // We cannot check the exact window start time as it depends on the 
time that messages were
    +    // inserted by the producer. So here we just use a lower bound to make 
sure the internal
    +    // conversion works.
    +    assert(
    +      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
    +      s"Unexpected results: $row")
    +    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
    +    query.stop()
       }
    +}
     
    -  test("maxOffsetsPerTrigger") {
    +class KafkaSourceSuiteBase extends KafkaSourceTest {
    +
    +  import testImplicits._
    +
    +  test("(de)serialization of initial offsets") {
    --- End diff --
    
    Is this needed in the common KafkaSourceSuiteBase?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to