Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r174973494
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
    @@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
         }
       }
     
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    +  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
         synchronized {
    -      val consumer = cache.get(key)
    -      if (consumer != null) {
    -        consumer.inuse = false
    -      } else {
    -        logWarning(s"Attempting to release consumer that does not exist")
    -      }
    -    }
    -  }
     
    -  /**
    -   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
    -   */
    -  def removeKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    synchronized {
    -      val removedConsumer = cache.remove(key)
    -      if (removedConsumer != null) {
    -        removedConsumer.close()
    +      // If it has been marked for close, then do it anyway
    +      if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
    --- End diff --
    
    I rewrote the logic. Hopefully, it's simpler to reason about now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to