[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15467108#comment-15467108 ]
Sean Owen commented on SPARK-17381:
-----------------------------------

The issue is that it's maintaining min/max stats for columns, including string columns. If your elements are huge strings of HTML, then this will unfortunately pull back at least two of them per task to the driver. I don't know of a way to disable that. You might consider whether you can tune the job a little to have fewer tasks; if it unnecessarily has thousands of them, you might gain in several ways by tuning that down.

> Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
> -------------------------------------------------------------
>
>                 Key: SPARK-17381
>                 URL: https://issues.apache.org/jira/browse/SPARK-17381
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: EMR 5.0.0 (submitted as yarn-client)
>                      Java Version 1.8.0_101 (Oracle Corporation)
>                      Scala Version 2.11.8
>                      Problem also happens when I run locally with similar versions of Java/Scala.
>                      OS: Ubuntu 16.04
>            Reporter: Joao Duarte
>
> I am running a Spark Streaming application from a Kinesis stream. After some hours of running it runs out of memory. After a driver heap dump I found two problems:
> 1) a huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics objects (it seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192);
> To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak I just needed to run the code below:
> {code}
> val dstream = ssc.union(kinesisStreams)
> dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
>   val toyDF = streamInfo.map(_ => (1, "data", "more data"))
>     .toDF("Num", "Data", "MoreData")
>   toyDF.agg(sum("Num")).first().get(0)
> })
> {code}
> 2) a huge amount of Array[Byte] data (9 GB+)
> After some analysis, I noticed that most of the Array[Byte] instances were being referenced by objects that were in turn referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that was loaded in the executors, so they should never be in the driver at all!
> I still could not replicate the 2nd problem with simple code (the original was complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates", which, as I understand, will be sent to the driver for reporting.
> To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path in my case would be: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not identical) to the data I see when I do a driver heap dump.
> I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and I could run my streaming app for a long period of time, but I think there will always be some performance loss.
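
To make the "fewer tasks" suggestion above concrete, here is a minimal sketch of the reporter's toy job reworked to run with fewer partitions. This is only an illustration under assumptions: ssc and kinesisStreams stand for the streaming context and Kinesis DStreams from the report, and the coalesce target of 8 is a placeholder to be sized to the cluster.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// "ssc" and "kinesisStreams" are assumed to be the streaming context and
// Kinesis streams from the original report.
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD { rdd =>
  // Fewer partitions => fewer tasks => fewer per-task accumulator updates
  // (including string min/max column stats) shipped back to the driver.
  // The value 8 is illustrative only.
  val toyDF = rdd.coalesce(8)
    .map(_ => (1, "data", "more data"))
    .toDF("Num", "Data", "MoreData")
  toyDF.agg(sum("Num")).first().get(0)
}
{code}

Whether coalesce() or a full repartition() is the better fit depends on how the data is laid out; the point is only to cut the task count of the aggregation stage.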
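Separately, a possible mitigation for the SQLTaskMetrics build-up (an assumption, not a confirmed fix) is to retain fewer completed SQL executions, jobs, and stages in the UI so their metrics become garbage-collectible sooner. The values below are illustrative:

{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Assumption: lowering UI retention limits reduces how long per-task SQL
// metrics are held on the driver. Defaults are much larger (e.g. 1000
// retained SQL executions); the numbers here are only examples.
val conf = new SparkConf()
  .set("spark.sql.ui.retainedExecutions", "50")
  .set("spark.ui.retainedJobs", "100")
  .set("spark.ui.retainedStages", "100")

val spark = SparkSession.builder().config(conf).getOrCreate()
{code}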