Github user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r214853362
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
    @@ -18,222 +18,247 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.io.Closeable
     import java.util.concurrent.TimeoutException
     
     import scala.collection.JavaConverters._
     
     import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
     import org.apache.kafka.common.TopicPartition
     
    -import org.apache.spark.{SparkEnv, SparkException, TaskContext}
    +import org.apache.spark.TaskContext
     import org.apache.spark.internal.Logging
    -import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
    +import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
    -import org.apache.spark.util.UninterruptibleThread
    +import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
    +
    +/**
    + * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
    + *
    + * NOTE: Like KafkaConsumer, this class is not thread-safe.
    + * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
    + * so all the methods should not rely on current cursor and use seek 
manually.
    + */
    +private[kafka010] class InternalKafkaConsumer(
    +    val topicPartition: TopicPartition,
    +    val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
    +
    +  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  private val consumer = createConsumer
     
    -private[kafka010] sealed trait KafkaDataConsumer {
       /**
    -   * Get the record for the given offset if available.
    -   *
    -   * If the record is invisible (either a
    -   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
    -   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
    -   * within [offset, untilOffset).
    -   *
    -   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
    -   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
    -   * method will try to fetch next available record within [offset, 
untilOffset).
    -   *
    -   * When this method tries to skip offsets due to either invisible 
messages or data loss and
    -   * reaches `untilOffset`, it will return `null`.
    +   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
    +   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
    +   * some messages but all of them are not visible messages (either 
transaction messages,
    +   * or aborted messages when `isolation.level` is `read_committed`).
        *
    -   * @param offset         the offset to fetch.
    -   * @param untilOffset    the max offset to fetch. Exclusive.
    -   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    -   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
    -   *                       offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
    -   *                       this method will either return record at offset 
if available, or return
    -   *                       the next earliest available record less than 
untilOffset, or null. It
    -   *                       will not throw any exception.
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
    +   *                          consumer polls nothing before timeout.
        */
    -  def get(
    -      offset: Long,
    -      untilOffset: Long,
    -      pollTimeoutMs: Long,
    -      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
    -    internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
    +  def fetch(offset: Long, pollTimeoutMs: Long)
    +  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
    +    // Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
    +    seek(offset)
    +    val p = consumer.poll(pollTimeoutMs)
    +    val r = p.records(topicPartition)
    +    logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
    +    val offsetAfterPoll = consumer.position(topicPartition)
    +    logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
    +    val fetchedData = (r, offsetAfterPoll)
    +    if (r.isEmpty) {
    +      // We cannot fetch anything after `poll`. Two possible cases:
    +      // - `offset` is out of range so that Kafka returns nothing. 
`OffsetOutOfRangeException` will
    +      //   be thrown.
    +      // - Cannot fetch any data before timeout. `TimeoutException` will 
be thrown.
    +      // - Fetched something but all of them are not invisible. This is a 
valid case and let the
    +      //   caller handles this.
    +      val range = getAvailableOffsetRange()
    +      if (offset < range.earliest || offset >= range.latest) {
    +        throw new OffsetOutOfRangeException(
    +          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
    +      } else if (offset == offsetAfterPoll) {
    +        throw new TimeoutException(
    +          s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
    +      }
    +    }
    +    fetchedData
       }
     
       /**
        * Return the available offset range of the current partition. It's a 
pair of the earliest offset
        * and the latest offset.
        */
    -  def getAvailableOffsetRange(): AvailableOffsetRange = 
internalConsumer.getAvailableOffsetRange()
    +  def getAvailableOffsetRange(): AvailableOffsetRange = {
    +    consumer.seekToBeginning(Set(topicPartition).asJava)
    +    val earliestOffset = consumer.position(topicPartition)
    +    consumer.seekToEnd(Set(topicPartition).asJava)
    +    val latestOffset = consumer.position(topicPartition)
    +    AvailableOffsetRange(earliestOffset, latestOffset)
    +  }
     
    -  /**
    -   * Release this consumer from being further used. Depending on its 
implementation,
    -   * this consumer will be either finalized, or reset for reuse later.
    -   */
    -  def release(): Unit
    +  override def close(): Unit = {
    +    consumer.close()
    +  }
     
    -  /** Reference to the internal implementation that this wrapper delegates 
to */
    -  protected def internalConsumer: InternalKafkaConsumer
    -}
    +  /** Create a KafkaConsumer to fetch records for `topicPartition` */
    +  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
    +    val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    +    val tps = new ju.ArrayList[TopicPartition]()
    +    tps.add(topicPartition)
    +    c.assign(tps)
    +    c
    +  }
     
    +  private def seek(offset: Long): Unit = {
    +    logDebug(s"Seeking to $groupId $topicPartition $offset")
    +    consumer.seek(topicPartition, offset)
    +  }
    +}
     
     /**
    - * A wrapper around Kafka's KafkaConsumer that throws error when data loss 
is detected.
    - * This is not for direct use outside this file.
    + * The internal object to store the fetched data from Kafka consumer and 
the next offset to poll.
    + *
    + * @param _records the pre-fetched Kafka records.
    + * @param _nextOffsetInFetchedData the next offset in `records`. We use 
this to verify if we
    + *                                 should check if the pre-fetched data is 
still valid.
    + * @param _offsetAfterPoll the Kafka offset after calling `poll`. We will 
use this offset to
    + *                           poll when `records` is drained.
      */
    -private[kafka010] case class InternalKafkaConsumer(
    -    topicPartition: TopicPartition,
    -    kafkaParams: ju.Map[String, Object]) extends Logging {
    -  import InternalKafkaConsumer._
    -
    -  /**
    -   * The internal object to store the fetched data from Kafka consumer and 
the next offset to poll.
    -   *
    -   * @param _records the pre-fetched Kafka records.
    -   * @param _nextOffsetInFetchedData the next offset in `records`. We use 
this to verify if we
    -   *                                 should check if the pre-fetched data 
is still valid.
    -   * @param _offsetAfterPoll the Kafka offset after calling `poll`. We 
will use this offset to
    -   *                           poll when `records` is drained.
    -   */
    -  private case class FetchedData(
    -      private var _records: ju.ListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
    -      private var _nextOffsetInFetchedData: Long,
    -      private var _offsetAfterPoll: Long) {
    -
    -    def withNewPoll(
    -        records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    -        offsetAfterPoll: Long): FetchedData = {
    -      this._records = records
    -      this._nextOffsetInFetchedData = UNKNOWN_OFFSET
    -      this._offsetAfterPoll = offsetAfterPoll
    -      this
    -    }
    -
    -    /** Whether there are more elements */
    -    def hasNext: Boolean = _records.hasNext
    -
    -    /** Move `records` forward and return the next record. */
    -    def next(): ConsumerRecord[Array[Byte], Array[Byte]] = {
    -      val record = _records.next()
    -      _nextOffsetInFetchedData = record.offset + 1
    -      record
    -    }
    +private[kafka010] case class FetchedData(
    --- End diff --
    
    I know it's old code, but this blocks solving the multiple-consumer 
extra seek problem.
    If this were read-only, it wouldn't matter which task gets which consumer. 
The consumer would just look in the offset-indexed data pool for the data, 
return it, and prefetch again.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to