GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r173602480
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
    @@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
         }
       }
     
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    +  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
         synchronized {
    -      val consumer = cache.get(key)
    -      if (consumer != null) {
    -        consumer.inuse = false
    -      } else {
    -        logWarning(s"Attempting to release consumer that does not exist")
    -      }
    -    }
    -  }
     
    -  /**
    -   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
    -   */
    -  def removeKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    synchronized {
    -      val removedConsumer = cache.remove(key)
    -      if (removedConsumer != null) {
    -        removedConsumer.close()
    +      // If it has been marked for close, then do it any way
    +      if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
    +      intConsumer.inuse = false
    +
    +      // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
    +      val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
    +      val cachedIntConsumer = cache.get(key)
    +      if (cachedIntConsumer != null) {
    +        if (cachedIntConsumer.eq(intConsumer)) {
    +          // The released consumer is indeed the cached one.
    +          cache.remove(key)
    +        } else {
    +          // The released consumer is not the cached one. Don't do 
anything.
    +          // This should not happen as long as we maintain the invariant 
mentioned above.
    +          logWarning(
    +            s"Cached consumer not the same one as the one being release" +
    +              s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
    +              s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
    +        }
    +      } else {
    +        // The released consumer is not in the cache. Don't do anything.
    +        // This should not happen as long as we maintain the invariant 
mentioned above.
    +        logWarning(s"Attempting to release consumer that is not in the 
cache")
           }
         }
       }
     
       /**
        * Get a cached consumer for groupId, assigned to topic and partition.
        * If matching consumer doesn't already exist, will be created using 
kafkaParams.
    +   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
    +   *
    +   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
    +   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
    +   * caching them and tracking when they are in use.
        */
    -  def getOrCreate(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    // If this is reattempt at running the task, then invalidate cache and 
start with
    -    // a new consumer
    +  def acquire(
    +      topicPartition: TopicPartition,
    +      kafkaParams: ju.Map[String, Object],
    +      useCache: Boolean): KafkaDataConsumer = synchronized {
    +    val key = new CacheKey(topicPartition, kafkaParams)
    +    val existingInternalConsumer = cache.get(key)
    +
    +    lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
    +
         if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      removeKafkaConsumer(topic, partition, kafkaParams)
    -      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    -      consumer.inuse = true
    -      cache.put(key, consumer)
    -      consumer
    -    } else {
    -      if (!cache.containsKey(key)) {
    -        cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
    +      // If this is reattempt at running the task, then invalidate cached 
consumer if any and
    +      // start with a new one.
    +      if (existingInternalConsumer != null) {
    +        if (existingInternalConsumer.inuse) {
    +          // Consumer exists in cache and is somehow in use. Don't close 
it immediately, but
    +          // mark it for being closed when it is released.
    +          existingInternalConsumer.markedForClose = true
    +          NonCachedKafkaDataConsumer(newInternalConsumer)
    +
    +        } else {
    +          // Consumer exists in cache and is not in use, so close it 
immediately and replace
    +          // it with a new one.
    +          existingInternalConsumer.close()
    +          cache.put(key, newInternalConsumer)
    +          CachedKafkaDataConsumer(newInternalConsumer)
    +
    +        }
    +      } else {
    +        // Consumer is not cached, put the new one in the cache
    +        cache.put(key, newInternalConsumer)
    +        CachedKafkaDataConsumer(newInternalConsumer)
    +
           }
    -      val consumer = cache.get(key)
    -      consumer.inuse = true
    -      consumer
    +    } else if (!useCache) {
    +      // If planner asks to not reuse consumers, then do not use it, 
return a new consumer
    +      NonCachedKafkaDataConsumer(newInternalConsumer)
    +
    +    } else if (existingInternalConsumer == null) {
    +      // If consumer is not already cached, then put a new in the cache 
and return it
    +      newInternalConsumer.inuse = true
    +      cache.put(key, newInternalConsumer)
    +      CachedKafkaDataConsumer(newInternalConsumer)
    +
    +    } else if (existingInternalConsumer.inuse) {
    +      // If consumer is already cached but is currently in use, then 
return a new consumer
    +      NonCachedKafkaDataConsumer(newInternalConsumer)
    +
    +    } else {
    +      // If consumer is already cached and is currently not in use, then 
return that consumer
    +      CachedKafkaDataConsumer(existingInternalConsumer)
    --- End diff --
    
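    This last branch returns the cached consumer without setting `existingInternalConsumer.inuse = true` (unlike the `existingInternalConsumer == null` branch above). A concurrent `acquire` for the same key would therefore still see it as not in use and could be handed the same consumer. I wonder why this was not caught in the stress test.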


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to