[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474
 ] 

Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 2:56 PM:
------------------------------------------------------------------

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to
[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
which appears not to clean up the _UnsafeRow_ entries coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code}
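For illustration, here is a standalone sketch of that pattern (not the provider's actual code; the object name, the string key/value types, and the retention count are made up): one key/value map is kept per committed version, and without an eviction step every micro-batch adds another full copy of the state.
{code:java}
import scala.collection.mutable

// Standalone sketch of the caching pattern, keyed by state-store version.
// Strings stand in for the UnsafeRow keys/values of the real MapType.
object VersionedStateCacheSketch {
  type StateMap = java.util.concurrent.ConcurrentHashMap[String, String]

  private val loadedMaps = new mutable.HashMap[Long, StateMap]

  def commit(version: Long, state: StateMap): Unit = synchronized {
    loadedMaps.put(version, state)
    // Without an eviction step like this one, every micro-batch adds a full
    // copy of the state and the map grows without bound.
    evictOlderThan(version - 2) // retain only the last few versions (illustrative)
  }

  private def evictOlderThan(minVersion: Long): Unit =
    loadedMaps.keys.toSeq.filter(_ < minVersion).foreach(loadedMaps.remove)
}
{code}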
I noticed that memory leaks more slowly if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload                       true
spark.hadoop.fs.s3a.fast.upload.buffer                disk
{code}
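These are ordinary Spark/Hadoop options; a minimal sketch of setting them when building the session (the app name below is a placeholder):
{code:java}
import org.apache.spark.sql.SparkSession

// Hypothetical session setup; only the two S3A options above matter here.
val spark = SparkSession.builder()
  .appName("stream-stream-join-repro")
  .config("spark.hadoop.fs.s3a.fast.upload", "true")
  .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
  .getOrCreate()
{code}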
It also seems that the state persisted to S3 is never cleaned up, as both the
number of objects and their total size grow indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!
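
A rough sketch of how that growth can be measured (the checkpoint path below is a placeholder and {{spark}} is assumed to be an existing session): list the objects under the checkpoint location and sum their sizes over time.
{code:java}
import org.apache.hadoop.fs.Path

// Count and size the checkpoint/state files to watch the growth over time.
// "spark" is an existing SparkSession; the bucket and path are hypothetical.
val checkpoint = new Path("s3a://test-bucket/checkpoint/state")
val fs = checkpoint.getFileSystem(spark.sparkContext.hadoopConfiguration)
val files = fs.listFiles(checkpoint, /* recursive = */ true)
var count = 0L
var bytes = 0L
while (files.hasNext) {
  val f = files.next()
  count += 1
  bytes += f.getLen
}
println(s"state files: $count, total bytes: $bytes")
{code}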


> Memory issue with Spark structured streaming
> --------------------------------------------
>
>                 Key: SPARK-23682
>                 URL: https://issues.apache.org/jira/browse/SPARK-23682
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Memory, memory, memory-leak
>         Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and eventually the executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_000003 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {code:java}
> session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     .dropDuplicates("testId", "testName")
>     .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .partitionBy("year")
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .queryName("test-stream")
>     .start();
> {code}
> Analyzing the heap dump I found that most of the memory is used by 
> {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, 
> which is referenced from 
> [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance this looks normal, since that is how Spark keeps aggregation 
> keys in memory. However, I did my testing by renaming files in the source 
> folder so that they would be picked up by Spark again. Since the input records 
> are the same, all further rows should be rejected as duplicates and memory 
> consumption shouldn't increase, but that is not what happens. Moreover, GC 
> time took more than 30% of total processing time.


