Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r219367280
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -18,222 +18,247 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.io.Closeable
     import java.util.concurrent.TimeoutException
     
     import scala.collection.JavaConverters._
     
     import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
     import org.apache.kafka.common.TopicPartition
     
    -import org.apache.spark.{SparkEnv, SparkException, TaskContext}
    +import org.apache.spark.TaskContext
     import org.apache.spark.internal.Logging
    -import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
    +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET}
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
    -import org.apache.spark.util.UninterruptibleThread
    +import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
    +
    +/**
    + * This class simplifies the usage of the Kafka consumer in the Spark SQL Kafka connector.
    + *
    + * NOTE: Like KafkaConsumer, this class is not thread-safe.
    + * NOTE for contributors: It is possible for the instance to be used from multiple callers,
    + * so the methods must not rely on the current cursor and should always seek explicitly.
    + */
    +private[kafka010] class InternalKafkaConsumer(
    +    val topicPartition: TopicPartition,
    +    val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
    +
    +  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  private val consumer = createConsumer
     
    -private[kafka010] sealed trait KafkaDataConsumer {
       /**
    -   * Get the record for the given offset if available.
    -   *
    -   * If the record is invisible (either a
    -   * transaction message, or an aborted message when the consumer's `isolation.level` is
    -   * `read_committed`), it will be skipped and this method will try to fetch next available record
    -   * within [offset, untilOffset).
    -   *
    -   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
    -   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
    -   * method will try to fetch next available record within [offset, untilOffset).
    -   *
    -   * When this method tries to skip offsets due to either invisible messages or data loss and
    -   * reaches `untilOffset`, it will return `null`.
    +   * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record"
    +   * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches
    +   * some messages but all of them are not visible messages (either transaction messages,
    +   * or aborted messages when `isolation.level` is `read_committed`).
        *
    -   * @param offset         the offset to fetch.
    -   * @param untilOffset    the max offset to fetch. Exclusive.
    -   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    -   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
    -   *                       offset if available, or throw exception. When `failOnDataLoss` is `false`,
    -   *                       this method will either return record at offset if available, or return
    -   *                       the next earliest available record less than untilOffset, or null. It
    -   *                       will not throw any exception.
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
        */
    -  def get(
    -      offset: Long,
    -      untilOffset: Long,
    -      pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
    +  def fetch(offset: Long, pollTimeoutMs: Long)
    +  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
    +    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
    +    seek(offset)
    +    val p = consumer.poll(pollTimeoutMs)
    +    val r = p.records(topicPartition)
    +    logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
    +    val offsetAfterPoll = consumer.position(topicPartition)
    +    logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
    +    val fetchedData = (r, offsetAfterPoll)
    +    if (r.isEmpty) {
    +      // We cannot fetch anything after `poll`. Three possible cases:
    +      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
    +      //   be thrown.
    +      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
    +      // - Fetched something but all of them are not visible messages. This is a valid case and we
    +      //   let the caller handle this.
    +      val range = getAvailableOffsetRange()
    +      if (offset < range.earliest || offset >= range.latest) {
    +        throw new OffsetOutOfRangeException(
    +          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      } else if (offset == offsetAfterPoll) {
    +        throw new TimeoutException(
    +          s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
    +      }
    +    }
    +    fetchedData
       }
     
       /**
        * Return the available offset range of the current partition. It's a pair of the earliest offset
        * and the latest offset.
        */
    -  def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
    +  def getAvailableOffsetRange(): AvailableOffsetRange = {
    +    consumer.seekToBeginning(Set(topicPartition).asJava)
    +    val earliestOffset = consumer.position(topicPartition)
    +    consumer.seekToEnd(Set(topicPartition).asJava)
    +    val latestOffset = consumer.position(topicPartition)
    +    AvailableOffsetRange(earliestOffset, latestOffset)
    +  }
     
    -  /**
    -   * Release this consumer from being further used. Depending on its implementation,
    -   * this consumer will be either finalized, or reset for reuse later.
    -   */
    -  def release(): Unit
    +  override def close(): Unit = {
    +    consumer.close()
    +  }
     
    -  /** Reference to the internal implementation that this wrapper delegates to */
    -  protected def internalConsumer: InternalKafkaConsumer
    -}
    +  /** Create a KafkaConsumer to fetch records for `topicPartition` */
    +  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
    +    val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    +    val tps = new ju.ArrayList[TopicPartition]()
    +    tps.add(topicPartition)
    +    c.assign(tps)
    +    c
    +  }
     
    +  private def seek(offset: Long): Unit = {
    +    logDebug(s"Seeking to $groupId $topicPartition $offset")
    +    consumer.seek(topicPartition, offset)
    +  }
    +}
     
     /**
    - * A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
    - * This is not for direct use outside this file.
    + * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
    + *
    + * @param _records the pre-fetched Kafka records.
    + * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
    + *                                 should check if the pre-fetched data is still valid.
    + * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
    + *                           poll when `records` is drained.
      */
    -private[kafka010] case class InternalKafkaConsumer(
    -    topicPartition: TopicPartition,
    -    kafkaParams: ju.Map[String, Object]) extends Logging {
    -  import InternalKafkaConsumer._
    -
    -  /**
    -   * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
    -   *
    -   * @param _records the pre-fetched Kafka records.
    -   * @param _nextOffsetInFetchedData the next offset in `records`. We use this to verify if we
    -   *                                 should check if the pre-fetched data is still valid.
    -   * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to
    -   *                           poll when `records` is drained.
    -   */
    -  private case class FetchedData(
    -      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    -      private var _nextOffsetInFetchedData: Long,
    -      private var _offsetAfterPoll: Long) {
    -
    -    def withNewPoll(
    -        records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    -        offsetAfterPoll: Long): FetchedData = {
    -      this._records = records
    -      this._nextOffsetInFetchedData = UNKNOWN_OFFSET
    -      this._offsetAfterPoll = offsetAfterPoll
    -      this
    -    }
    -
    -    /** Whether there are more elements */
    -    def hasNext: Boolean = _records.hasNext
    -
    -    /** Move `records` forward and return the next record. */
    -    def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -      val record = _records.next()
    -      _nextOffsetInFetchedData = record.offset + 1
    -      record
    -    }
    +private[kafka010] case class FetchedData(
    +    private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    +    private var _nextOffsetInFetchedData: Long,
    +    private var _offsetAfterPoll: Long) {
    +
    +  def withNewPoll(
    +      records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    +      offsetAfterPoll: Long): FetchedData = {
    +    this._records = records
    +    this._nextOffsetInFetchedData = UNKNOWN_OFFSET
    +    this._offsetAfterPoll = offsetAfterPoll
    +    this
    +  }
     
    -    /** Move `records` backward and return the previous record. */
    -    def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -      assert(_records.hasPrevious, "fetchedData cannot move back")
    -      val record = _records.previous()
    -      _nextOffsetInFetchedData = record.offset
    -      record
    -    }
    +  /** Whether there are more elements */
    +  def hasNext: Boolean = _records.hasNext
     
    -    /** Reset the internal pre-fetched data. */
    -    def reset(): Unit = {
    -      _records = ju.Collections.emptyListIterator()
    -    }
    +  /** Move `records` forward and return the next record. */
    +  def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +    val record = _records.next()
    +    _nextOffsetInFetchedData = record.offset + 1
    +    record
    +  }
     
    -    /**
    -     * Returns the next offset in `records`. We use this to verify if we should check if the
    -     * pre-fetched data is still valid.
    -     */
    -    def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
    +  /** Move `records` backward and return the previous record. */
    +  def previous(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +    assert(_records.hasPrevious, "fetchedData cannot move back")
    +    val record = _records.previous()
    +    _nextOffsetInFetchedData = record.offset
    +    record
    +  }
     
    -    /**
    -     * Returns the next offset to poll after draining the pre-fetched records.
    -     */
    -    def offsetAfterPoll: Long = _offsetAfterPoll
    +  /** Reset the internal pre-fetched data. */
    +  def reset(): Unit = {
    +    _records = ju.Collections.emptyListIterator()
    --- End diff --
    
    Should reflect #22507 when #22507 is merged.
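
    For context, a minimal usage sketch (not part of the diff) of the new `fetch` / `getAvailableOffsetRange` contract. The bootstrap servers, group id, topic and the 512 ms poll timeout are invented placeholders, and since the class is package-private the snippet assumes it runs inside org.apache.spark.sql.kafka010:
    
        import java.{util => ju}
        import scala.collection.JavaConverters._
        import org.apache.kafka.clients.consumer.ConsumerConfig
        import org.apache.kafka.common.TopicPartition
    
        // Placeholder consumer configuration; servers, group id and topic are made up for the example.
        val kafkaParams: ju.Map[String, Object] = new ju.HashMap[String, Object]()
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-kafka-source-example")
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    
        val consumer = new InternalKafkaConsumer(new TopicPartition("some-topic", 0), kafkaParams)
        try {
          // Earliest/latest offsets of the partition; this sketch starts reading from the earliest one.
          val range = consumer.getAvailableOffsetRange()
          if (range.earliest < range.latest) {
            val (records, offsetAfterPoll) = consumer.fetch(range.earliest, 512)
            // `records` may be empty when everything polled was invisible (transaction or aborted
            // messages); `offsetAfterPoll` tells the caller where the next fetch should start.
            records.asScala.foreach(r => println(s"offset=${r.offset} size=${r.value().length}"))
          }
        } finally {
          consumer.close()
        }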
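
    And a similarly rough sketch of how the cursor methods of the now top-level FetchedData compose; `polledRecords`, `posAfterPoll`, `untilOffset` and `drainOnce` are hypothetical caller-side names standing for the result of a fetch and the caller's exclusive upper bound:
    
        import java.{util => ju}
        import org.apache.kafka.clients.consumer.ConsumerRecord
        import org.apache.spark.sql.kafka010.KafkaDataConsumer.UNKNOWN_OFFSET
    
        def drainOnce(
            polledRecords: ju.List[ConsumerRecord[Array[Byte], Array[Byte]]],
            posAfterPoll: Long,
            untilOffset: Long): Unit = {
          // Start with an empty buffer; UNKNOWN_OFFSET marks that nothing has been fetched yet.
          val fetchedData = FetchedData(
            ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]](),
            UNKNOWN_OFFSET,
            UNKNOWN_OFFSET)
    
          // Load the buffer from the poll result and remember where the consumer stopped.
          fetchedData.withNewPoll(polledRecords.listIterator(), posAfterPoll)
    
          // Drain the buffer up to (but excluding) `untilOffset`.
          var reachedEnd = false
          while (fetchedData.hasNext && !reachedEnd) {
            val record = fetchedData.next()   // moves the cursor; next offset becomes record.offset + 1
            if (record.offset >= untilOffset) {
              fetchedData.previous()          // step back so the record stays available for a later read
              reachedEnd = true
            } else {
              println(s"read offset ${record.offset}")
            }
          }
          // Once the buffer is drained (or abandoned), reset() drops it; the next poll would then start
          // from the offset recorded after the previous poll.
          fetchedData.reset()
        }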


---
