spark git commit: [SPARK-25495][SS] FetchedData.reset should reset all fields

2018-09-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 a709718da -> 544f86a69


[SPARK-25495][SS] FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and 
`_offsetAfterPoll`. Otherwise it leaves the cached data in an inconsistent 
state and may make the Kafka connector return wrong results.
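
For context, a minimal, self-contained sketch of the caching pattern the fix touches (this is not the actual Spark class; the simplified shape and the sentinel value are assumptions). Before this patch, `reset` cleared only `_records`, so the two offset fields could survive a reset and be observed by the next fetch:

```scala
import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord

object FetchedDataSketch {
  // Sentinel for "no cached offset"; the concrete value is an assumption here.
  val UNKNOWN_OFFSET = -2L

  class FetchedData(
      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
      private var _nextOffsetInFetchedData: Long,
      private var _offsetAfterPoll: Long) {

    /** Reset the internal pre-fetched data. The fix: clear *all* fields, not
     *  just `_records`, so a later fetch cannot observe stale offsets. */
    def reset(): Unit = {
      _records = ju.Collections.emptyListIterator()
      _nextOffsetInFetchedData = UNKNOWN_OFFSET
      _offsetAfterPoll = UNKNOWN_OFFSET
    }

    def offsetAfterPoll: Long = _offsetAfterPoll
  }
}
```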

## How was this patch tested?

The new unit test.

Closes #22507 from zsxwing/fix-kafka-reset.

Lead-authored-by: Shixiong Zhu 
Co-authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
(cherry picked from commit 66d29870c09e6050dd846336e596faaa8b0d14ad)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/544f86a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/544f86a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/544f86a6

Branch: refs/heads/branch-2.4
Commit: 544f86a69bba94dfcb241e41c799ed63ef4210fc
Parents: a709718
Author: Shixiong Zhu 
Authored: Tue Sep 25 11:42:27 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Sep 25 11:42:39 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaDataConsumer.scala  |  5 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 52 
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index ceb9e31..7b1314b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer(
 /** Reset the internal pre-fetched data. */
 def reset(): Unit = {
   _records = ju.Collections.emptyListIterator()
+  _nextOffsetInFetchedData = UNKNOWN_OFFSET
+  _offsetAfterPoll = UNKNOWN_OFFSET
 }
 
 /**
@@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer(
   if (offset < fetchedData.offsetAfterPoll) {
 // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
 // the next call to start from `fetchedData.offsetAfterPoll`.
+val nextOffsetToFetch = fetchedData.offsetAfterPoll
 fetchedData.reset()
-return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
+return fetchedRecord.withRecord(null, nextOffsetToFetch)
   } else {
 // Fetch records from Kafka and update `fetchedData`.
 fetchData(offset, pollTimeoutMs)
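
A note on the reordering in the hunk above: once `reset()` clears `_offsetAfterPoll`, reading `fetchedData.offsetAfterPoll` after the reset would return the unknown-offset sentinel instead of the offset the caller needs, so the value must be captured first. A tiny runnable sketch of that hazard, using a hypothetical stand-in class:

```scala
object ReadBeforeResetDemo {
  val UNKNOWN_OFFSET = -2L // assumed sentinel, as in the sketch above

  // Hypothetical stand-in for the fetched-data cache.
  class Fetched(var offsetAfterPoll: Long) {
    def reset(): Unit = { offsetAfterPoll = UNKNOWN_OFFSET }
  }

  def main(args: Array[String]): Unit = {
    // Wrong order: reset() wipes the field before it is read.
    val a = new Fetched(42L)
    a.reset()
    println(a.offsetAfterPoll) // -2, not the 42 the caller needs

    // Fixed order: capture the offset first, then reset.
    val b = new Fetched(42L)
    val nextOffsetToFetch = b.offsetAfterPoll
    b.reset()
    println(nextOffsetToFetch) // 42
  }
}
```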

http://git-wip-us.apache.org/repos/asf/spark/blob/544f86a6/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 65615fd..e0b6d8c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -853,6 +853,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Send more messages before the tasks of the current batch start reading the current batch

spark git commit: [SPARK-25495][SS] FetchedData.reset should reset all fields

2018-09-25 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 04db03537 -> 66d29870c


[SPARK-25495][SS] FetchedData.reset should reset all fields

## What changes were proposed in this pull request?

`FetchedData.reset` should reset `_nextOffsetInFetchedData` and 
`_offsetAfterPoll`. Otherwise it leaves the cached data in an inconsistent 
state and may make the Kafka connector return wrong results.

## How was this patch tested?

The new unit test.

Closes #22507 from zsxwing/fix-kafka-reset.

Lead-authored-by: Shixiong Zhu 
Co-authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66d29870
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66d29870
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66d29870

Branch: refs/heads/master
Commit: 66d29870c09e6050dd846336e596faaa8b0d14ad
Parents: 04db035
Author: Shixiong Zhu 
Authored: Tue Sep 25 11:42:27 2018 -0700
Committer: Shixiong Zhu 
Committed: Tue Sep 25 11:42:27 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaDataConsumer.scala  |  5 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 52 
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index ceb9e31..7b1314b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -134,6 +134,8 @@ private[kafka010] case class InternalKafkaConsumer(
 /** Reset the internal pre-fetched data. */
 def reset(): Unit = {
   _records = ju.Collections.emptyListIterator()
+  _nextOffsetInFetchedData = UNKNOWN_OFFSET
+  _offsetAfterPoll = UNKNOWN_OFFSET
 }
 
 /**
@@ -361,8 +363,9 @@ private[kafka010] case class InternalKafkaConsumer(
   if (offset < fetchedData.offsetAfterPoll) {
 // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask
 // the next call to start from `fetchedData.offsetAfterPoll`.
+val nextOffsetToFetch = fetchedData.offsetAfterPoll
 fetchedData.reset()
-return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
+return fetchedRecord.withRecord(null, nextOffsetToFetch)
   } else {
 // Fetch records from Kafka and update `fetchedData`.
 fetchData(offset, pollTimeoutMs)

http://git-wip-us.apache.org/repos/asf/spark/blob/66d29870/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index e5f0088..39c2cde 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -874,6 +874,58 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Send more messages before the tasks of the current batch start reading the current batch
+// data, so that the executors will prefetch messages in the next batch and drop them. In
+