[ https://issues.apache.org/jira/browse/SPARK-27818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16847097#comment-16847097 ]
Jungtaek Lim edited comment on SPARK-27818 at 5/23/19 10:45 PM:
----------------------------------------------------------------
I think this is already resolved by SPARK-24987, which is available in Spark
2.3.2 and higher. Please try the Spark 2.3.2 spark-sql-kafka artifact, and
close this issue if it helps.
was (Author: kabhwan):
I think this is already resolved by SPARK-24987, which is available in Spark
2.3.2 and higher.
> Spark Structured Streaming executors fails with OutOfMemoryError due to
> KafkaMbeans
> -----------------------------------------------------------------------------------
>
> Key: SPARK-27818
> URL: https://issues.apache.org/jira/browse/SPARK-27818
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Structured Streaming
> Affects Versions: 2.3.0
> Environment: OS: Linux
> HDP: 2.6.5.0-292
> Spark 2.3.0.2.6.5.0-292
> Kafka 1.0.0.2.6.5.0-292.
> Reporter: Ruslan Taran
> Priority: Major
>
> Checking the heap allocation with VisualVM shows that the memory used by the
> JMX MBean server grows linearly with time.
> Further investigation suggests that the JMX MBean server fills up with
> thousands of KafkaMbean instances holding metrics for consumer-\d+ client
> ids (as many as the number of tasks created on the executor).
> {code:java}
> $KafkaMbean.objectName._canonicalName =
> kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
> {code}
>
> Running the Kafka consumer with DEBUG logging on the executor shows that the
> executor adds thousands of metrics sensors and often removes only some of
> them, or none at all.
> I would expect the KafkaMbeans to be cleaned up once the task has completed,
> but they do not seem to be cleaned up when Spark logs the following
> message:
>
> {code:java}
> [Executor task launch worker for task 42 |
> org.apache.spark.sql.kafka010.KafkaSourceRDD] INFO : Beginning offset 37 is
> the same as ending offset skipping extractBytesOutput 1
> {code}
>
> According to the KafkaSourceRDD code, consumer.release() is not called in
> this case, so the KafkaMetrics for the corresponding task/consumer id end up
> being retained in the JMX MBean server.
>
> Here is how I initialise structured streaming:
> {code:java}
> sparkSession
>   .readStream
>   .format("kafka")
>   .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
>     "subscribePattern" -> INPUT_TOPIC,
>     "startingOffsets" -> "earliest",
>     "failOnDataLoss" -> "false"))
>   .load() // load() belongs to the reader, before any transformation
>   .mapPartitions(processData)
>   .writeStream
>   .format("kafka")
>   .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
>     "checkpointLocation" -> CHECKPOINT_LOCATION))
>   .queryName("Process Data")
>   .outputMode("update")
>   .trigger(Trigger.ProcessingTime(1000))
>   .start()
>   .awaitTermination()
> {code}
>
> The Kafka partitions in question often have no data due to the sporadic
> nature of the producer.
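
One way to observe the growth described above is to list the kafka.consumer
consumer-metrics MBeans registered on an MBean server. The following is a
minimal Scala sketch, not taken from Spark or from the reporter's code. The
names KafkaMBeanCount and consumerClientIds are hypothetical and chosen only
for illustration. It uses just the standard javax.management API, and the
object-name pattern is derived from the canonical name quoted in the
description. To see the executor's MBeans it would have to run inside the
executor JVM (or be adapted to a remote JMX connection).

```scala
import java.lang.management.ManagementFactory
import javax.management.{MBeanServer, ObjectName}
import scala.collection.JavaConverters._

// Hypothetical helper, for illustration only.
object KafkaMBeanCount {
  // Matches names such as
  // kafka.consumer:client-id=consumer-42,type=consumer-metrics
  private val ConsumerMetricsPattern =
    new ObjectName("kafka.consumer:type=consumer-metrics,client-id=*")

  // Returns the sorted client ids of all matching MBeans on the given server.
  def consumerClientIds(server: MBeanServer): Seq[String] =
    server.queryNames(ConsumerMetricsPattern, null).asScala.toSeq
      .map(_.getKeyProperty("client-id"))
      .sorted

  def main(args: Array[String]): Unit = {
    // The platform MBean server of the JVM this code runs in.
    val ids = consumerClientIds(ManagementFactory.getPlatformMBeanServer)
    println(s"kafka.consumer consumer-metrics MBeans: ${ids.size}")
  }
}
```

If the reported count keeps growing across batches while the number of running
tasks stays flat, that would be consistent with the leak reported here.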