Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r218719952

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -18,222 +18,247 @@ package org.apache.spark.sql.kafka010

 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException

 import scala.collection.JavaConverters._

 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition

-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from multiple callers,
+ * so all the methods should not rely on current cursor and use seek manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
+
+  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer

-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's `isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
    *
-   * @param offset the offset to fetch.
-   * @param untilOffset the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw exception.when `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after polling. It means the
+   *                          consumer polls nothing before timeout.
    */
-  def get(
-      offset: Long,
-      untilOffset: Long,
-      pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+    : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+    seek(offset)
+    val p = consumer.poll(pollTimeoutMs)
+    val r = p.records(topicPartition)
+    logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
+    val offsetAfterPoll = consumer.position(topicPartition)
+    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
+    val fetchedData = (r, offsetAfterPoll)
+    if (r.isEmpty) {
+      // We cannot fetch anything after `poll`. Two possible cases:
+      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
+      //   be thrown.
+      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
+      // - Fetched something but all of them are not invisible. This is a valid case and let the
+      //   caller handles this.
+      val range = getAvailableOffsetRange()
+      if (offset < range.earliest || offset >= range.latest) {
+        throw new OffsetOutOfRangeException(
+          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      } else if (offset == offsetAfterPoll) {
+        throw new TimeoutException(
+          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
+      }
+    }
+    fetchedData
   }

   /**
    * Return the available offset range of the current partition. It's a pair of the earliest offset
    * and the latest offset.
    */
-  def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
+  def getAvailableOffsetRange(): AvailableOffsetRange = {
+    consumer.seekToBeginning(Set(topicPartition).asJava)
+    val earliestOffset = consumer.position(topicPartition)
+    consumer.seekToEnd(Set(topicPartition).asJava)
+    val latestOffset = consumer.position(topicPartition)
+    AvailableOffsetRange(earliestOffset, latestOffset)
+  }

-  /**
-   * Release this consumer from being further used. Depending on its implementation,
-   * this consumer will be either finalized, or reset for reuse later.
-   */
-  def release(): Unit
+  override def close(): Unit = {
+    consumer.close()
+  }

-  /** Reference to the internal implementation that this wrapper delegates to */
-  protected def internalConsumer: InternalKafkaConsumer
-}
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
+    val tps = new ju.ArrayList[TopicPartition]()
+    tps.add(topicPartition)
+    c.assign(tps)
+    c
+  }

+  private def seek(offset: Long): Unit = {
+    logDebug(s"Seeking to $groupId $topicPartition $offset")
+    consumer.seek(topicPartition, offset)
+  }
+}

 /**
- * A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
- * This is not for direct use outside this file.
+ * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
+ *
+ * @param _records the pre-fetched Kafka records.
+ * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
+ *                                 should check if the pre-fetched data is still valid.
+ * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
+ *                         poll when `records` is drained.
  */
-private[kafka010] case class InternalKafkaConsumer(
-    topicPartition: TopicPartition,
-    kafkaParams: ju.Map[String, Object]) extends Logging {
-  import InternalKafkaConsumer._
-
-  /**
-   * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
-   *
-   * @param _records the pre-fetched Kafka records.
-   * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
-   *                                 should check if the pre-fetched data is still valid.
-   * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
-   *                         poll when `records` is drained.
-   */
-  private case class FetchedData(
-      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
-      private var _nextOffsetInFetchedData: Long,
-      private var _offsetAfterPoll: Long) {
-
-    def withNewPoll(
-        records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
-        offsetAfterPoll: Long): FetchedData = {
-      this._records = records
-      this._nextOffsetInFetchedData = UNKNOWN_OFFSET
-      this._offsetAfterPoll = offsetAfterPoll
-      this
-    }
-
-    /** Whether there are more elements */
-    def hasNext: Boolean = _records.hasNext
-
-    /** Move `records` forward and return the next record. */
-    def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
-      val record = _records.next()
-      _nextOffsetInFetchedData = record.offset + 1
-      record
-    }
+private[kafka010] case class FetchedData(
--- End diff --

Yeah, I think the tests are fine with minor suggestions. Thanks!
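For readers following the diff, a minimal, hypothetical sketch (not part of this PR) of how a caller might drive the new `fetch` contract: `fetch` seeks to the requested offset, polls once, and returns the visible records together with the consumer position after the poll, throwing `OffsetOutOfRangeException` or `TimeoutException` in the failure cases described in its scaladoc. The object `FetchRangeSketch`, the method `readRange`, and its parameters are illustrative names only.

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    import org.apache.kafka.clients.consumer.ConsumerRecord

    // Hypothetical caller-side sketch; InternalKafkaConsumer is the class added in this diff.
    object FetchRangeSketch {

      // Drain [fromOffset, untilOffset) from one partition using the new fetch contract.
      def readRange(
          consumer: InternalKafkaConsumer,
          fromOffset: Long,
          untilOffset: Long,
          pollTimeoutMs: Long): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
        val out = ArrayBuffer.empty[ConsumerRecord[Array[Byte], Array[Byte]]]
        var offset = fromOffset
        while (offset < untilOffset) {
          // fetch() seeks to `offset`, polls once, and returns (visible records, position after poll).
          // It throws OffsetOutOfRangeException if `offset` is outside the broker's available range,
          // and TimeoutException if the position did not move within `pollTimeoutMs`.
          val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
          out ++= records.asScala.filter(_.offset < untilOffset)
          // An empty list with an advanced position means only invisible (transactional or
          // aborted) messages were polled; continue from the position after the poll.
          offset = offsetAfterPoll
        }
        out
      }
    }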