Github user koeninger commented on the issue:

    https://github.com/apache/spark/pull/20997

I think if we can't come up with a pool design now that solves most of the issues, we should switch back to the single cached consumer approach that the SQL code is using.

On Mon, Apr 16, 2018 at 3:25 AM, Gabor Somogyi <notificati...@github.com> wrote:

> *@gaborgsomogyi* commented on this pull request.
> ------------------------------
>
> In external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/
> KafkaDataConsumer.scala
> <https://github.com/apache/spark/pull/20997#discussion_r181655790>:
>
> > + * If matching consumer doesn't already exist, will be created using kafkaParams.
> > + * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
> > + *
> > + * Note: This method guarantees that the consumer returned is not currently in use by anyone
> > + * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by
> > + * caching them and tracking when they are in use.
> > + */
> > + def acquire[K, V](
> > +     groupId: String,
> > +     topicPartition: TopicPartition,
> > +     kafkaParams: ju.Map[String, Object],
> > +     context: TaskContext,
> > +     useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {
> > +   val key = new CacheKey(groupId, topicPartition)
> > +   val existingInternalConsumers = Option(cache.get(key))
> > +     .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]])
>
> That's correct: the SQL part isn't keeping a linked-list pool but a single
> cached consumer. I was considering your suggestion and came to the same
> conclusion:
>
> > Can you clarify why you want to allow only 1 cached consumer per
> > topicpartition, closing any others at task end?
> >
> > It seems like opening and closing consumers would be less efficient than
> > allowing a pool of more than one consumer per topicpartition.
>
> Though limiting the number of cached consumers per groupId/TopicPartition
> is a must, as you've pointed out.
> On the other hand, if we go the SQL way it's definitely less risky. Do you
> think we should switch back to the single cached consumer approach?
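For context, the "single cached consumer per key" approach under discussion can be sketched roughly as follows. This is a minimal, Kafka-free sketch in plain Scala, not the actual Spark or SQL-side implementation: `FakeConsumer`, `ConsumerCache`, and the `(groupId, topicPartition)` string key are illustrative stand-ins. The idea is that at most one consumer per key is ever cached; if that one is busy, the caller gets a fresh, non-cached consumer that is simply closed on release.

```scala
import scala.collection.mutable

// Stand-in for a real KafkaConsumer; illustrative only.
class FakeConsumer(val key: (String, String)) {
  var closed = false
  def close(): Unit = { closed = true }
}

object ConsumerCache {
  // At most one cached consumer per (groupId, topicPartition) key,
  // mirroring the single-cached-consumer approach in the SQL code path.
  private val cache = mutable.Map[(String, String), FakeConsumer]()
  private val inUse = mutable.Set[(String, String)]()

  // Returns the cached consumer if it is free; otherwise creates a fresh,
  // non-cached one so two tasks never share the same consumer.
  def acquire(groupId: String, topicPartition: String): FakeConsumer = synchronized {
    val key = (groupId, topicPartition)
    cache.get(key) match {
      case Some(c) if !inUse(key) =>
        inUse += key
        c
      case _ =>
        val c = new FakeConsumer(key)
        // Cache it only if this key has no cached consumer yet.
        if (!cache.contains(key)) { cache(key) = c; inUse += key }
        c
    }
  }

  def release(c: FakeConsumer): Unit = synchronized {
    if (cache.get(c.key).contains(c)) inUse -= c.key // cached one becomes free again
    else c.close()                                   // extras are closed at task end
  }
}
```

Under this sketch, a second concurrent `acquire` for the same key gets a throwaway consumer that is closed on `release`, while the cached one survives and is reused by the next task. That is the trade-off being weighed against a `LinkedList` pool: simpler lifecycle and bounded cache size, at the cost of extra open/close churn when tasks for the same partition overlap.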