Repository: spark Updated Branches: refs/heads/master 04db03537 -> 66d29870c
[SPARK-25495][SS] FetchedData.reset should reset all fields ## What changes were proposed in this pull request? `FetchedData.reset` should reset `_nextOffsetInFetchedData` and `_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may make Kafka connector return wrong results. ## How was this patch tested? The new unit test. Closes #22507 from zsxwing/fix-kafka-reset. Lead-authored-by: Shixiong Zhu <zsxw...@gmail.com> Co-authored-by: Shixiong Zhu <shixi...@databricks.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66d29870 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66d29870 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66d29870 Branch: refs/heads/master Commit: 66d29870c09e6050dd846336e596faaa8b0d14ad Parents: 04db035 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Tue Sep 25 11:42:27 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Tue Sep 25 11:42:27 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/kafka010/KafkaDataConsumer.scala | 5 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 52 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index ceb9e31..7b1314b 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer( /** Reset the internal pre-fetched data. */ def reset(): Unit = { _records = ju.Collections.emptyListIterator() + _nextOffsetInFetchedData = UNKNOWN_OFFSET + _offsetAfterPoll = UNKNOWN_OFFSET } /** @@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer( if (offset < fetchedData.offsetAfterPoll) { // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask // the next call to start from `fetchedData.offsetAfterPoll`. + val nextOffsetToFetch = fetchedData.offsetAfterPoll fetchedData.reset() - return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) + return fetchedRecord.withRecord(null, nextOffsetToFetch) } else { // Fetch records from Kafka and update `fetchedData`. fetchData(offset, pollTimeoutMs) http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index e5f0088..39c2cde 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -874,6 +874,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } } + + test("SPARK-25495: FetchedData.reset should reset all fields") { + val topic = newTopic() + val topicPartition = new TopicPartition(topic, 0) + testUtils.createTopic(topic, partitions = 1) + + val ds = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .select($"value".as[String]) + + testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (0 to 3).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + } + testUtils.waitUntilOffsetAppears(topicPartition, 5) + + val q = ds.writeStream.foreachBatch { (ds, epochId) => + if (epochId == 0) { + // Send more message before the tasks of the current batch start reading the current batch + // data, so that the executors will prefetch messages in the next batch and drop them. In + // this case, if we forget to reset `FetchedData._nextOffsetInFetchedData` or + // `FetchedData._offsetAfterPoll` (See SPARK-25495), the next batch will see incorrect + // values and return wrong results hence fail the test. + testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (4 to 7).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + } + testUtils.waitUntilOffsetAppears(topicPartition, 10) + checkDatasetUnorderly(ds, (0 to 3).map(_.toString): _*) + } else { + checkDatasetUnorderly(ds, (4 to 7).map(_.toString): _*) + } + }.start() + try { + q.processAllAvailable() + } finally { + q.stop() + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org