Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r173602442
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -27,30 +27,73 @@ import org.apache.kafka.common.TopicPartition
     
     import org.apache.spark.{SparkEnv, SparkException, TaskContext}
     import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
     import org.apache.spark.sql.kafka010.KafkaSourceProvider._
     import org.apache.spark.util.UninterruptibleThread
     
    +private[kafka010] sealed trait KafkaDataConsumer {
    +  /**
    +   * Get the record for the given offset if available. Otherwise it will either throw error
    +   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
    +   * or null.
    +   *
    +   * @param offset         the offset to fetch.
    +   * @param untilOffset    the max offset to fetch. Exclusive.
    +   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
    +   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
    +   *                       offset if available, or throw exception. When `failOnDataLoss` is `false`,
    +   *                       this method will either return record at offset if available, or return
    +   *                       the next earliest available record less than untilOffset, or null. It
    +   *                       will not throw any exception.
    +   */
    +  def get(
    +      offset: Long,
    +      untilOffset: Long,
    +      pollTimeoutMs: Long,
    +      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
    +  }
    +
    +  /**
    +   * Return the available offset range of the current partition. It's a pair of the earliest offset
    +   * and the latest offset.
    +   */
    +  def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
    +
    +  /**
    +   * Release this consumer from being further used. Depending on its implementation,
    +   * this consumer will be either finalized, or reset for reuse later.
    +   */
    +  def release(): Unit
    +
    +  /** Reference to the internal implementation that this wrapper delegates to */
    +  protected def internalConsumer: InternalKafkaConsumer
    +}
    +
     
     /**
    - * Consumer of single topicpartition, intended for cached reuse.
    - * Underlying consumer is not threadsafe, so neither is this,
    - * but processing the same topicpartition and group id in multiple threads is usually bad anyway.
    + * A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
    + * This is not for direct use outside this file.
      */
    -private[kafka010] case class CachedKafkaConsumer private(
    +private[kafka010] case class InternalKafkaConsumer(
         topicPartition: TopicPartition,
         kafkaParams: ju.Map[String, Object]) extends Logging {
    -  import CachedKafkaConsumer._
    +  import InternalKafkaConsumer._
     
       private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
     
    -  private var consumer = createConsumer
    +  @volatile private var consumer = createConsumer
    --- End diff --
    
    Yeah, I just added them to be safer. One less thing to worry about.
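    (For readers skimming the archive: a minimal, self-contained sketch of the visibility concern that `@volatile` guards against here. The names below are illustrative, not from the PR; the point is only that the `consumer` field can be reassigned on one thread, e.g. after a reset, while another thread reads it, and `@volatile` makes the reader see the latest reference instead of a stale one.)

        import java.io.Closeable

        // Toy holder mirroring `@volatile private var consumer = createConsumer`.
        // FakeConsumer is a hypothetical stand-in for Kafka's KafkaConsumer.
        object VolatileHandleSketch {

          final class FakeConsumer extends Closeable {
            override def close(): Unit = ()  // no-op stand-in for KafkaConsumer#close
          }

          final class Holder {
            // Without @volatile, the thread calling close() below could observe a stale
            // reference after reset() has swapped in a new consumer on another thread.
            @volatile private var consumer: FakeConsumer = new FakeConsumer

            def reset(): Unit = {            // e.g. recreate the consumer after a fetch error
              consumer.close()
              consumer = new FakeConsumer
            }

            def close(): Unit = consumer.close()
          }

          def main(args: Array[String]): Unit = {
            val holder = new Holder
            val resetter = new Thread(new Runnable {
              override def run(): Unit = (1 to 1000).foreach(_ => holder.reset())
            })
            resetter.start()
            holder.close()                   // reads the field concurrently with the writes above
            resetter.join()
          }
        }

    (This only illustrates the cross-thread access pattern; the actual consumer pool and release logic in the PR are more involved.)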

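    (Similarly, since the `get` contract quoted in the diff above is easy to misread, here is a rough, hypothetical caller-side sketch of the `failOnDataLoss = false` behaviour: each call returns either the record at the requested offset, a later record below `untilOffset` if the requested data is gone, or null when nothing is left in the range. This helper is not code from the PR.)

        package org.apache.spark.sql.kafka010  // the trait is private[kafka010]

        import scala.collection.mutable.ArrayBuffer

        import org.apache.kafka.clients.consumer.ConsumerRecord

        // Hypothetical helper, not part of the PR, showing one way to drive get().
        object ReadRangeSketch {
          def readRange(
              consumer: KafkaDataConsumer,
              fromOffset: Long,
              untilOffset: Long,
              pollTimeoutMs: Long): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
            val records = ArrayBuffer.empty[ConsumerRecord[Array[Byte], Array[Byte]]]
            var offset = fromOffset
            while (offset < untilOffset) {
              val record = consumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss = false)
              if (record == null) {
                offset = untilOffset             // nothing more available in [offset, untilOffset)
              } else {
                records += record
                offset = record.offset + 1       // continue after whatever offset was actually returned
              }
            }
            records.toSeq
          }
        }

    (In real use the consumer would also be released when the caller is done with it, e.g. in a finally block.)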

