[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15459021#comment-15459021 ]
Joao Duarte commented on SPARK-17381:
-------------------------------------

Hi Sean. Thanks for commenting. I set it to Blocker because I can't run the app in a production environment.

I am not explicitly adding accumulators in my code. I am trying to replicate the issue with simpler code that I can post here, but have been unable to so far. Basically, for each row of the DStream I collect a lot of data from different sources, transform the data into a dataframe and run some ML Pipelines. Again, I don't explicitly collect data or use accumulators.

I started stripping down the code by removing all ML Pipelines, and now I only have some functions that I run in RDD maps to collect the data I need (so they run in the executors), transform that data, create a dataframe and then perform an aggregation (just a dummy sum of a column). When I inspect the driver's heap, some data I loaded in the executors is there.

The driver's heap "path" that contains the "unwanted" data is objSQLTaskMetrics.accumulatorUpdates[2]._2, where:
- objSQLTaskMetrics is one of the several SQLTaskMetrics instances
- _2 is a Collections$UnmodifiableRandomAccessList of size 1, which contains the GenericInternalRow with the "unwanted" data.

> Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -------------------------------------------------------------
>
>                 Key: SPARK-17381
>                 URL: https://issues.apache.org/jira/browse/SPARK-17381
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version 1.8.0_101 (Oracle Corporation)
> Scala Version version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala.
> OS: Ubuntu 16.04
>            Reporter: Joao Duarte
>
> I am running a Spark Streaming application from a Kinesis stream. After
> running for some hours it runs out of memory.
> After a driver heap dump I found two problems:
>
> 1) A huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics
> instances (it seems this was a problem before:
> https://issues.apache.org/jira/browse/SPARK-11192).
> To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak, I
> just needed to run the code below:
> {code}
> val dstream = ssc.union(kinesisStreams)
> dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
>   // load data
>   val toyDF = streamInfo
>     .map(_ => (1, "data", "more data "))
>     .toDF("Num", "Data", "MoreData")
>   toyDF.agg(sum("Num")).first().get(0)
> })
> {code}
>
> 2) A huge number of Array[Byte] instances (9 GB+).
> After some analysis, I noticed that most of the Array[Byte] instances were
> being referenced by objects that were in turn referenced by SQLTaskMetrics.
> The strangest thing is that those Array[Byte] instances were basically text
> that was loaded in the executors, so they should never be in the driver at
> all!
> I still could not replicate the second problem with simple code (the
> original was complex, with data coming from S3, DynamoDB and other
> databases). However, when I debug the application I can see that in
> Executor.scala, during reportHeartBeat(), the data that should not be sent
> to the driver is being added to "accumUpdates" which, as I understand, will
> be sent to the driver for reporting.
> To be more precise, one of the taskRunners in the loop "for (taskRunner <-
> runningTasks.values().asScala)" contains a GenericInternalRow with a lot of
> data that should not go to the driver. The path in my case would be:
> taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if
> not identical) to the data I see when I do a driver heap dump.
> I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is
> fixed I would have less of this undesirable data in the driver and I could
> run my streaming app for a long period of time, but I think there will
> always be some performance loss.
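
The reproduction in the quoted description depends on a Kinesis source. A self-contained variant might look like the sketch below. It assumes the SQLTaskMetrics growth does not depend on Kinesis (the report does not confirm this) and swaps in a queue-backed stream with a default RDD so that every batch carries some rows. The object name SqlTaskMetricsRepro, the helper toyRow, the local[2] master, the one-second batch interval and the 100-element input are illustrative choices, not taken from the report.

```scala
import scala.collection.mutable

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical stand-alone driver; not part of the original report.
object SqlTaskMetricsRepro {

  // Builds the same constant tuple the report creates for every record.
  def toyRow(record: Array[Byte]): (Int, String, String) =
    (1, "data", "more data ")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SqlTaskMetricsRepro")
      .master("local[2]") // assumption: local run instead of yarn-client
      .getOrCreate()
    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

    // Assumption: a queue-backed stream stands in for ssc.union(kinesisStreams).
    // The queue stays empty, so each batch falls back to defaultRDD.
    val defaultRDD: RDD[Array[Byte]] =
      spark.sparkContext.parallelize(Seq.fill(100)(Array[Byte](1, 2, 3)))
    val queue = mutable.Queue.empty[RDD[Array[Byte]]]
    val dstream = ssc.queueStream(queue, true, defaultRDD)

    dstream.foreachRDD { (streamInfo: RDD[Array[Byte]]) =>
      val toyDF = streamInfo.map(toyRow).toDF("Num", "Data", "MoreData")
      // One SQL aggregation per batch, as in the report.
      toyDF.agg(sum("Num")).first().get(0)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Letting this run for a while and then taking a driver heap dump (for example with jmap) should show whether SQLTaskMetrics instances accumulate without Kinesis involved. If they do not, the source may matter after all.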