Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r173341089
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
    @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
         }
       }
     
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    +  private def releaseKafkaConsumer(
    +    topicPartition: TopicPartition,
    +    kafkaParams: ju.Map[String, Object]): Unit = {
    +    val key = new CacheKey(topicPartition, kafkaParams)
     
         synchronized {
           val consumer = cache.get(key)
           if (consumer != null) {
    -        consumer.inuse = false
    +        if (consumer.markedForClose) {
    +          consumer.close()
    +          cache.remove(key)
    +        } else {
    +          consumer.inuse = false
    +        }
           } else {
             logWarning(s"Attempting to release consumer that does not exist")
    --- End diff --
    
    Ah, the warning was misleading. I will add comments to clarify that.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to