[https://issues.apache.org/jira/browse/SPARK-24987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569432#comment-16569432]
Yuval Itzchakov edited comment on SPARK-24987 at 8/5/18 10:54 AM:
------------------------------------------------------------------
[[email protected]] Is there any chance this will make it in time for 2.3.2?
This is a critical fix for us.
was (Author: yuval.itzchakov):
Is there any chance this will make it in time for 2.3.2? This is a critical fix
for us.
> Kafka Cached Consumer Leaking File Descriptors
> ----------------------------------------------
>
> Key: SPARK-24987
> URL: https://issues.apache.org/jira/browse/SPARK-24987
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0, 2.3.1
> Environment: Spark 2.3.1
> Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
> Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)
>
> Reporter: Yuval Itzchakov
> Assignee: Yuval Itzchakov
> Priority: Critical
> Fix For: 2.4.0
>
>
> Setup:
> * Spark 2.3.1
> * Java 1.8.0 (112)
> * Standalone Cluster Manager
> * 3 Nodes, 1 Executor per node.
> Spark 2.3.0 introduced a new mechanism for caching Kafka consumers
> (https://issues.apache.org/jira/browse/SPARK-23623?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel)
> via KafkaDataConsumer.acquire.
> There seem to be situations (I've been trying to debug it, but haven't been
> able to find the root cause yet) where cached consumers remain "in use" for
> the whole lifetime of the task and are never released. This can be identified
> by the following line of the stack trace:
> at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)
> This frame points to:
> {code:scala}
> } else if (existingInternalConsumer.inUse) {
>   // If consumer is already cached but is currently in use, then return a new consumer
>   NonCachedKafkaDataConsumer(newInternalConsumer)
> {code}
> This means the existing consumer created for that `TopicPartition` is still in
> use for some reason. Oddly, this shows up even for very old tasks that have
> already finished successfully.
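The acquire/release behaviour described above can be modeled in plain Scala, without Spark or Kafka on the classpath. This is a simplified sketch, not Spark's actual `KafkaDataConsumer`: `TopicPartition`, `InternalConsumer`, `DataConsumer`, `CachedDataConsumer`, `NonCachedDataConsumer`, `ConsumerCache` and `withConsumer` are all hypothetical stand-ins. It shows why a cache entry that is never released matters: every later `acquire` for the same partition falls into the `inUse` branch and creates a fresh, uncached consumer, whose resources leak unless it is closed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's TopicPartition (not the real Kafka class).
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical internal consumer. In the real system this would wrap a
// KafkaConsumer (sockets plus, on Linux, an epoll descriptor); here we only
// track whether it is in use and whether it has been closed.
final class InternalConsumer(val tp: TopicPartition) {
  @volatile var inUse: Boolean = false
  @volatile var closed: Boolean = false
  def close(): Unit = closed = true
}

sealed trait DataConsumer {
  def internal: InternalConsumer
  def release(): Unit
}

// Handed out when the cached entry was free: release() returns it to the cache.
final case class CachedDataConsumer(internal: InternalConsumer) extends DataConsumer {
  def release(): Unit = internal.inUse = false
}

// Handed out when the cached entry is busy: release() must close it,
// otherwise its resources are leaked.
final case class NonCachedDataConsumer(internal: InternalConsumer) extends DataConsumer {
  def release(): Unit = internal.close()
}

final class ConsumerCache {
  private val cache = mutable.Map.empty[TopicPartition, InternalConsumer]
  private var createdCount = 0

  /** Number of internal consumers created so far (cached or not). */
  def created: Int = createdCount

  private def newInternal(tp: TopicPartition): InternalConsumer = {
    createdCount += 1
    new InternalConsumer(tp)
  }

  def acquire(tp: TopicPartition): DataConsumer = synchronized {
    cache.get(tp) match {
      case None =>
        // First request for this partition: create, cache and mark in use.
        val c = newInternal(tp)
        c.inUse = true
        cache(tp) = c
        CachedDataConsumer(c)
      case Some(existing) if existing.inUse =>
        // Same idea as the quoted branch: the cached consumer is still marked
        // in use, so a fresh, uncached consumer is created instead.
        NonCachedDataConsumer(newInternal(tp))
      case Some(existing) =>
        // Cached consumer is free: reuse it.
        existing.inUse = true
        CachedDataConsumer(existing)
    }
  }

  // Loan pattern: release() runs in finally, so an exception or early exit
  // in `body` cannot leave the consumer marked in use.
  def withConsumer[A](tp: TopicPartition)(body: DataConsumer => A): A = {
    val consumer = acquire(tp)
    try body(consumer) finally consumer.release()
  }
}
```

In this model, calling `acquire` twice for the same partition without a `release` in between returns a `NonCachedDataConsumer` on the second call; once the first consumer is released, the cached instance is reused again. Routing every acquisition through something like `withConsumer` is one way to make sure no code path forgets the release.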
> I traced the leak using file leak detector, attaching it to the running
> executor JVM process. I dumped the list of open file descriptors, which [you
> can find
> here|https://gist.github.com/YuvalItzchakov/cdbdd7f67604557fccfbcce673c49e5d],
> and the majority of them are epoll file descriptors used by Kafka consumers,
> indicating that the consumers are not being closed.
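A rougher check that needs no agent (it records no stack traces of where descriptors were opened) is to inspect `/proc/self/fd` on Linux: each entry is a symlink whose target names the resource, and epoll instances appear as `anon_inode:[eventpoll]`. The sketch below is a hypothetical helper (`FdCensus` is not part of Spark, Kafka or file leak detector) that counts the current JVM's descriptors by kind. It assumes Linux with procfs mounted and returns an empty map elsewhere; to inspect an executor it would have to run inside that executor JVM. As far as I know, Kafka's client network layer is built on `java.nio.channels.Selector`, which is epoll-backed on Linux, so a steadily growing `anon_inode:[eventpoll]` count is consistent with consumers that are never closed.

```scala
import java.io.File
import java.nio.file.Files
import scala.util.Try

// Hypothetical, Linux-only helper: classify this JVM's open file descriptors
// by the target of their /proc/self/fd symlinks.
object FdCensus {
  private val fdDir = new File("/proc/self/fd")

  def available: Boolean = fdDir.isDirectory

  // Returns kind -> count. Anonymous inodes are kept verbatim
  // (e.g. "anon_inode:[eventpoll]"); other targets are bucketed coarsely.
  def census(): Map[String, Int] = {
    val entries = Option(fdDir.listFiles()).getOrElse(Array.empty[File])
    entries.toSeq
      // Descriptors can close between listing and reading (including the one
      // used to list the directory), so failed reads are simply skipped.
      .flatMap(f => Try(Files.readSymbolicLink(f.toPath).toString).toOption)
      .map(kind)
      .groupBy(identity)
      .map { case (k, v) => k -> v.size }
  }

  private def kind(target: String): String =
    if (target.startsWith("anon_inode:")) target
    else if (target.startsWith("socket:")) "socket"
    else if (target.startsWith("pipe:")) "pipe"
    else "file"

  // Number of epoll instances currently held open by this JVM.
  def epollCount(): Int = census().getOrElse("anon_inode:[eventpoll]", 0)
}
```

Sampling `FdCensus.epollCount()` periodically (for example from a diagnostic thread inside the executor) would show whether epoll descriptors keep accumulating as micro-batches run.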
> Spark graph:
> {code:scala}
> kafkaStream
>   .load()
>   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .as[(String, String)]
>   .flatMap {...}
>   .groupByKey(...)
>   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
>   .writeStream
>   .foreach(...)
>   .outputMode(OutputMode.Update)
>   .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
>   .start()
>   .awaitTermination()
> {code}