Repository: spark Updated Branches: refs/heads/master 592e3a42c -> 1149c4efb
[SPARK-25005][SS] Support non-consecutive offsets for Kafka ## What changes were proposed in this pull request? As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. It will contain some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the data returned by `poll`. However, as `seekToEnd` may move the offset to point to these missing offsets, there are 4 possible corner cases we need to support: - The whole batch contains no data messages - The first offset in a batch is not a committed data message - The last offset in a batch is not a committed data message - There is a gap in the middle of a batch They are all covered by the new unit tests. ## How was this patch tested? The new unit tests. Closes #22042 from zsxwing/kafka-transaction-read. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1149c4ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1149c4ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1149c4ef Branch: refs/heads/master Commit: 1149c4efbc5ebe5b412d8f9c61558fef59179a9e Parents: 592e3a4 Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Tue Aug 28 08:38:07 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Tue Aug 28 08:38:07 2018 -0700 ---------------------------------------------------------------------- .../kafka010/KafkaContinuousReadSupport.scala | 2 +- .../spark/sql/kafka010/KafkaDataConsumer.scala | 273 ++++++++++++++----- .../kafka010/KafkaContinuousSourceSuite.scala | 149 +++++++++- .../kafka010/KafkaMicroBatchSourceSuite.scala | 255 ++++++++++++++++- .../spark/sql/kafka010/KafkaRelationSuite.scala | 93 +++++++ .../spark/sql/kafka010/KafkaTestUtils.scala | 22 +- 6 files changed, 720 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala index 4a18839..1753a28 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala @@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader( // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range, // or if it's the endpoint of the data range (i.e. the "true" next offset).
- case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => + case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => val range = consumer.getAvailableOffsetRange() if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) { // retry http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 65046c1..ceb9e31 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this + * method will try to fetch next available record within [offset, untilOffset). + * + * When this method tries to skip offsets due to either invisible messages or data loss and + * reaches `untilOffset`, it will return `null`. * * @param offset the offset to fetch. * @param untilOffset the max offset to fetch. Exclusive. @@ -80,6 +90,83 @@ private[kafka010] case class InternalKafkaConsumer( kafkaParams: ju.Map[String, Object]) extends Logging { import InternalKafkaConsumer._ + /** + * The internal object to store the fetched data from Kafka consumer and the next offset to poll. + * + * @param _records the pre-fetched Kafka records. + * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we + * should check if the pre-fetched data is still valid. + * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to + * poll when `records` is drained. + */ + private case class FetchedData( + private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + private var _nextOffsetInFetchedData: Long, + private var _offsetAfterPoll: Long) { + + def withNewPoll( + records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + offsetAfterPoll: Long): FetchedData = { + this._records = records + this._nextOffsetInFetchedData = UNKNOWN_OFFSET + this._offsetAfterPoll = offsetAfterPoll + this + } + + /** Whether there are more elements */ + def hasNext: Boolean = _records.hasNext + + /** Move `records` forward and return the next record. 
*/ + def next(): ConsumerRecord[Array[Byte], Array[Byte]] = { + val record = _records.next() + _nextOffsetInFetchedData = record.offset + 1 + record + } + + /** Move `records` backward and return the previous record. */ + def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = { + assert(_records.hasPrevious, "fetchedData cannot move back") + val record = _records.previous() + _nextOffsetInFetchedData = record.offset + record + } + + /** Reset the internal pre-fetched data. */ + def reset(): Unit = { + _records = ju.Collections.emptyListIterator() + } + + /** + * Returns the next offset in `records`. We use this to verify if we should check if the + * pre-fetched data is still valid. + */ + def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData + + /** + * Returns the next offset to poll after draining the pre-fetched records. + */ + def offsetAfterPoll: Long = _offsetAfterPoll + } + + /** + * The internal object returned by the `fetchRecord` method. If `record` is empty, it means it is + * invisible (either a transaction message, or an aborted message when the consumer's + * `isolation.level` is `read_committed`), and the caller should use `nextOffsetToFetch` to fetch + * instead. + */ + private case class FetchedRecord( + var record: ConsumerRecord[Array[Byte], Array[Byte]], + var nextOffsetToFetch: Long) { + + def withRecord( + record: ConsumerRecord[Array[Byte], Array[Byte]], + nextOffsetToFetch: Long): FetchedRecord = { + this.record = record + this.nextOffsetToFetch = nextOffsetToFetch + this + } + } + private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] @volatile private var consumer = createConsumer @@ -90,10 +177,21 @@ private[kafka010] case class InternalKafkaConsumer( /** indicate whether this consumer is going to be stopped in the next release */ @volatile var markedForClose = false - /** Iterator to the already fetch data */ - @volatile private var fetchedData = - ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] - @volatile private var nextOffsetInFetchedData = UNKNOWN_OFFSET + /** + * The fetched data returned from Kafka consumer. This is a reusable private object to avoid + * memory allocation. + */ + private val fetchedData = FetchedData( + ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], + UNKNOWN_OFFSET, + UNKNOWN_OFFSET) + + /** + * The fetched record returned from the `fetchRecord` method. This is a reusable private object to + * avoid memory allocation. + */ + private val fetchedRecord: FetchedRecord = FetchedRecord(null, UNKNOWN_OFFSET) + /** Create a KafkaConsumer to fetch records for `topicPartition` */ private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { @@ -125,20 +223,7 @@ private[kafka010] case class InternalKafkaConsumer( AvailableOffsetRange(earliestOffset, latestOffset) } - /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. - * - * @param offset the offset to fetch. - * @param untilOffset the max offset to fetch. Exclusive. - * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. 
- * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at - * offset if available, or throw exception.when `failOnDataLoss` is `false`, - * this method will either return record at offset if available, or return - * the next earliest available record less than untilOffset, or null. It - * will not throw any exception. - */ + /** @see [[KafkaDataConsumer.get]] */ def get( offset: Long, untilOffset: Long, @@ -147,21 +232,32 @@ private[kafka010] case class InternalKafkaConsumer( ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible { require(offset < untilOffset, s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]") - logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset") + logDebug(s"Get $groupId $topicPartition nextOffset ${fetchedData.nextOffsetInFetchedData} " + + s"requested $offset") // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then // we will move to the next available offset within `[offset, untilOffset)` and retry. // If `failOnDataLoss` is `true`, the loop body will be executed only once. var toFetchOffset = offset - var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null + var fetchedRecord: FetchedRecord = null // We want to break out of the while loop on a successful fetch to avoid using "return" // which may cause a NonLocalReturnControl exception when this method is used as a function. var isFetchComplete = false while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) { try { - consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) - isFetchComplete = true + fetchedRecord = fetchRecord(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) + if (fetchedRecord.record != null) { + isFetchComplete = true + } else { + toFetchOffset = fetchedRecord.nextOffsetToFetch + if (toFetchOffset >= untilOffset) { + fetchedData.reset() + toFetchOffset = UNKNOWN_OFFSET + } else { + logDebug(s"Skipped offsets [$offset, $toFetchOffset]") + } + } } catch { case e: OffsetOutOfRangeException => // When there is some error thrown, it's better to use a new consumer to drop all cached @@ -174,9 +270,9 @@ private[kafka010] case class InternalKafkaConsumer( } if (isFetchComplete) { - consumerRecord + fetchedRecord.record } else { - resetFetchedData() + fetchedData.reset() null } } @@ -239,57 +335,73 @@ private[kafka010] case class InternalKafkaConsumer( } /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the fetched record for the given offset if available. + * + * If the record is invisible (either a transaction message, or an aborted message when the + * consumer's `isolation.level` is `read_committed`), it will return a `FetchedRecord` with the + * next offset to fetch. + * + * This method also will try the best to detect data loss. If `failOnDataLoss` is true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this + * method will return `null` if the next available record is within [offset, untilOffset). * * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. 
*/ - private def fetchData( + private def fetchRecord( offset: Long, untilOffset: Long, pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { - if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) { - // This is the first fetch, or the last pre-fetched data has been drained. - // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - poll(pollTimeoutMs) - } - - if (!fetchedData.hasNext()) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. Just throw - // `OffsetOutOfRangeException` to let the caller handle it. - // - Cannot fetch any data before timeout. TimeoutException will be thrown. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { - throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + failOnDataLoss: Boolean): FetchedRecord = { + if (offset != fetchedData.nextOffsetInFetchedData) { + // This is the first fetch, or the fetched data has been reset. + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) + } else if (!fetchedData.hasNext) { // The last pre-fetched data has been drained. + if (offset < fetchedData.offsetAfterPoll) { + // Offsets in [offset, fetchedData.offsetAfterPoll) are invisible. Return a record to ask + // the next call to start from `fetchedData.offsetAfterPoll`. + fetchedData.reset() + return fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { - throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + // Fetch records from Kafka and update `fetchedData`. + fetchData(offset, pollTimeoutMs) } + } + + if (!fetchedData.hasNext) { + // When we reach here, we have already tried to poll from Kafka. As `fetchedData` is still + // empty, all messages in [offset, fetchedData.offsetAfterPoll) are invisible. Return a + // record to ask the next call to start from `fetchedData.offsetAfterPoll`. + assert(offset <= fetchedData.offsetAfterPoll, + s"seek to $offset and poll but the offset was reset to ${fetchedData.offsetAfterPoll}") + fetchedRecord.withRecord(null, fetchedData.offsetAfterPoll) } else { val record = fetchedData.next() - nextOffsetInFetchedData = record.offset + 1 // In general, Kafka uses the specified offset as the start point, and tries to fetch the next // available offset. Hence we need to handle offset mismatch. if (record.offset > offset) { + val range = getAvailableOffsetRange() + if (range.earliest <= offset) { + // `offset` is still valid but the corresponding message is invisible. We should skip it + // and jump to `record.offset`. Here we move `fetchedData` back so that the next call of + // `fetchRecord` can just return `record` directly. + fetchedData.previous() + return fetchedRecord.withRecord(null, record.offset) + } // This may happen when some records aged out but their offsets already got verified if (failOnDataLoss) { reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})") // Never happen as "reportDataLoss" will throw an exception - null + throw new IllegalStateException( + "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true") + } else if (record.offset >= untilOffset) { + reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") + // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch. 
+ fetchedRecord.withRecord(null, untilOffset) } else { - if (record.offset >= untilOffset) { - reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)") - null - } else { - reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") - record - } + reportDataLoss(false, s"Skip missing records in [$offset, ${record.offset})") + fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) } } else if (record.offset < offset) { // This should not happen. If it does happen, then we probably misunderstand Kafka internal @@ -297,7 +409,7 @@ private[kafka010] case class InternalKafkaConsumer( throw new IllegalStateException( s"Tried to fetch $offset but the returned record offset was ${record.offset}") } else { - record + fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) } } } @@ -306,13 +418,7 @@ private[kafka010] case class InternalKafkaConsumer( private def resetConsumer(): Unit = { consumer.close() consumer = createConsumer - resetFetchedData() - } - - /** Reset the internal pre-fetched data. */ - private def resetFetchedData(): Unit = { - nextOffsetInFetchedData = UNKNOWN_OFFSET - fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]] + fetchedData.reset() } /** @@ -346,11 +452,40 @@ private[kafka010] case class InternalKafkaConsumer( consumer.seek(topicPartition, offset) } - private def poll(pollTimeoutMs: Long): Unit = { + /** + * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be + * empty if the Kafka consumer fetches some messages but all of them are not visible messages + * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`). + * + * @throws OffsetOutOfRangeException if `offset` is out of range. + * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. + */ + private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = { + // Seek to the offset because we may call seekToBeginning or seekToEnd before this. + seek(offset) val p = consumer.poll(pollTimeoutMs) val r = p.records(topicPartition) logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") - fetchedData = r.iterator + val offsetAfterPoll = consumer.position(topicPartition) + logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") + fetchedData.withNewPoll(r.listIterator, offsetAfterPoll) + if (!fetchedData.hasNext) { + // We cannot fetch anything after `poll`. Two possible cases: + // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will + // be thrown. + // - Cannot fetch any data before timeout. `TimeoutException` will be thrown. + // - Fetched something but all of them are not invisible. This is a valid case and let the + // caller handles this. 
+ val range = getAvailableOffsetRange() + if (offset < range.earliest || offset >= range.latest) { + throw new OffsetOutOfRangeException( + Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) + } else if (offset == offsetAfterPoll) { + throw new TimeoutException( + s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") + } + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 5d68a14..af51021 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -17,12 +17,159 @@ package org.apache.spark.sql.kafka010 +import org.apache.kafka.clients.producer.ProducerRecord + import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec +import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.streaming.Trigger // Run tests in KafkaSourceSuiteBase in continuous execution mode. -class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest +class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest { + import testImplicits._ + + test("read Kafka transactional messages: read_committed") { + val table = "kafka_continuous_source_test" + withTable(table) { + val topic = newTopic() + testUtils.createTopic(topic) + testUtils.withTranscationalProducer { producer => + val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_committed") + .option("startingOffsets", "earliest") + .option("subscribe", topic) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + .map(kv => kv._2.toInt) + + val q = df + .writeStream + .format("memory") + .queryName(table) + .trigger(ContinuousTrigger(100)) + .start() + try { + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + // Should not read any messages before they are committed + assert(spark.table(table).isEmpty) + + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should read all committed messages + checkAnswer(spark.table(table), (1 to 5).toDF) + } + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + // Should not read aborted messages + checkAnswer(spark.table(table), (1 to 5).toDF) + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should skip aborted messages and read new committed ones. 
+ checkAnswer(spark.table(table), ((1 to 5) ++ (11 to 15)).toDF) + } + } finally { + q.stop() + } + } + } + } + + test("read Kafka transactional messages: read_uncommitted") { + val table = "kafka_continuous_source_test" + withTable(table) { + val topic = newTopic() + testUtils.createTopic(topic) + testUtils.withTranscationalProducer { producer => + val df = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_uncommitted") + .option("startingOffsets", "earliest") + .option("subscribe", topic) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + .map(kv => kv._2.toInt) + + val q = df + .writeStream + .format("memory") + .queryName(table) + .trigger(ContinuousTrigger(100)) + .start() + try { + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + eventually(timeout(streamingTimeout)) { + // Should read uncommitted messages + checkAnswer(spark.table(table), (1 to 5).toDF) + } + + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should read all committed messages + checkAnswer(spark.table(table), (1 to 5).toDF) + } + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + eventually(timeout(streamingTimeout)) { + // Should read aborted messages + checkAnswer(spark.table(table), (1 to 10).toDF) + } + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + eventually(timeout(streamingTimeout)) { + // Should read all messages including committed, aborted and uncommitted messages + checkAnswer(spark.table(table), (1 to 15).toDF) + } + + producer.commitTransaction() + + eventually(timeout(streamingTimeout)) { + // Should read all messages including committed and aborted messages + checkAnswer(spark.table(table), (1 to 15).toDF) + } + } finally { + q.stop() + } + } + } + } +} class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { import testImplicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 1d14550..eb66cca 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -28,7 +28,7 @@ import scala.collection.JavaConverters._ import scala.io.Source import scala.util.Random -import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} import org.apache.kafka.common.TopicPartition import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods._ @@ -159,6 +159,19 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf s"AddKafkaData(topics = $topics, data = $data, message = $message)" } + object WithOffsetSync { + def apply(topic: String)(func: () => 
Unit): StreamAction = { + Execute("Run Kafka Producer")(_ => { + func() + // This is a hack for the race condition that the committed message may be not visible to + // consumer for a short time. + // Looks like after the following call returns, the consumer can always read the committed + // messages. + testUtils.getLatestOffsets(Set(topic)) + }) + } + } + private val topicId = new AtomicInteger(0) protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}" } @@ -596,6 +609,246 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } ) } + + test("read Kafka transactional messages: read_committed") { + // This test will cover the following cases: + // 1. the whole batch contains no data messages + // 2. the first offset in a batch is not a committed data message + // 3. the last offset in a batch is not a committed data message + // 4. there is a gap in the middle of a batch + + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch doesn't contain any visible data + // messages, "poll" will wait until timeout. + .option("kafkaConsumer.pollTimeoutMs", 5000) + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + // Wait until the manual clock is waiting on further instructions to move forward. Then we can + // ensure all batches we are waiting for have been processed. + val waitUntilBatchProcessed = Execute { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + } + + // The message values are the same as their offsets to make the test easy to follow + testUtils.withTranscationalProducer { producer => + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + CheckAnswer(), + WithOffsetSync(topic) { () => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (0 to 4).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + // Should not see any uncommitted messages + CheckNewAnswer(), + WithOffsetSync(topic) { () => + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(0, 1, 2), // offset 0, 1, 2 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message] + WithOffsetSync(topic) { () => + // Send 5 messages and abort the transaction. They should not be read. 
+ producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(), // offset: 6*, 7*, 8* + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(), // offset: 9*, 10*, 11* + WithOffsetSync(topic) { () => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (12 to 16).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(12, 13, 14), // offset: 12, 13, 14 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(15, 16), // offset: 15, 16, 17* + WithOffsetSync(topic) { () => + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "18")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "20")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "22")).get() + producer.send(new ProducerRecord[String, String](topic, "23")).get() + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(18, 20), // offset: 18, 19*, 20 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(22, 23), // offset: 21*, 22, 23 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer() // offset: 24* + ) + } + } + + test("read Kafka transactional messages: read_uncommitted") { + // This test will cover the following cases: + // 1. the whole batch contains no data messages + // 2. the first offset in a batch is not a committed data message + // 3. the last offset in a batch is not a committed data message + // 4. there is a gap in the middle of a batch + + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_uncommitted") + .option("maxOffsetsPerTrigger", 3) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // Set a short timeout to make the test fast. When a batch doesn't contain any visible data + // messages, "poll" will wait until timeout. + .option("kafkaConsumer.pollTimeoutMs", 5000) + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + // Wait until the manual clock is waiting on further instructions to move forward. Then we can + // ensure all batches we are waiting for have been processed. 
+ val waitUntilBatchProcessed = Execute { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + } + + // The message values are the same as their offsets to make the test easy to follow + testUtils.withTranscationalProducer { producer => + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + CheckNewAnswer(), + WithOffsetSync(topic) { () => + // Send 5 messages. They should be visible only after being committed. + producer.beginTransaction() + (0 to 4).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(0, 1, 2), // offset 0, 1, 2 + WithOffsetSync(topic) { () => + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message] + WithOffsetSync(topic) { () => + // Send 5 messages and abort the transaction. They should not be read. + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(6, 7, 8), // offset: 6, 7, 8 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(9, 10), // offset: 9, 10, 11* + WithOffsetSync(topic) { () => + // Send 5 messages again. The consumer should skip the above aborted messages and read + // them. + producer.beginTransaction() + (12 to 16).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(12, 13, 14), // offset: 12, 13, 14 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(15, 16), // offset: 15, 16, 17* + WithOffsetSync(topic) { () => + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "18")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "20")).get() + producer.commitTransaction() + producer.beginTransaction() + producer.send(new ProducerRecord[String, String](topic, "22")).get() + producer.send(new ProducerRecord[String, String](topic, "23")).get() + producer.commitTransaction() + }, + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(18, 20), // offset: 18, 19*, 20 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer(22, 23), // offset: 21*, 22, 23 + AdvanceManualClock(100), + waitUntilBatchProcessed, + CheckNewAnswer() // offset: 24* + ) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala index 688e9c4..93dba18 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala @@ -20,6 +20,7 @@ package 
org.apache.spark.sql.kafka010 import java.util.Locale import java.util.concurrent.atomic.AtomicInteger +import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.spark.sql.QueryTest @@ -234,4 +235,96 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest testBadOptions("subscribe" -> "")("no topics to subscribe") testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") } + + test("read Kafka transactional messages: read_committed") { + val topic = newTopic() + testUtils.createTopic(topic) + testUtils.withTranscationalProducer { producer => + val df = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .load() + .selectExpr("CAST(value AS STRING)") + + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + // Should not read any messages before they are committed + assert(df.isEmpty) + + producer.commitTransaction() + + // Should read all committed messages + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + // Should not read aborted messages + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + // Should skip aborted messages and read new committed ones. + checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF) + } + } + + test("read Kafka transactional messages: read_uncommitted") { + val topic = newTopic() + testUtils.createTopic(topic) + testUtils.withTranscationalProducer { producer => + val df = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.isolation.level", "read_uncommitted") + .option("subscribe", topic) + .load() + .selectExpr("CAST(value AS STRING)") + + producer.beginTransaction() + (1 to 5).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + + // "read_uncommitted" should see all messages including uncommitted ones + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.commitTransaction() + + // Should read all committed messages + checkAnswer(df, (1 to 5).map(_.toString).toDF) + + producer.beginTransaction() + (6 to 10).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.abortTransaction() + + // "read_uncommitted" should see all messages including uncommitted or aborted ones + checkAnswer(df, (1 to 10).map(_.toString).toDF) + + producer.beginTransaction() + (11 to 15).foreach { i => + producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() + + // Should read all messages + checkAnswer(df, (1 to 15).map(_.toString).toDF) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 55d61ef..7b742a3 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010 import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap, Properties} +import java.util.{Map => JMap, Properties, UUID} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -323,9 +323,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") + props.put("group.initial.rebalance.delay.ms", "10") + + // Change the following settings as we have only 1 broker props.put("offsets.topic.num.partitions", "1") props.put("offsets.topic.replication.factor", "1") - props.put("group.initial.rebalance.delay.ms", "10") + props.put("transaction.state.log.replication.factor", "1") + props.put("transaction.state.log.min.isr", "1") + // Can not use properties.putAll(propsMap.asJava) in scala-2.12 // See https://github.com/scala/bug/issues/10418 withBrokerProps.foreach { case (k, v) => props.put(k, v) } @@ -342,6 +347,19 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props } + /** Call `f` with a `KafkaProducer` that has initialized transactions. */ + def withTranscationalProducer(f: KafkaProducer[String, String] => Unit): Unit = { + val props = producerConfiguration + props.put("transactional.id", UUID.randomUUID().toString) + val producer = new KafkaProducer[String, String](props) + try { + producer.initTransactions() + f(producer) + } finally { + producer.close() + } + } + private def consumerConfiguration: Properties = { val props = new Properties() props.put("bootstrap.servers", brokerAddress) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
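----------------------------------------------------------------------

For context, a minimal standalone sketch of the behavior this patch handles: with a plain Kafka consumer and `isolation.level=read_committed`, the offsets of the returned records can be non-consecutive, because aborted records and transaction (commit/abort) markers occupy offsets but are never returned by `poll`. This sketch is not part of the patch; the broker address, topic name, and group id are placeholders, and it uses the long-based `poll` overload that the patched KafkaDataConsumer also uses.

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object ReadCommittedGapsSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings; adjust for a real cluster.
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-sketch")
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("events"))
    try {
      // With read_committed, aborted records and transaction markers are skipped, so the
      // printed offsets may jump (e.g. 0..4, then 6). Handling such gaps on the Spark side
      // is what the new fetchRecord/fetchData logic in KafkaDataConsumer is about.
      val records = consumer.poll(5000L).asScala
      records.foreach { r =>
        println(s"offset=${r.offset} value=${r.value}")
      }
    } finally {
      consumer.close()
    }
  }
}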