[ https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

     [ https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu resolved SPARK-23623.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 2.4.0

> Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer (kafka-0-10-sql)
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-23623
>                 URL: https://issues.apache.org/jira/browse/SPARK-23623
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Critical
>             Fix For: 2.4.0
>
>
> CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a
> pool of KafkaConsumers that can be reused. However, it was built on the
> assumption that only one task at a time will try to read a given Kafka
> TopicPartition. Hence, the cache was keyed by the TopicPartition a consumer
> is supposed to read. For cases where this assumption may not hold, we have a
> SparkPlan flag to disable the use of the cache. So it was up to the planner
> to correctly identify when it was not safe to use the cache and set the flag
> accordingly.
> Fundamentally, this is the wrong way to approach the problem. It is HARD for
> a high-level planner to reason about the low-level execution model, in
> particular whether multiple tasks in the same query will try to read the
> same partition. Case in point: 2.3.0 introduced stream-stream joins, and you
> can build a streaming self-join query on Kafka. It is far from obvious that
> this leads to two tasks reading the same partition, possibly concurrently.
> Because it is not obvious, it is hard for the planner to detect this and set
> the flag to avoid the cache / consumer pool. This can inadvertently lead to
> {{ConcurrentModificationException}}, or worse, silent reading of incorrect
> data.
> Here is a better way to design this. The planner shouldn't have to understand
> these low-level optimizations.
> Rather, the consumer pool should be smart enough to avoid concurrent use of
> a cached consumer. Currently, it tries to do so, but incorrectly (the flag
> {{inuse}} is not checked when returning a cached consumer, see
> [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]).
> If there is another request for the same partition as a currently in-use
> consumer, the pool should automatically return a fresh consumer, which should
> be closed when the task is done.
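
The pool behaviour proposed above can be sketched roughly as follows. This is
a minimal illustration in Python, not Spark's actual implementation: the names
PooledConsumer, ConsumerPool, acquire and release are hypothetical, and a
(topic, partition) tuple stands in for Kafka's TopicPartition. The only point
it shows is that the in-use flag is checked under a lock before a cached
consumer is handed out, and that a second concurrent request for the same
partition gets a fresh, uncached consumer that is closed on release.

```python
import threading
from dataclasses import dataclass
from typing import Dict, Tuple

# Stand-in for Kafka's TopicPartition: (topic name, partition number).
TopicPartition = Tuple[str, int]


@dataclass
class PooledConsumer:
    """Hypothetical wrapper around a consumer; not Spark's actual class."""
    topic_partition: TopicPartition
    in_use: bool = False
    cached: bool = True   # False for a fresh, task-private consumer
    closed: bool = False

    def close(self) -> None:
        # A real consumer would release its network resources here.
        self.closed = True


class ConsumerPool:
    """Hypothetical pool keyed by TopicPartition, safe for concurrent tasks."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._cache: Dict[TopicPartition, PooledConsumer] = {}

    def acquire(self, tp: TopicPartition) -> PooledConsumer:
        with self._lock:
            existing = self._cache.get(tp)
            if existing is None:
                # First request for this partition: create and cache it.
                consumer = PooledConsumer(tp, in_use=True)
                self._cache[tp] = consumer
                return consumer
            if not existing.in_use:
                # Cached consumer is idle: mark it in use and reuse it.
                existing.in_use = True
                return existing
            # Cached consumer is busy: hand out a fresh, uncached one
            # instead of sharing the busy consumer between two tasks.
            return PooledConsumer(tp, in_use=True, cached=False)

    def release(self, consumer: PooledConsumer) -> None:
        with self._lock:
            consumer.in_use = False
            if not consumer.cached:
                # Fresh consumers are closed when the task is done.
                consumer.close()
```

With this shape, the planner does not need a flag at all: two tasks of a
streaming self-join that ask for the same partition simply get two different
consumer objects, and only the cached one survives the end of the task.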