[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors

2018-08-04 Thread Cody Koeninger (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cody Koeninger reassigned SPARK-24987:
--

Assignee: Yuval Itzchakov

> Kafka Cached Consumer Leaking File Descriptors
> --
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
>  Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>  
>Reporter: Yuval Itzchakov
>Assignee: Yuval Itzchakov
>Priority: Critical
>
> Setup:
>  * Spark 2.3.1
>  * Java 1.8.0 (112)
>  * Standalone Cluster Manager
>  * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers 
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
>  via KafkaDataConsumer.acquire.
> There appear to be situations (I've been trying to debug this, but haven't 
> found the root cause yet) where cached consumers remain "in 
> use" for the entire lifetime of the task and are never released. This can be 
> identified by the following line in the stack trace:
> at 
> org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> Which points to:
> {code:java}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
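The branch quoted above can be sketched as a small reference-counted cache. This is a simplified illustration with hypothetical names, not Spark's actual KafkaDataConsumer code: the point is that a cached-but-busy consumer forces a fresh, non-cached consumer whose descriptor stays open unless the caller explicitly releases it.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of a consumer cache with an in-use flag (hypothetical
// names; not Spark's implementation).
class ConsumerCache {
    static class Consumer {
        boolean inUse = false;
        boolean closed = false;
        void close() { closed = true; }   // stands in for releasing the epoll FD
    }

    private final Map<String, Consumer> cache = new HashMap<>();

    // Mirrors the quoted branch: if the cached consumer is currently in use,
    // hand out a new non-cached one that the caller alone must close.
    synchronized Consumer acquire(String topicPartition) {
        Consumer existing = cache.get(topicPartition);
        if (existing == null) {
            Consumer c = new Consumer();
            c.inUse = true;
            cache.put(topicPartition, c);
            return c;
        } else if (existing.inUse) {
            Consumer c = new Consumer();  // non-cached; leaks if never released
            c.inUse = true;
            return c;
        } else {
            existing.inUse = true;
            return existing;
        }
    }

    synchronized void release(Consumer c) {
        if (cache.containsValue(c)) {
            c.inUse = false;              // cached: keep open for reuse
        } else {
            c.close();                    // non-cached: close immediately
        }
    }
}
```

In a sketch like this, a task that finishes without calling release leaves the cached consumer flagged in-use forever, so every later acquire for that TopicPartition spawns another non-cached consumer and another open descriptor.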
> This means the existing consumer created for that `TopicPartition` is still 
> marked as in use for some reason. The odd part is that this happens even for 
> very old tasks that have already finished successfully.
> I've traced this leak using a file leak detector attached to the running 
> executor JVM process. I've dumped the list of open file descriptors, 
> which [you can find 
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
>  and you can see that the majority of them are epoll FDs used by Kafka 
> consumers, indicating that the consumers aren't being closed.
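As a lighter-weight alternative to attaching a leak detector, the descriptor count can be sampled from inside the JVM by listing /proc/self/fd (Linux only). A minimal sketch with a hypothetical class name; polling this periodically on the executor and watching the number climb is a cheap signal of an FD leak:

```java
import java.io.File;

// Counts this JVM's open file descriptors via /proc/self/fd (Linux only).
// Hypothetical helper, not part of Spark or Kafka.
class FdCount {
    static int openFds() {
        String[] fds = new File("/proc/self/fd").list();
        return fds == null ? -1 : fds.length;  // -1 if /proc is unavailable
    }

    public static void main(String[] args) {
        // A steadily growing number across batches suggests a descriptor leak.
        System.out.println("open fds: " + openFds());
    }
}
```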
>  Spark graph:
> {code:java}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation",
> sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination(){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors

2018-08-03 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24987:


Assignee: (was: Apache Spark)







[jira] [Assigned] (SPARK-24987) Kafka Cached Consumer Leaking File Descriptors

2018-08-03 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-24987:


Assignee: Apache Spark



