Repository: spark
Updated Branches:
  refs/heads/branch-2.4 a709718da -> 544f86a69


[SPARK-25495][SS] FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and 
`_offsetAfterPoll`. Otherwise it will cause inconsistent cached data and may 
make Kafka connector return wrong results.
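
A minimal sketch of the stale-cache hazard, using a hypothetical trimmed-down stand-in for the private `FetchedData` class (field names mirror the diff below; the `UNKNOWN_OFFSET` sentinel value of `-2L` is an assumption, not taken from this patch):

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical, simplified stand-in for the private FetchedData class.
class FetchedDataSketch {
  val UNKNOWN_OFFSET: Long = -2L // assumed sentinel value

  var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]] =
    ju.Collections.emptyListIterator()
  var _nextOffsetInFetchedData: Long = UNKNOWN_OFFSET
  var _offsetAfterPoll: Long = UNKNOWN_OFFSET

  // Before this patch, reset() cleared only `_records`, so the two offset
  // fields kept describing data that had just been discarded; a later read
  // could trust the stale `_nextOffsetInFetchedData` and serve records
  // from the wrong position.
  def reset(): Unit = {
    _records = ju.Collections.emptyListIterator()
    _nextOffsetInFetchedData = UNKNOWN_OFFSET // added by this patch
    _offsetAfterPoll = UNKNOWN_OFFSET         // added by this patch
  }
}
```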

## How was this patch tested?

The new unit test.

Closes #22507 from zsxwing/fix-kafka-reset.

Lead-authored-by: Shixiong Zhu <zsxw...@gmail.com>
Co-authored-by: Shixiong Zhu <shixi...@databricks.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
(cherry picked from commit 66d29870c09e6050dd846336e596faaa8b0d14ad)
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/544f86a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/544f86a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/544f86a6

Branch: refs/heads/branch-2.4
Commit: 544f86a69bba94dfcb241e41c799ed63ef4210fc
Parents: a709718
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Tue Sep 25 11:42:27 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue Sep 25 11:42:39 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaDataConsumer.scala  |  5 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 52 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index ceb9e31..7b1314b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer(
     /** Reset the internal pre-fetched data. */
     def reset(): Unit = {
       _records = ju.Collections.emptyListIterator()
+      _nextOffsetInFetchedData = UNKNOWN_OFFSET
+      _offsetAfterPoll = UNKNOWN_OFFSET
     }
 
     /**
@@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer(
       if (offset < fetchedData.offsetAfterPoll) {
        // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
        // the next call to start from `fetchedData.offsetAfterPoll`.
+        val nextOffsetToFetch = fetchedData.offsetAfterPoll
         fetchedData.reset()
-        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
+        return fetchedRecord.withRecord(null, nextOffsetToFetch)
       } else {
         // Fetch records from Kafka and update `fetchedData`.
         fetchData(offset, pollTimeoutMs)
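
The second hunk is a consequence of the first: once `reset()` also clears `_offsetAfterPoll`, reading `fetchedData.offsetAfterPoll` *after* the reset would return `UNKNOWN_OFFSET` instead of the offset the next call should start from. A short sketch of the hazard, reusing the hypothetical `FetchedDataSketch` above:

```scala
val fetchedData = new FetchedDataSketch
fetchedData._offsetAfterPoll = 42L

// Buggy order: reset() wipes the field before it is read.
//   fetchedData.reset()
//   fetchedData._offsetAfterPoll  // -2 (UNKNOWN_OFFSET), not 42

// Fixed order, as in the patch: capture first, then reset.
val nextOffsetToFetch = fetchedData._offsetAfterPoll // 42
fetchedData.reset()
```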

http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 65615fd..e0b6d8c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -853,6 +853,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       )
     }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+    val topic = newTopic()
+    val topicPartition = new TopicPartition(topic, 0)
+    testUtils.createTopic(topic, partitions = 1)
+
+    val ds = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .select($"value".as[String])
+
+    testUtils.withTranscationalProducer { producer =>
+      producer.beginTransaction()
+      (0 to 3).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+      }
+      producer.commitTransaction()
+    }
+    testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+    val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+      if (epochId == 0) {
+        // Send more messages before the tasks of the current batch start reading the current batch
+        // data, so that the executors will prefetch messages in the next batch and drop them. In
+        // this case, if we forget to reset `FetchedData._nextOffsetInFetchedData` or
+        // `FetchedData._offsetAfterPoll` (see SPARK-25495), the next batch will see incorrect
+        // values and return wrong results, hence failing the test.
+        testUtils.withTranscationalProducer { producer =>
+          producer.beginTransaction()
+          (4 to 7).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+          }
+          producer.commitTransaction()
+        }
+        testUtils.waitUntilOffsetAppears(topicPartition, 10)
+        checkDatasetUnorderly(ds, (0 to 3).map(_.toString): _*)
+      } else {
+        checkDatasetUnorderly(ds, (4 to 7).map(_.toString): _*)
+      }
+    }.start()
+    try {
+      q.processAllAvailable()
+    } finally {
+      q.stop()
+    }
+  }
 }
 
 


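A note on the magic offsets in the new test: the producer is transactional and the reader sets `kafka.isolation.level` to `read_committed`, so each commit writes a transaction marker that occupies one offset but is never handed to the consumer. Those marker offsets are exactly the "invisible" range handled by the patched branch above. A sketch of the assumed offset layout, explaining why the test waits for end offsets 5 and 10 after sending four records per transaction:

```scala
// Assumed layout under standard Kafka transactional semantics:
//   transaction 1: records "0".."3" at offsets 0..3, commit marker at offset 4
//   transaction 2: records "4".."7" at offsets 5..8, commit marker at offset 9
val recordsPerTransaction = 4
val markerSlots = 1 // one commit marker per transaction
val endOffsetAfterFirstCommit = recordsPerTransaction + markerSlots        // 5
val endOffsetAfterSecondCommit = 2 * (recordsPerTransaction + markerSlots) // 10
```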