[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cody Koeninger reassigned SPARK-24987:
--------------------------------------

    Assignee: Yuval Itzchakov

> Kafka Cached Consumer Leaking File Descriptors
> ----------------------------------------------
>
>                 Key: SPARK-24987
>                 URL: https://issues.apache.org/jira/browse/SPARK-24987
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0, 2.3.1
>         Environment: Spark 2.3.1
>                      Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>                      Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>            Reporter: Yuval Itzchakov
>            Assignee: Yuval Itzchakov
>            Priority: Critical
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 nodes, 1 executor per node
>
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
>
> There appear to be situations (I've been trying to debug this but haven't found
> the root cause yet) where cached consumers remain "in use" for the entire
> lifetime of the task and are never released. This can be identified by the
> following line of the stack trace:
>
> at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
>
> which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
> This means the existing consumer created for that `TopicPartition` is still
> marked as in use for some reason. The strange part is that this shows up even
> for very old tasks that have already finished successfully.
>
> I traced the leak with a file leak detector attached to the running executor
> JVM process. I exported the list of open file descriptors, which [you can find
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d];
> the majority of them are epoll FDs used by Kafka consumers, indicating that the
> consumers are never closed.
>
> Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .writeStream // required before the sink methods below; presumably elided in the original snippet
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}
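For context, the quoted branch comes from an acquire/release style consumer cache: a task acquires a consumer for a TopicPartition and is expected to release it when done, and a cached consumer still flagged in-use forces a fresh, non-cached consumer to be created. The sketch below is a simplified illustration of that pattern, not the actual org.apache.spark.sql.kafka010.KafkaDataConsumer code; only the inUse flag and the cached-vs-non-cached branch mirror the snippet above, while FakeConsumer, CachedEntry, and ConsumerCacheSketch are hypothetical stand-ins.

{code:java}
// Illustrative sketch of a per-partition consumer cache with an in-use flag.
// NOT the real Spark implementation; all names here are stand-ins.
import scala.collection.mutable

object ConsumerCacheSketch {

  // Stand-in for a Kafka consumer: each instance holds OS resources
  // (sockets, epoll FDs) until close() is called.
  final class FakeConsumer(val topicPartition: String) {
    var closed = false
    def close(): Unit = closed = true
  }

  // One cached consumer per TopicPartition, flagged while a task is using it.
  final case class CachedEntry(consumer: FakeConsumer, var inUse: Boolean)

  private val cache = mutable.Map.empty[String, CachedEntry]

  // Mirrors the branch in the stack trace: if the cached consumer for this
  // partition is still flagged in-use, hand out a brand-new, non-cached one.
  def acquire(topicPartition: String): FakeConsumer = synchronized {
    cache.get(topicPartition) match {
      case Some(entry) if entry.inUse =>
        // Cached consumer is busy (or its flag is stale): create a throwaway
        // consumer. If the flag never clears, this path runs for every task
        // and each new consumer adds more open file descriptors.
        new FakeConsumer(topicPartition)
      case Some(entry) =>
        entry.inUse = true
        entry.consumer
      case None =>
        val consumer = new FakeConsumer(topicPartition)
        cache(topicPartition) = CachedEntry(consumer, inUse = true)
        consumer
    }
  }

  // Expected to run when the task finishes with the consumer; if it is ever
  // skipped, the cached entry stays inUse indefinitely.
  def release(topicPartition: String, consumer: FakeConsumer): Unit = synchronized {
    cache.get(topicPartition) match {
      case Some(entry) if entry.consumer eq consumer => entry.inUse = false
      case _ => consumer.close() // non-cached consumer: just close it
    }
  }
}
{code}

Under a model like this, if release is skipped (or called against a different instance than the one acquired), the cached entry stays in-use and every later acquire for that TopicPartition takes the non-cached branch and opens fresh sockets and epoll FDs. That would be consistent with the FD dump linked above, though the actual root cause in Spark remains undetermined in this report.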
[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24987:
------------------------------------

    Assignee:     (was: Apache Spark)
[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors
[ https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-24987:
------------------------------------

    Assignee: Apache Spark