GitHub user koeninger commented on the issue:

    https://github.com/apache/spark/pull/20997
  
    I think if we can't come up with a pool design now that solves most of the
    issues, we should switch back to the one cached consumer approach that the
    SQL code is using.
    
    On Mon, Apr 16, 2018 at 3:25 AM, Gabor Somogyi <notificati...@github.com>
    wrote:
    
    > *@gaborgsomogyi* commented on this pull request.
    > ------------------------------
    >
    > In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
    > <https://github.com/apache/spark/pull/20997#discussion_r181655790>:
    >
    > +   * If matching consumer doesn't already exist, will be created using kafkaParams.
    > +   * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
    > +   *
    > +   * Note: This method guarantees that the consumer returned is not currently in use by anyone
    > +   * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by
    > +   * caching them and tracking when they are in use.
    > +   */
    > +  def acquire[K, V](
    > +      groupId: String,
    > +      topicPartition: TopicPartition,
    > +      kafkaParams: ju.Map[String, Object],
    > +      context: TaskContext,
    > +      useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {
    > +    val key = new CacheKey(groupId, topicPartition)
    > +    val existingInternalConsumers = Option(cache.get(key))
    > +      .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]])
    >
    > That's correct, the SQL part isn't keeping a linked list pool but a single
    > cached consumer. I was considering your suggestion and came to the same
    > conclusion:
    >
    > Can you clarify why you want to allow only 1 cached consumer per topicpartition, closing any others at task end?
    >
    > It seems like opening and closing consumers would be less efficient than allowing a pool of more than one consumer per topicpartition.
    >
    > Limiting the number of cached consumers per groupId/TopicPartition is a
    > must, though, as you've pointed out. On the other hand, if we go the SQL
    > way it's definitely less risky. Do you think we should switch back to the
    > one cached consumer approach?
    >
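
    For context, here is a minimal sketch of the "one cached consumer per key"
    approach being discussed, assuming a plain key -> consumer map keyed by
    (groupId, topicPartition). The names below (SingleCachedConsumerSketch,
    invalidate) are illustrative only and not the actual SQL-side
    KafkaDataConsumer; in-use tracking, eviction and error handling are omitted.

        import java.{util => ju}

        import scala.collection.mutable

        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.common.TopicPartition

        // Hypothetical sketch: at most one cached consumer per
        // (groupId, topicPartition), in contrast to the LinkedList-per-key
        // pool in the quoted acquire() above.
        object SingleCachedConsumerSketch {

          private case class CacheKey(groupId: String, topicPartition: TopicPartition)

          private val cache = mutable.HashMap.empty[CacheKey, KafkaConsumer[_, _]]

          /** Return the cached consumer for the key, creating and caching one if absent. */
          def acquire[K, V](
              groupId: String,
              topicPartition: TopicPartition,
              kafkaParams: ju.Map[String, Object]): KafkaConsumer[K, V] = synchronized {
            val key = CacheKey(groupId, topicPartition)
            cache.getOrElseUpdate(key, new KafkaConsumer[K, V](kafkaParams))
              .asInstanceOf[KafkaConsumer[K, V]]
          }

          /** Close and drop the cached consumer for the key, e.g. after a failed task. */
          def invalidate(groupId: String, topicPartition: TopicPartition): Unit = synchronized {
            cache.remove(CacheKey(groupId, topicPartition)).foreach(_.close())
          }
        }

    The trade-off matches the quoted discussion: a single slot per key keeps the
    number of cached consumers bounded and the code simple, at the cost of closing
    and reopening consumers when several tasks read the same topicpartition
    concurrently.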


