Repository: spark
Updated Branches:
  refs/heads/master 592e3a42c -> 1149c4efb


[SPARK-25005][SS] Support non-consecutive offsets for Kafka

## What changes were proposed in this pull request?

When a user uses Kafka transactions to write data, the offsets in Kafka will be 
non-consecutive: the log will contain transaction (commit or abort) markers. 
In addition, if the consumer's `isolation.level` is `read_committed`, `poll` 
will not return aborted messages either. Hence, we will see non-consecutive 
offsets in the data returned by `poll`. However, as `seekToEnd` may move the 
offset to one of these missing offsets, there are 4 corner cases we need to 
support:

- The whole batch contains no data messages
- The first offset in a batch is not a committed data message
- The last offset in a batch is not a committed data message
- There is a gap in the middle of a batch

They are all covered by the new unit tests.
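
For illustration only, here is a minimal sketch (not part of this patch) of a 
streaming read that exercises this code path; the broker address and topic name 
are placeholders, and `spark` is assumed to be an existing SparkSession:

```scala
// With read_committed, aborted messages and transaction markers are skipped,
// so the offsets observed by the source can contain gaps.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder address
  .option("kafka.isolation.level", "read_committed")
  .option("startingOffsets", "earliest")
  .option("subscribe", "some-topic")                   // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```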

## How was this patch tested?

The new unit tests.
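
As a rough sketch of how the tests produce the offset gaps (the real suites use 
the new `KafkaTestUtils.withTranscationalProducer` helper; the broker address 
and serializer settings below are assumptions for a standalone example):

```scala
import java.util.{Properties, UUID}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("transactional.id", UUID.randomUUID().toString)

val producer = new KafkaProducer[String, String](props)
try {
  producer.initTransactions()
  producer.beginTransaction()
  (0 to 4).foreach { i =>
    // Data messages occupy offsets 0..4.
    producer.send(new ProducerRecord[String, String]("some-topic", i.toString)).get()
  }
  // The commit writes a transaction marker that occupies offset 5, so the next
  // data message starts at offset 6 and readers observe a gap.
  producer.commitTransaction()
} finally {
  producer.close()
}
```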

Closes #22042 from zsxwing/kafka-transaction-read.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1149c4ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1149c4ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1149c4ef

Branch: refs/heads/master
Commit: 1149c4efbc5ebe5b412d8f9c61558fef59179a9e
Parents: 592e3a4
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Tue Aug 28 08:38:07 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Tue Aug 28 08:38:07 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousReadSupport.scala   |   2 +-
 .../spark/sql/kafka010/KafkaDataConsumer.scala  | 273 ++++++++++++++-----
 .../kafka010/KafkaContinuousSourceSuite.scala   | 149 +++++++++-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 255 ++++++++++++++++-
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  93 +++++++
 .../spark/sql/kafka010/KafkaTestUtils.scala     |  22 +-
 6 files changed, 720 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 4a18839..1753a28 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader(
 
         // This is a failOnDataLoss exception. Retry if nextKafkaOffset is 
within the data range,
         // or if it's the endpoint of the data range (i.e. the "true" next 
offset).
-        case e: IllegalStateException  if 
e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+        case e: IllegalStateException if 
e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
           val range = consumer.getAvailableOffsetRange()
           if (range.latest >= nextKafkaOffset && range.earliest <= 
nextKafkaOffset) {
             // retry

http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index 65046c1..ceb9e31 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread
 
 private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset within 
[offset, untilOffset),
-   * or null.
+   * Get the record for the given offset if available.
+   *
+   * If the record is invisible (either a
+   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
+   * `read_committed`), it will be skipped and this method will try to fetch 
next available record
+   * within [offset, untilOffset).
+   *
+   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
+   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
+   * method will try to fetch next available record within [offset, 
untilOffset).
+   *
+   * When this method tries to skip offsets due to either invisible messages 
or data loss and
+   * reaches `untilOffset`, it will return `null`.
    *
    * @param offset         the offset to fetch.
    * @param untilOffset    the max offset to fetch. Exclusive.
@@ -80,6 +90,83 @@ private[kafka010] case class InternalKafkaConsumer(
     kafkaParams: ju.Map[String, Object]) extends Logging {
   import InternalKafkaConsumer._
 
+  /**
+   * The internal object to store the fetched data from Kafka consumer and the 
next offset to poll.
+   *
+   * @param _records the pre-fetched Kafka records.
+   * @param _nextOffsetInFetchedData the next offset in `records`. We use this 
to verify if we
+   *                                 should check if the pre-fetched data is 
still valid.
+   * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will 
use this offset to
+   *                           poll when `records` is drained.
+   */
+  private case class FetchedData(
+      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+      private var _nextOffsetInFetchedData: Long,
+      private var _offsetAfterPoll: Long) {
+
+    def withNewPoll(
+        records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+        offsetAfterPoll: Long): FetchedData = {
+      this._records = records
+      this._nextOffsetInFetchedData = UNKNOWN_OFFSET
+      this._offsetAfterPoll = offsetAfterPoll
+      this
+    }
+
+    /** Whether there are more elements */
+    def hasNext: Boolean = _records.hasNext
+
+    /** Move `records` forward and return the next record. */
+    def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      val record = _records.next()
+      _nextOffsetInFetchedData = record.offset + 1
+      record
+    }
+
+    /** Move `records` backward and return the previous record. */
+    def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
+      assert(_records.hasPrevious, "fetchedData cannot move back")
+      val record = _records.previous()
+      _nextOffsetInFetchedData = record.offset
+      record
+    }
+
+    /** Reset the internal pre-fetched data. */
+    def reset(): Unit = {
+      _records = ju.Collections.emptyListIterator()
+    }
+
+    /**
+     * Returns the next offset in `records`. We use this to verify if we 
should check if the
+     * pre-fetched data is still valid.
+     */
+    def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
+
+    /**
+     * Returns the next offset to poll after draining the pre-fetched records.
+     */
+    def offsetAfterPoll: Long = _offsetAfterPoll
+  }
+
+  /**
+   * The internal object returned by the `fetchRecord` method. If `record` is 
empty, it means it is
+   * invisible (either a transaction message, or an aborted message when the 
consumer's
+   * `isolation.level` is `read_committed`), and the caller should use 
`nextOffsetToFetch` to fetch
+   * instead.
+   */
+  private case class FetchedRecord(
+      var record: ConsumerRecord[Array[Byte], Array[Byte]],
+      var nextOffsetToFetch: Long) {
+
+    def withRecord(
+        record: ConsumerRecord[Array[Byte], Array[Byte]],
+        nextOffsetToFetch: Long): FetchedRecord = {
+      this.record = record
+      this.nextOffsetToFetch = nextOffsetToFetch
+      this
+    }
+  }
+
   private val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
   @volatile private var consumer = createConsumer
@@ -90,10 +177,21 @@ private[kafka010] case class InternalKafkaConsumer(
   /** indicate whether this consumer is going to be stopped in the next 
release */
   @volatile var markedForClose = false
 
-  /** Iterator to the already fetch data */
-  @volatile private var fetchedData =
-    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
-  @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET
+  /**
+   * The fetched data returned from Kafka consumer. This is a reusable private 
object to avoid
+   * memory allocation.
+   */
+  private val fetchedData = FetchedData(
+    ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+    UNKNOWN_OFFSET,
+    UNKNOWN_OFFSET)
+
+  /**
+   * The fetched record returned from the `fetchRecord` method. This is a 
reusable private object to
+   * avoid memory allocation.
+   */
+  private val fetchedRecord: FetchedRecord = FetchedRecord(null, 
UNKNOWN_OFFSET)
+
 
   /** Create a KafkaConsumer to fetch records for `topicPartition` */
   private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
@@ -125,20 +223,7 @@ private[kafka010] case class InternalKafkaConsumer(
     AvailableOffsetRange(earliestOffset, latestOffset)
   }
 
-  /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset within 
[offset, untilOffset),
-   * or null.
-   *
-   * @param offset the offset to fetch.
-   * @param untilOffset the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will 
either return record at
-   *                       offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if 
available, or return
-   *                       the next earliest available record less than 
untilOffset, or null. It
-   *                       will not throw any exception.
-   */
+  /** @see [[KafkaDataConsumer.get]] */
   def get(
       offset: Long,
       untilOffset: Long,
@@ -147,21 +232,32 @@ private[kafka010] case class InternalKafkaConsumer(
     ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible {
     require(offset < untilOffset,
       s"offset must always be less than untilOffset [offset: $offset, 
untilOffset: $untilOffset]")
-    logDebug(s"Get $groupId $topicPartition nextOffset 
$nextOffsetInFetchedData requested $offset")
+    logDebug(s"Get $groupId $topicPartition nextOffset 
${fetchedData.nextOffsetInFetchedData} " +
+      s"requested $offset")
     // The following loop is basically for `failOnDataLoss = false`. When 
`failOnDataLoss` is
     // `false`, first, we will try to fetch the record at `offset`. If no such 
record exists, then
     // we will move to the next available offset within `[offset, 
untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    var fetchedRecord: FetchedRecord = null
     // We want to break out of the while loop on a successful fetch to avoid 
using "return"
     // which may cause a NonLocalReturnControl exception when this method is 
used as a function.
     var isFetchComplete = false
 
     while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
-        isFetchComplete = true
+        fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+        if (fetchedRecord.record != null) {
+          isFetchComplete = true
+        } else {
+          toFetchOffset = fetchedRecord.nextOffsetToFetch
+          if (toFetchOffset >= untilOffset) {
+            fetchedData.reset()
+            toFetchOffset = UNKNOWN_OFFSET
+          } else {
+            logDebug(s"Skipped offsets [$offset, $toFetchOffset]")
+          }
+        }
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new 
consumer to drop all cached
@@ -174,9 +270,9 @@ private[kafka010] case class InternalKafkaConsumer(
     }
 
     if (isFetchComplete) {
-      consumerRecord
+      fetchedRecord.record
     } else {
-      resetFetchedData()
+      fetchedData.reset()
       null
     }
   }
@@ -239,57 +335,73 @@ private[kafka010] case class InternalKafkaConsumer(
   }
 
   /**
-   * Get the record for the given offset if available. Otherwise it will 
either throw error
-   * (if failOnDataLoss = true), or return the next available offset within 
[offset, untilOffset),
-   * or null.
+   * Get the fetched record for the given offset if available.
+   *
+   * If the record is invisible (either a transaction message, or an aborted message when the
+   * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the
+   * next offset to fetch.
+   *
+   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
+   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
+   * method will return `null` if the next available record is within [offset, untilOffset).
    *
    * @throws OffsetOutOfRangeException if `offset` is out of range
    * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` 
milliseconds.
    */
-  private def fetchData(
+  private def fetchRecord(
       offset: Long,
       untilOffset: Long,
       pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
-      // This is the first fetch, or the last pre-fetched data has been 
drained.
-      // Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
-      seek(offset)
-      poll(pollTimeoutMs)
-    }
-
-    if (!fetchedData.hasNext()) {
-      // We cannot fetch anything after `poll`. Two possible cases:
-      // - `offset` is out of range so that Kafka returns nothing. Just throw
-      // `OffsetOutOfRangeException` to let the caller handle it.
-      // - Cannot fetch any data before timeout. TimeoutException will be 
thrown.
-      val range = getAvailableOffsetRange()
-      if (offset < range.earliest || offset >= range.latest) {
-        throw new OffsetOutOfRangeException(
-          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      failOnDataLoss: Boolean): FetchedRecord = {
+    if (offset != fetchedData.nextOffsetInFetchedData) {
+      // This is the first fetch, or the fetched data has been reset.
+      // Fetch records from Kafka and update `fetchedData`.
+      fetchData(offset, pollTimeoutMs)
+    } else if (!fetchedData.hasNext) { // The last pre-fetched data has been 
drained.
+      if (offset < fetchedData.offsetAfterPoll) {
+        // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. 
Return a record to ask
+        // the next call to start from `fetchedData.offsetAfterPoll`.
+        fetchedData.reset()
+        return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
       } else {
-        throw new TimeoutException(
-          s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
+        // Fetch records from Kafka and update `fetchedData`.
+        fetchData(offset, pollTimeoutMs)
       }
+    }
+
+    if (!fetchedData.hasNext) {
+      // When we reach here, we have already tried to poll from Kafka. As 
`fetchedData` is still
+      // empty, all messages in [offset, fetchedData.offsetAfterPoll) are 
invisible. Return a
+      // record to ask the next call to start from 
`fetchedData.offsetAfterPoll`.
+      assert(offset <= fetchedData.offsetAfterPoll,
+        s"seek to $offset and poll but the offset was reset to 
${fetchedData.offsetAfterPoll}")
+      fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll)
     } else {
       val record = fetchedData.next()
-      nextOffsetInFetchedData = record.offset + 1
       // In general, Kafka uses the specified offset as the start point, and 
tries to fetch the next
       // available offset. Hence we need to handle offset mismatch.
       if (record.offset > offset) {
+        val range = getAvailableOffsetRange()
+        if (range.earliest <= offset) {
+          // `offset` is still valid but the corresponding message is 
invisible. We should skip it
+          // and jump to `record.offset`. Here we move `fetchedData` back so 
that the next call of
+          // `fetchRecord` can just return `record` directly.
+          fetchedData.previous()
+          return fetchedRecord.withRecord(null, record.offset)
+        }
         // This may happen when some records aged out but their offsets 
already got verified
         if (failOnDataLoss) {
           reportDataLoss(true, s"Cannot fetch records in [$offset, 
${record.offset})")
           // Never happen as "reportDataLoss" will throw an exception
-          null
+          throw new IllegalStateException(
+            "reportDataLoss didn't throw an exception when 'failOnDataLoss' is 
true")
+        } else if (record.offset >= untilOffset) {
+          reportDataLoss(false, s"Skip missing records in [$offset, 
$untilOffset)")
+          // Set `nextOffsetToFetch` to `untilOffset` to finish the current 
batch.
+          fetchedRecord.withRecord(null, untilOffset)
         } else {
-          if (record.offset >= untilOffset) {
-            reportDataLoss(false, s"Skip missing records in [$offset, 
$untilOffset)")
-            null
-          } else {
-            reportDataLoss(false, s"Skip missing records in [$offset, 
${record.offset})")
-            record
-          }
+          reportDataLoss(false, s"Skip missing records in [$offset, 
${record.offset})")
+          fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
         }
       } else if (record.offset < offset) {
         // This should not happen. If it does happen, then we probably 
misunderstand Kafka internal
@@ -297,7 +409,7 @@ private[kafka010] case class InternalKafkaConsumer(
         throw new IllegalStateException(
           s"Tried to fetch $offset but the returned record offset was 
${record.offset}")
       } else {
-        record
+        fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
       }
     }
   }
@@ -306,13 +418,7 @@ private[kafka010] case class InternalKafkaConsumer(
   private def resetConsumer(): Unit = {
     consumer.close()
     consumer = createConsumer
-    resetFetchedData()
-  }
-
-  /** Reset the internal pre-fetched data. */
-  private def resetFetchedData(): Unit = {
-    nextOffsetInFetchedData = UNKNOWN_OFFSET
-    fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]]
+    fetchedData.reset()
   }
 
   /**
@@ -346,11 +452,40 @@ private[kafka010] case class InternalKafkaConsumer(
     consumer.seek(topicPartition, offset)
   }
 
-  private def poll(pollTimeoutMs: Long): Unit = {
+  /**
+   * Poll messages from Kafka starting from `offset` and update `fetchedData`. 
`fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are not 
visible messages
+   * (either transaction messages, or aborted messages when `isolation.level` 
is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after 
polling. It means the
+   *                          consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+    seek(offset)
     val p = consumer.poll(pollTimeoutMs)
     val r = p.records(topicPartition)
     logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
-    fetchedData = r.iterator
+    val offsetAfterPoll = consumer.position(topicPartition)
+    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
+    fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
+    if (!fetchedData.hasNext) {
+      // We cannot fetch anything after `poll`. Three possible cases:
+      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
+      //   be thrown.
+      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
+      // - Fetched something but none of them are visible. This is a valid case; let the caller
+      //   handle it.
+      val range = getAvailableOffsetRange()
+      if (offset < range.earliest || offset >= range.latest) {
+        throw new OffsetOutOfRangeException(
+          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      } else if (offset == offsetAfterPoll) {
+        throw new TimeoutException(
+          s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
+      }
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 5d68a14..af51021 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -17,12 +17,159 @@
 
 package org.apache.spark.sql.kafka010
 
+import org.apache.kafka.clients.producer.ProducerRecord
+
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.streaming.Trigger
 
 // Run tests in KafkaSourceSuiteBase in continuous execution mode.
-class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with 
KafkaContinuousTest
+class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with 
KafkaContinuousTest {
+  import testImplicits._
+
+  test("read Kafka transactional messages: read_committed") {
+    val table = "kafka_continuous_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+          .map(kv => kv._2.toInt)
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(ContinuousTrigger(100))
+          .start()
+        try {
+          producer.beginTransaction()
+          (1 to 5).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+
+          // Should not read any messages before they are committed
+          assert(spark.table(table).isEmpty)
+
+          producer.commitTransaction()
+
+          eventually(timeout(streamingTimeout)) {
+            // Should read all committed messages
+            checkAnswer(spark.table(table), (1 to 5).toDF)
+          }
+
+          producer.beginTransaction()
+          (6 to 10).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.abortTransaction()
+
+          // Should not read aborted messages
+          checkAnswer(spark.table(table), (1 to 5).toDF)
+
+          producer.beginTransaction()
+          (11 to 15).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.commitTransaction()
+
+          eventually(timeout(streamingTimeout)) {
+            // Should skip aborted messages and read new committed ones.
+            checkAnswer(spark.table(table), ((1 to 5) ++ (11 to 15)).toDF)
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+
+  test("read Kafka transactional messages: read_uncommitted") {
+    val table = "kafka_continuous_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_uncommitted")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+          .map(kv => kv._2.toInt)
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(ContinuousTrigger(100))
+          .start()
+        try {
+          producer.beginTransaction()
+          (1 to 5).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+
+          eventually(timeout(streamingTimeout)) {
+            // Should read uncommitted messages
+            checkAnswer(spark.table(table), (1 to 5).toDF)
+          }
+
+          producer.commitTransaction()
+
+          eventually(timeout(streamingTimeout)) {
+            // Should read all committed messages
+            checkAnswer(spark.table(table), (1 to 5).toDF)
+          }
+
+          producer.beginTransaction()
+          (6 to 10).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.abortTransaction()
+
+          eventually(timeout(streamingTimeout)) {
+            // Should read aborted messages
+            checkAnswer(spark.table(table), (1 to 10).toDF)
+          }
+
+          producer.beginTransaction()
+          (11 to 15).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+
+          eventually(timeout(streamingTimeout)) {
+            // Should read all messages including committed, aborted and 
uncommitted messages
+            checkAnswer(spark.table(table), (1 to 15).toDF)
+          }
+
+          producer.commitTransaction()
+
+          eventually(timeout(streamingTimeout)) {
+            // Should read all messages including committed and aborted 
messages
+            checkAnswer(spark.table(table), (1 to 15).toDF)
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
+}
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
   import testImplicits._

http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 1d14550..eb66cca 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
-import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, 
RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods._
@@ -159,6 +159,19 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext with Kaf
       s"AddKafkaData(topics = $topics, data = $data, message = $message)"
   }
 
+  object WithOffsetSync {
+    def apply(topic: String)(func: () => Unit): StreamAction = {
+      Execute("Run Kafka Producer")(_ => {
+        func()
+        // This is a hack for the race condition that a committed message may not be visible to
+        // the consumer for a short time.
+        // It looks like after the following call returns, the consumer can always read the
+        // committed messages.
+        testUtils.getLatestOffsets(Set(topic))
+      })
+    }
+  }
+
   private val topicId = new AtomicInteger(0)
   protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
 }
@@ -596,6 +609,246 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
       }
     )
   }
+
+  test("read Kafka transactional messages: read_committed") {
+    // This test will cover the following cases:
+    // 1. the whole batch contains no data messages
+    // 2. the first offset in a batch is not a committed data message
+    // 3. the last offset in a batch is not a committed data message
+    // 4. there is a gap in the middle of a batch
+
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_committed")
+      .option("maxOffsetsPerTrigger", 3)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // Set a short timeout to make the test fast. When a batch doesn't 
contain any visible data
+      // messages, "poll" will wait until timeout.
+      .option("kafkaConsumer.pollTimeoutMs", 5000)
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    // Wait until the manual clock is waiting on further instructions to move 
forward. Then we can
+    // ensure all batches we are waiting for have been processed.
+    val waitUntilBatchProcessed = Execute { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+    }
+
+    // The message values are the same as their offsets to make the test easy 
to follow
+    testUtils.withTranscationalProducer { producer =>
+      testStream(mapped)(
+        StartStream(ProcessingTime(100), clock),
+        waitUntilBatchProcessed,
+        CheckAnswer(),
+        WithOffsetSync(topic) { () =>
+          // Send 5 messages. They should be visible only after being 
committed.
+          producer.beginTransaction()
+          (0 to 4).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        // Should not see any uncommitted messages
+        CheckNewAnswer(),
+        WithOffsetSync(topic) { () =>
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a 
committed data message]
+        WithOffsetSync(topic) { () =>
+          // Send 5 messages and abort the transaction. They should not be 
read.
+          producer.beginTransaction()
+          (6 to 10).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.abortTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(), // offset: 6*, 7*, 8*
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(), // offset: 9*, 10*, 11*
+        WithOffsetSync(topic) { () =>
+          // Send 5 messages again. The consumer should skip the above aborted 
messages and read
+          // them.
+          producer.beginTransaction()
+          (12 to 16).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(12, 13, 14), // offset: 12, 13, 14
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(15, 16),  // offset: 15, 16, 17*
+        WithOffsetSync(topic) { () =>
+          producer.beginTransaction()
+          producer.send(new ProducerRecord[String, String](topic, "18")).get()
+          producer.commitTransaction()
+          producer.beginTransaction()
+          producer.send(new ProducerRecord[String, String](topic, "20")).get()
+          producer.commitTransaction()
+          producer.beginTransaction()
+          producer.send(new ProducerRecord[String, String](topic, "22")).get()
+          producer.send(new ProducerRecord[String, String](topic, "23")).get()
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(18, 20), // offset: 18, 19*, 20
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(22, 23), // offset: 21*, 22, 23
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer() // offset: 24*
+      )
+    }
+  }
+
+  test("read Kafka transactional messages: read_uncommitted") {
+    // This test will cover the following cases:
+    // 1. the whole batch contains no data messages
+    // 2. the first offset in a batch is not a committed data message
+    // 3. the last offset in a batch is not a committed data message
+    // 4. there is a gap in the middle of a batch
+
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.isolation.level", "read_uncommitted")
+      .option("maxOffsetsPerTrigger", 3)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // Set a short timeout to make the test fast. When a batch doesn't 
contain any visible data
+      // messages, "poll" will wait until timeout.
+      .option("kafkaConsumer.pollTimeoutMs", 5000)
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    // Wait until the manual clock is waiting on further instructions to move 
forward. Then we can
+    // ensure all batches we are waiting for have been processed.
+    val waitUntilBatchProcessed = Execute { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+    }
+
+    // The message values are the same as their offsets to make the test easy 
to follow
+    testUtils.withTranscationalProducer { producer =>
+      testStream(mapped)(
+        StartStream(ProcessingTime(100), clock),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(),
+        WithOffsetSync(topic) { () =>
+          // Send 5 messages. They should be visible only after being 
committed.
+          producer.beginTransaction()
+          (0 to 4).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
+        WithOffsetSync(topic) { () =>
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a 
committed data message]
+        WithOffsetSync(topic) { () =>
+          // Send 5 messages and abort the transaction. They should not be 
read.
+          producer.beginTransaction()
+          (6 to 10).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.abortTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(6, 7, 8), // offset: 6, 7, 8
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(9, 10), // offset: 9, 10, 11*
+        WithOffsetSync(topic) { () =>
+          // Send 5 messages again. The consumer should skip the above aborted 
messages and read
+          // them.
+          producer.beginTransaction()
+          (12 to 16).foreach { i =>
+            producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+          }
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(12, 13, 14), // offset: 12, 13, 14
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(15, 16),  // offset: 15, 16, 17*
+        WithOffsetSync(topic) { () =>
+          producer.beginTransaction()
+          producer.send(new ProducerRecord[String, String](topic, "18")).get()
+          producer.commitTransaction()
+          producer.beginTransaction()
+          producer.send(new ProducerRecord[String, String](topic, "20")).get()
+          producer.commitTransaction()
+          producer.beginTransaction()
+          producer.send(new ProducerRecord[String, String](topic, "22")).get()
+          producer.send(new ProducerRecord[String, String](topic, "23")).get()
+          producer.commitTransaction()
+        },
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(18, 20), // offset: 18, 19*, 20
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer(22, 23), // offset: 21*, 22, 23
+        AdvanceManualClock(100),
+        waitUntilBatchProcessed,
+        CheckNewAnswer() // offset: 24*
+      )
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 688e9c4..93dba18 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.QueryTest
@@ -234,4 +235,96 @@ class KafkaRelationSuite extends QueryTest with 
SharedSQLContext with KafkaTest
     testBadOptions("subscribe" -> "")("no topics to subscribe")
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }
+
+  test("read Kafka transactional messages: read_committed") {
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    testUtils.withTranscationalProducer { producer =>
+      val df = spark
+        .read
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.isolation.level", "read_committed")
+        .option("subscribe", topic)
+        .load()
+        .selectExpr("CAST(value AS STRING)")
+
+      producer.beginTransaction()
+      (1 to 5).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+      }
+
+      // Should not read any messages before they are committed
+      assert(df.isEmpty)
+
+      producer.commitTransaction()
+
+      // Should read all committed messages
+      checkAnswer(df, (1 to 5).map(_.toString).toDF)
+
+      producer.beginTransaction()
+      (6 to 10).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+      }
+      producer.abortTransaction()
+
+      // Should not read aborted messages
+      checkAnswer(df, (1 to 5).map(_.toString).toDF)
+
+      producer.beginTransaction()
+      (11 to 15).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+      }
+      producer.commitTransaction()
+
+      // Should skip aborted messages and read new committed ones.
+      checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF)
+    }
+  }
+
+  test("read Kafka transactional messages: read_uncommitted") {
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    testUtils.withTranscationalProducer { producer =>
+      val df = spark
+        .read
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.isolation.level", "read_uncommitted")
+        .option("subscribe", topic)
+        .load()
+        .selectExpr("CAST(value AS STRING)")
+
+      producer.beginTransaction()
+      (1 to 5).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+      }
+
+      // "read_uncommitted" should see all messages including uncommitted ones
+      checkAnswer(df, (1 to 5).map(_.toString).toDF)
+
+      producer.commitTransaction()
+
+      // Should read all committed messages
+      checkAnswer(df, (1 to 5).map(_.toString).toDF)
+
+      producer.beginTransaction()
+      (6 to 10).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+      }
+      producer.abortTransaction()
+
+      // "read_uncommitted" should see all messages including uncommitted or 
aborted ones
+      checkAnswer(df, (1 to 10).map(_.toString).toDF)
+
+      producer.beginTransaction()
+      (11 to 15).foreach { i =>
+        producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+      }
+      producer.commitTransaction()
+
+      // Should read all messages
+      checkAnswer(df, (1 to 15).map(_.toString).toDF)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 55d61ef..7b742a3 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties}
+import java.util.{Map => JMap, Properties, UUID}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
@@ -323,9 +323,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] 
= Map.empty) extends L
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
     props.put("delete.topic.enable", "true")
+    props.put("group.initial.rebalance.delay.ms", "10")
+
+    // Change the following settings as we have only 1 broker
     props.put("offsets.topic.num.partitions", "1")
     props.put("offsets.topic.replication.factor", "1")
-    props.put("group.initial.rebalance.delay.ms", "10")
+    props.put("transaction.state.log.replication.factor", "1")
+    props.put("transaction.state.log.min.isr", "1")
+
     // Can not use properties.putAll(propsMap.asJava) in scala-2.12
     // See https://github.com/scala/bug/issues/10418
     withBrokerProps.foreach { case (k, v) => props.put(k, v) }
@@ -342,6 +347,19 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] 
= Map.empty) extends L
     props
   }
 
+  /** Call `f` with a `KafkaProducer` that has initialized transactions. */
+  def withTranscationalProducer(f: KafkaProducer[String, String] => Unit): 
Unit = {
+    val props = producerConfiguration
+    props.put("transactional.id", UUID.randomUUID().toString)
+    val producer = new KafkaProducer[String, String](props)
+    try {
+      producer.initTransactions()
+      f(producer)
+    } finally {
+      producer.close()
+    }
+  }
+
   private def consumerConfiguration: Properties = {
     val props = new Properties()
     props.put("bootstrap.servers", brokerAddress)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
