[jira] [Updated] (SPARK-23623) Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer (kafka-0-10-sql)
[ https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shixiong Zhu updated SPARK-23623:
---------------------------------
    Fix Version/s: 2.3.1

> Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer (kafka-0-10-sql)
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-23623
>                 URL: https://issues.apache.org/jira/browse/SPARK-23623
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Critical
>             Fix For: 2.3.1, 2.4.0
>
>
> CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a
> pool of KafkaConsumers that can be reused. However, it was built with the
> assumption that only one task will try to read a given Kafka TopicPartition
> at any one time. Hence, the cache was keyed by the TopicPartition a consumer
> is supposed to read. For any cases where this assumption may not hold, we
> have a SparkPlan flag to disable the use of the cache. So it was up to the
> planner to correctly identify when it was not safe to use the cache and to
> set the flag accordingly.
>
> Fundamentally, this is the wrong way to approach the problem. It is HARD for
> a high-level planner to reason about the low-level execution model, i.e.,
> whether there will be multiple tasks in the same query trying to read the
> same partition. Case in point: 2.3.0 introduced stream-stream joins, and you
> can build a streaming self-join query on Kafka. It's non-trivial to figure
> out how this leads to two tasks reading the same partition, possibly
> concurrently. Because of this, it is hard for the planner to detect such
> cases and set the flag to bypass the cache / consumer pool. This can
> inadvertently lead to a {{ConcurrentModificationException}} or, worse,
> silently reading incorrect data.
>
> Here is a better way to design this. The planner shouldn't have to
> understand these low-level optimizations. Rather, the consumer pool should
> be smart enough to avoid concurrent use of a cached consumer. Currently, it
> tries to do so but incorrectly (the flag {{inuse}} is not checked when
> returning a cached consumer, see
> [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]).
> If there is another request for the same partition as a currently in-use
> consumer, the pool should automatically return a fresh consumer that is
> closed when the task is done.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
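A minimal sketch of the pool behavior proposed in the description, written in Python for brevity rather than the Scala of the actual `kafka-0-10-sql` module. Everything in it is hypothetical: `FakeConsumer` stands in for a KafkaConsumer, and `ConsumerPool`, `acquire` and `release` are illustrative names, not Spark's API. The cache is assumed to be keyed by a `(topic, partition)` tuple, mirroring the TopicPartition key the description mentions. It shows two things: the in-use check before a cached consumer is handed out, and closing any non-cached consumer when the task releases it.

```python
import threading
from dataclasses import dataclass


@dataclass
class FakeConsumer:
    """Hypothetical stand-in for a KafkaConsumer assigned to one TopicPartition."""
    topic: str
    partition: int
    closed: bool = False

    def close(self) -> None:
        self.closed = True


@dataclass
class _Entry:
    consumer: FakeConsumer
    in_use: bool = False


class ConsumerPool:
    """Sketch: reuse a cached consumer only when no other task is using it."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # Keyed by (topic, partition), mirroring a cache keyed by TopicPartition.
        self._cache: dict[tuple[str, int], _Entry] = {}

    def acquire(self, topic: str, partition: int) -> FakeConsumer:
        key = (topic, partition)
        with self._lock:
            entry = self._cache.get(key)
            if entry is None:
                # First request for this partition: create and cache a consumer.
                entry = _Entry(FakeConsumer(topic, partition))
                self._cache[key] = entry
            if not entry.in_use:
                # The in-use check: only an idle cached consumer is handed out.
                entry.in_use = True
                return entry.consumer
        # The cached consumer is busy (e.g. the other side of a self-join),
        # so return a fresh, uncached consumer instead of sharing it.
        return FakeConsumer(topic, partition)

    def release(self, consumer: FakeConsumer) -> None:
        key = (consumer.topic, consumer.partition)
        with self._lock:
            entry = self._cache.get(key)
            if entry is not None and entry.consumer is consumer:
                # Cached consumer: mark it idle so a later task can reuse it.
                entry.in_use = False
                return
        # Uncached consumer: close it now that the task is done.
        consumer.close()


if __name__ == "__main__":
    pool = ConsumerPool()
    first = pool.acquire("events", 0)   # cached consumer, now marked in use
    second = pool.acquire("events", 0)  # cached one is busy: fresh consumer
    print(first is second)
    pool.release(second)                # closes the fresh consumer
    pool.release(first)                 # returns the cached consumer to the pool
```

With this shape, two tasks asking for the same partition at the same time always get distinct consumers, so no planner flag is needed to keep them apart.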
[jira] [Updated] (SPARK-23623) Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer (kafka-0-10-sql)
[ https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das updated SPARK-23623:
----------------------------------
    Description:
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption that only one task will try to read a given Kafka TopicPartition at any one time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. For any cases where this assumption may not hold, we have a SparkPlan flag to disable the use of the cache. So it was up to the planner to correctly identify when it was not safe to use the cache and to set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, i.e., whether there will be multiple tasks in the same query trying to read the same partition. Case in point: 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's non-trivial to figure out how this leads to two tasks reading the same partition, possibly concurrently. Because of this, it is hard for the planner to detect such cases and set the flag to bypass the cache / consumer pool. This can inadvertently lead to a {{ConcurrentModificationException}} or, worse, silently reading incorrect data.

Here is a better way to design this. The planner shouldn't have to understand these low-level optimizations. Rather, the consumer pool should be smart enough to avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag {{inuse}} is not checked when returning a cached consumer, see [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that is closed when the task is done.

  was:
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption that only one task will try to read a given Kafka TopicPartition at any one time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. For any cases where this assumption may not hold, we have a SparkPlan flag to disable the use of the cache. So it was up to the planner to correctly identify when it was not safe to use the cache and to set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, i.e., whether there will be multiple tasks in the same query trying to read the same partition. Case in point: 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's non-trivial to figure out how this leads to two tasks reading the same partition, possibly concurrently. Because of this, it is hard for the planner to detect such cases and set the flag to bypass the cache / consumer pool. This can inadvertently lead to

Here is a better way to design this. The planner shouldn't have to understand these low-level optimizations. Rather, the consumer pool should be smart enough to avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag {{inuse}} is not checked when returning a cached consumer, see [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that is closed when the task is done.
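The {{ConcurrentModificationException}} named in the updated description follows from KafkaConsumer not being thread-safe: its documentation states that unsynchronized multi-threaded access results in a ConcurrentModificationException. The Python sketch below is a hypothetical model of that fail-fast ownership check, not Kafka's code; `SingleThreadConsumer`, `ConcurrentModificationError` and `demo` are illustrative names. It shows what happens when two tasks share one cached consumer: the second caller is rejected immediately instead of waiting its turn.

```python
import threading


class ConcurrentModificationError(RuntimeError):
    """Hypothetical analogue of Java's ConcurrentModificationException."""


class SingleThreadConsumer:
    """Hypothetical consumer that, like KafkaConsumer, rejects concurrent callers.

    Each public call takes ownership for its duration; if another thread
    already owns the consumer, the call fails fast instead of blocking.
    """

    def __init__(self) -> None:
        self._guard = threading.Lock()  # protects _owner and _depth only
        self._owner = None              # ident of the thread currently inside a call
        self._depth = 0                 # re-entrancy count for the owning thread

    def _acquire(self) -> None:
        me = threading.get_ident()
        with self._guard:
            if self._owner is not None and self._owner != me:
                raise ConcurrentModificationError(
                    "consumer is not safe for multi-threaded access")
            self._owner = me
            self._depth += 1

    def _release(self) -> None:
        with self._guard:
            self._depth -= 1
            if self._depth == 0:
                self._owner = None

    def poll(self, work=None) -> list:
        """Pretend to fetch records; `work` simulates time spent inside poll()."""
        self._acquire()
        try:
            if work is not None:
                work()
            return []
        finally:
            self._release()


def demo() -> bool:
    """Two 'tasks' share one cached consumer; returns True if task B was rejected."""
    consumer = SingleThreadConsumer()
    entered, finish = threading.Event(), threading.Event()

    def slow_work() -> None:
        entered.set()   # task A is now inside poll()
        finish.wait()   # keep holding the consumer until task B has tried

    task_a = threading.Thread(target=consumer.poll, args=(slow_work,))
    task_a.start()
    entered.wait()
    try:
        consumer.poll()  # task B: same consumer, while task A is still polling
        rejected = False
    except ConcurrentModificationError:
        rejected = True
    finally:
        finish.set()
        task_a.join()
    return rejected


if __name__ == "__main__":
    print(demo())
```

Avoiding this situation by construction, rather than relying on the planner to set a flag, is the motivation for the in-use check in the consumer pool.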
[jira] [Updated] (SPARK-23623) Avoid concurrent use of cached KafkaConsumer in CachedKafkaConsumer (kafka-0-10-sql)
[ https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das updated SPARK-23623:
----------------------------------
    Description:
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption that only one task will try to read a given Kafka TopicPartition at any one time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. For any cases where this assumption may not hold, we have a SparkPlan flag to disable the use of the cache. So it was up to the planner to correctly identify when it was not safe to use the cache and to set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, i.e., whether there will be multiple tasks in the same query trying to read the same partition. Case in point: 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's non-trivial to figure out how this leads to two tasks reading the same partition, possibly concurrently. Because of this, it is hard for the planner to detect such cases and set the flag to bypass the cache / consumer pool. This can inadvertently lead to

Here is a better way to design this. The planner shouldn't have to understand these low-level optimizations. Rather, the consumer pool should be smart enough to avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag {{inuse}} is not checked when returning a cached consumer, see [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that is closed when the task is done.

  was:
CachedKafkaConsumer in the project `kafka-0-10-sql` is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built with the assumption that only one task will try to read a given Kafka TopicPartition at any one time. Hence, the cache was keyed by the TopicPartition a consumer is supposed to read. For any cases where this assumption may not hold, we have a SparkPlan flag to disable the use of the cache. So it was up to the planner to correctly identify when it was not safe to use the cache and to set the flag accordingly.

Fundamentally, this is the wrong way to approach the problem. It is HARD for a high-level planner to reason about the low-level execution model, i.e., whether there will be multiple tasks in the same query trying to read the same partition. Case in point: 2.3.0 introduced stream-stream joins, and you can build a streaming self-join query on Kafka. It's non-trivial to figure out how this leads to two tasks reading the same partition, possibly concurrently. Because of this, it is hard for the planner to detect such cases and set the flag to bypass the cache / consumer pool.

Here is a better way to design this. The planner shouldn't have to understand these low-level optimizations. Rather, the consumer pool should be smart enough to avoid concurrent use of a cached consumer. Currently, it tries to do so but incorrectly (the flag {{inuse}} is not checked when returning a cached consumer, see [this|https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala#L403]). If there is another request for the same partition as a currently in-use consumer, the pool should automatically return a fresh consumer that is closed when the task is done.