Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r173340037
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
         }
       }
     
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    +  private def releaseKafkaConsumer(
    +    topicPartition: TopicPartition,
    +    kafkaParams: ju.Map[String, Object]): Unit = {
    +    val key = new CacheKey(topicPartition, kafkaParams)
     
         synchronized {
           val consumer = cache.get(key)
           if (consumer != null) {
    -        consumer.inuse = false
    +        if (consumer.markedForClose) {
    +          consumer.close()
    +          cache.remove(key)
    +        } else {
    +          consumer.inuse = false
    +        }
           } else {
             logWarning(s"Attempting to release consumer that does not exist")
           }
         }
       }
     
    -  /**
    -   * Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
    -   */
    -  def removeKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    synchronized {
    -      val removedConsumer = cache.remove(key)
    -      if (removedConsumer != null) {
    -        removedConsumer.close()
    -      }
    -    }
    -  }
     
       /**
        * Get a cached consumer for groupId, assigned to topic and partition.
        * If matching consumer doesn't already exist, will be created using kafkaParams.
        */
    -  def getOrCreate(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    // If this is reattempt at running the task, then invalidate cache and start with
    -    // a new consumer
    +  def acquire(
    +      topicPartition: TopicPartition,
    +      kafkaParams: ju.Map[String, Object],
    +      useCache: Boolean): KafkaDataConsumer = synchronized {
    +    val key = new CacheKey(topicPartition, kafkaParams)
    +    val existingInternalConsumer = cache.get(key)
    +
    +    lazy val newNonCachedConsumer =
    +      new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))
    +
         if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      removeKafkaConsumer(topic, partition, kafkaParams)
    -      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    -      consumer.inuse = true
    -      cache.put(key, consumer)
    -      consumer
    -    } else {
    -      if (!cache.containsKey(key)) {
    -        cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
    +      // If this is reattempt at running the task, then invalidate cache and start with
    +      // a new consumer
    +      // a new consumer
    +      if (existingInternalConsumer != null) {
    --- End diff ---
    
    This is indeed better. What I was doing was always deferring the close to a later point, but that would let the consumer be used one more time before being closed.
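    
    For reference, a minimal, self-contained sketch of the close-at-release behavior discussed here (the `FakeConsumer` and `ReleaseSketch` names and the `ConcurrentHashMap` cache are illustrative stand-ins, not the actual classes in this PR):
    
        import java.util.concurrent.ConcurrentHashMap
    
        // Simplified stand-in for the cached consumer; only the flags matter here.
        class FakeConsumer {
          @volatile var inuse = true
          @volatile var markedForClose = false
          def close(): Unit = println("consumer closed")
        }
    
        object ReleaseSketch {
          private val cache = new ConcurrentHashMap[String, FakeConsumer]()
    
          // Close at release time if the consumer was marked while still in use,
          // instead of deferring the close to the next acquire, which would let
          // the consumer be used one more time before being closed.
          def release(key: String): Unit = synchronized {
            val consumer = cache.get(key)
            if (consumer != null) {
              if (consumer.markedForClose) {
                consumer.close()
                cache.remove(key)
              } else {
                consumer.inuse = false
              }
            }
          }
        }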

