Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22507#discussion_r220061053
  
    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---
    @@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
           )
         }
       }
    +
    +  test("SPARK-25495: FetchedData.reset should reset all fields") {
    +    val topic = newTopic()
    +    val topicPartition = new TopicPartition(topic, 0)
    +    testUtils.createTopic(topic, partitions = 1)
    +
    +    val ds = spark
    +      .readStream
    +      .format("kafka")
    +      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +      .option("kafka.metadata.max.age.ms", "1")
    +      .option("kafka.isolation.level", "read_committed")
    +      .option("subscribe", topic)
    +      .option("startingOffsets", "earliest")
    +      .load()
    +      .select($"value".as[String])
    +
    +    testUtils.withTranscationalProducer { producer =>
    +      producer.beginTransaction()
    +      (0 to 3).foreach { i =>
     +        producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
    +      }
    +      producer.commitTransaction()
    +    }
    +    testUtils.waitUntilOffsetAppears(topicPartition, 5)
    +
    +    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
    +      if (epochId == 0) {
     +        // Post more messages to Kafka so that the executors will fetch messages in the next batch
     +        // and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData`
    --- End diff --
    
    Make it clear that you want to send more messages *before* the tasks of the current batch start reading the current batch data, so that the executors ....
    
    Also, I am not entirely sure how this causes `fetchedData.reset()` and thus creates the issue. Are you sure this fails without your fix?

