[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16623052#comment-16623052 ]
Sahil Aggarwal commented on SPARK-23682:
----------------------------------------

Thanks [~kabhwan], will try that out. Was able to limit the memory usage by reducing spark.sql.streaming.minBatchesToRetain to 4, but the patch to remove the redundant keyValue will definitely help in this case, where the group-by keys make up a major chunk of the message.
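For reference, a minimal sketch of where that setting could go, assuming the session is configured programmatically (the builder call and app name are illustrative; the key defaults to 100 retained batches):

{quote}import org.apache.spark.sql.SparkSession;

// Retain fewer checkpointed batches; HDFSBackedStateStoreProvider keeps the state
// maps for the retained versions in memory, so lowering this reduces heap pressure.
SparkSession session = SparkSession.builder()
    .appName("test-stream")                                // illustrative
    .config("spark.sql.streaming.minBatchesToRetain", 4)   // default is 100
    .getOrCreate();{quote}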
> Memory issue with Spark structured streaming
> --------------------------------------------
>
>             Key: SPARK-23682
>             URL: https://issues.apache.org/jira/browse/SPARK-23682
>         Project: Spark
>      Issue Type: Bug
>      Components: SQL, Structured Streaming
> Affects Versions: 2.2.0
>     Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>        Reporter: Yuriy Bondaruk
>        Priority: Major
>          Labels: Memory, memory, memory-leak
>     Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, screen_shot_2018-03-20_at_15.23.29.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream with aggregation (dropDuplicates()) and data partitioning constantly increases memory usage, and executors finally fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
>     .readStream()
>     .schema(inputSchema)
>     .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
>     .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
>     .csv("s3://test-bucket/input")
>     .as(Encoders.bean(TestRecord.class))
>     .flatMap(mf, Encoders.bean(TestRecord.class))
>     .dropDuplicates("testId", "testName")
>     .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
>     .writeStream()
>     .option("path", "s3://test-bucket/output")
>     .option("checkpointLocation", "s3://test-bucket/checkpoint")
>     .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
>     .partitionBy("year")
>     .format("parquet")
>     .outputMode(OutputMode.Append())
>     .queryName("test-stream")
>     .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}}, which is referenced from [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I did my testing by renaming the files in the source folder so that they would be picked up by Spark again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption shouldn't increase, but that is not the case. Moreover, GC time took more than 30% of total processing time.
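Worth noting alongside the report above: without a watermark, dropDuplicates() keeps every distinct key in the state store indefinitely, and HDFSBackedStateStoreProvider additionally holds several recent versions of that state in memory. A hedged sketch of how the deduplication state could be bounded, assuming duplicates arrive within a known lateness window (column names follow the reporter's snippet; the 1-hour threshold and the `records` variable are illustrative, and this does not by itself explain the growth observed with identical re-read input):

{quote}import org.apache.spark.sql.Dataset;

// Declare how late duplicates may arrive so Spark can expire old state;
// including the event-time column among the dedup keys lets the watermark purge it.
Dataset<TestRecord> deduped = records                       // records: Dataset<TestRecord>, illustrative
    .withWatermark("testTimestamp", "1 hour")               // assumed lateness bound
    .dropDuplicates("testId", "testName", "testTimestamp");{quote}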