Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r160569877
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
    @@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
         }
       }
     
    -  test("(de)serialization of initial offsets") {
    +  test("KafkaSource with watermark") {
    +    val now = System.currentTimeMillis()
         val topic = newTopic()
    -    testUtils.createTopic(topic, partitions = 64)
    +    testUtils.createTopic(newTopic(), partitions = 1)
    +    testUtils.sendMessages(topic, Array(1).map(_.toString))
     
    -    val reader = spark
    +    val kafka = spark
           .readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("startingOffsets", s"earliest")
           .option("subscribe", topic)
    +      .load()
     
    -    testStream(reader.load)(
    -      makeSureGetOffsetCalled,
    -      StopStream,
    -      StartStream(),
    -      StopStream)
    +    val windowedAggregation = kafka
    +      .withWatermark("timestamp", "10 seconds")
    +      .groupBy(window($"timestamp", "5 seconds") as 'window)
    +      .agg(count("*") as 'count)
    +      .select($"window".getField("start") as 'window, $"count")
    +
    +    val query = windowedAggregation
    +      .writeStream
    +      .format("memory")
    +      .outputMode("complete")
    +      .queryName("kafkaWatermark")
    +      .start()
    +    query.processAllAvailable()
    +    val rows = spark.table("kafkaWatermark").collect()
    +    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
    +    val row = rows(0)
    +    // We cannot check the exact window start time as it depends on the time that messages were
    +    // inserted by the producer. So here we just use a low bound to make sure the internal
    +    // conversion works.
    +    assert(
    +      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
    +      s"Unexpected results: $row")
    +    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
    +    query.stop()
       }
    +}
     
    -  test("maxOffsetsPerTrigger") {
    +class KafkaSourceSuiteBase extends KafkaSourceTest {
    +
    +  import testImplicits._
    +
    +  test("(de)serialization of initial offsets") {
         val topic = newTopic()
    -    testUtils.createTopic(topic, partitions = 3)
    -    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
    -    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
    -    testUtils.sendMessages(topic, Array("1"), Some(2))
    +    testUtils.createTopic(topic, partitions = 5)
     
         val reader = spark
           .readStream
           .format("kafka")
           .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    -      .option("kafka.metadata.max.age.ms", "1")
    -      .option("maxOffsetsPerTrigger", 10)
           .option("subscribe", topic)
    -      .option("startingOffsets", "earliest")
    -    val kafka = reader.load()
    -      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    -      .as[(String, String)]
    -    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
    -
    -    val clock = new StreamManualClock
    -
    -    val waitUntilBatchProcessed = AssertOnQuery { q =>
    -      eventually(Timeout(streamingTimeout)) {
    -        if (!q.exception.isDefined) {
    -          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    -        }
    -      }
    -      if (q.exception.isDefined) {
    -        throw q.exception.get
    -      }
    -      true
    -    }
     
    -    testStream(mapped)(
    -      StartStream(ProcessingTime(100), clock),
    -      waitUntilBatchProcessed,
    -      // 1 from smallest, 1 from middle, 8 from biggest
    -      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
    -      AdvanceManualClock(100),
    -      waitUntilBatchProcessed,
    -      // smallest now empty, 1 more from middle, 9 more from biggest
    -      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
    -        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
    -      ),
    +    testStream(reader.load)(
    +      makeSureGetOffsetCalled,
           StopStream,
    -      StartStream(ProcessingTime(100), clock),
    -      waitUntilBatchProcessed,
    -      // smallest now empty, 1 more from middle, 9 more from biggest
    -      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
    -        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
    -        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
    -      ),
    -      AdvanceManualClock(100),
    -      waitUntilBatchProcessed,
    -      // smallest now empty, 1 more from middle, 9 more from biggest
    -      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
    -        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
    -        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
    -        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
    -      )
    -    )
    +      StartStream(),
    +      StopStream)
       }
     
       test("cannot stop Kafka stream") {
    --- End diff --
    
    I think it makes sense to have a common test verifying the basic "start a 
stream and then stop it" flow, to provide a clear failure in case it's just 
completely broken by some change.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to