[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467224#comment-15467224 ]

Joao Duarte commented on SPARK-17381:

Oh, I see. I'll change the Issue Type from Bug to Improvement and leave the suggestion that there should be an option to disable sending these strings to the driver, or at least to truncate them to a reasonable size. Thanks again for your help!

> Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version 1.8.0_101 (Oracle Corporation)
> Scala Version 2.11.8
> Problem also happens when I run locally with similar versions of Java/Scala.
> OS: Ubuntu 16.04
> Reporter: Joao Duarte
>
> I am running a Spark Streaming application from a Kinesis stream. After some hours of running, it runs out of memory. From a driver heap dump I found two problems:
>
> 1) A huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics objects (it seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192).
>
> To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak, I just needed to run the code below:
> {code}
> val dstream = ssc.union(kinesisStreams)
> dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
>   val toyDF = streamInfo.map(_ => (1, "data", "more data "))
>     .toDF("Num", "Data", "MoreData")
>   toyDF.agg(sum("Num")).first().get(0)
> })
> {code}
>
> 2) A huge amount of Array[Byte] (9 GB+).
>
> After some analysis, I noticed that most of the Array[Byte] instances were being referenced by objects that were in turn referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text loaded in the executors, so they should never be in the driver at all!
>
> I still could not replicate the second problem with simple code (the original was complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates", which, as I understand it, will be sent to the driver for reporting.
>
> To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. In my case the path is: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not identical) to the data I see when I take a driver heap dump.
>
> I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed, I would have less of this undesirable data in the driver and could run my streaming app for long periods, but I think there will always be some loss of performance.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
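The suggested option to truncate these strings before they reach the driver could look roughly like the following. This is a minimal, hypothetical sketch in plain Scala: `truncateForReport` and `maxChars` are illustrative names, not an existing Spark API or configuration setting.

```scala
object TruncateSketch {
  // Hypothetical helper: shortens string values before they are reported
  // to the driver. Non-string values pass through unchanged.
  def truncateForReport(value: Any, maxChars: Int): Any = value match {
    case s: String if s.length > maxChars =>
      // Keep a prefix and record how many characters were dropped.
      s.take(maxChars) + s"...(${s.length - maxChars} chars truncated)"
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val page = "<html>" + "x" * 10000 + "</html>"
    println(truncateForReport(page, 32))
    println(truncateForReport(42L, 32))
  }
}
```

Keeping a short prefix plus a count, rather than dropping the value entirely, would still leave something recognisable in the UI while bounding the size of each reported value.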
[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joao Duarte updated SPARK-17381:

Issue Type: Improvement (was: Bug)
[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15467102#comment-15467102 ]

Joao Duarte commented on SPARK-17381:

Well, the application has been stable for 24h+ (and is still running). If it is normal that 9 GB of HTML pages processed by the executors are sent to the driver in about two hours, then the problem is solved. Thank you for the suggestions. Should I close the issue?
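For scale, the average rate implied by that figure can be worked out directly. A quick sketch, assuming "9 GB" means 9 GiB (9 × 1024³ bytes) accumulated uniformly over exactly two hours:

```scala
object DriverGrowthRate {
  val GiB: Long = 1024L * 1024 * 1024
  val MiB: Long = 1024L * 1024

  // Average growth rate in MiB per second for `bytes` accumulated over `seconds`.
  def mibPerSecond(bytes: Long, seconds: Long): Double =
    bytes.toDouble / seconds / MiB

  def main(args: Array[String]): Unit = {
    val twoHours = 2L * 60 * 60 // 7200 s
    println(mibPerSecond(9 * GiB, twoHours))
  }
}
```

Under those assumptions this is 9216 MiB / 7200 s, about 1.28 MiB/s (roughly 4.5 GiB per hour) of executor data accumulating on the driver heap.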
[jira] [Comment Edited] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464740#comment-15464740 ]

Joao Duarte edited comment on SPARK-17381 at 9/5/16 11:00 AM:

Thank you for your suggestion [~srowen]! Setting spark.sql.ui.retainedExecutions to a low number seems to be a good workaround. I have been running my application for about one hour with spark.sql.ui.retainedExecutions=10, and the number of SQLTaskMetrics objects and the driver's heap size seem to stabilise. I'll give an update at the end of the day or tomorrow to tell you whether it remains stable.

However, I think it is really strange that the driver is sent data that is supposed to exist only in the executors. In my case, I am parsing HTML pages, and some of them are being sent to the driver as part of ColumnStats (as you mentioned in your previous comment). Are they being sent as a summary by mistake? Does Spark really need this kind of information? The workaround lets my application run, but sending unneeded data to the driver certainly reduces performance (some of the HTML pages I parse can be really big).

Cheers

was (Author: joaomaiaduarte):
Hi Sean,
Thank you for your suggestion! Setting spark.sql.ui.retainedExecutions to a low number seems to be a good workaround. I'm running my application for about one hour with spark.sql.ui.retainedExecutions=10 and the number of SQLTaskMetrics objects and the heap memory size of the driver seem to stabilise. I'll give an update at the end of the day or tomorrow to tell you if it remains stable.
However, I think it is really strange that the driver is sent data that are supposed to be only in the executors. In my case, I am parsing HTML pages and some of those are being sent to the driver as part of ColumnStats (as you referred in you previous comment). Are they being sent as a summary by mistake? Do Spark really need this kind of information? The work around enables my application to run but sending unneeded data to the driver certainly reduces performance (some HTML pages I parse can be really big).
Cheers
[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464740#comment-15464740 ]

Joao Duarte commented on SPARK-17381:

Hi Sean,

Thank you for your suggestion! Setting spark.sql.ui.retainedExecutions to a low number seems to be a good workaround. I have been running my application for about one hour with spark.sql.ui.retainedExecutions=10, and the number of SQLTaskMetrics objects and the driver's heap size seem to stabilise. I'll give an update at the end of the day or tomorrow to tell you whether it remains stable.

However, I think it is really strange that the driver is sent data that is supposed to exist only in the executors. In my case, I am parsing HTML pages, and some of them are being sent to the driver as part of ColumnStats (as you mentioned in your previous comment). Are they being sent as a summary by mistake? Does Spark really need this kind of information? The workaround lets my application run, but sending unneeded data to the driver certainly reduces performance (some of the HTML pages I parse can be really big).

Cheers
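The ColumnStats question may be easier to reason about with a toy version of per-batch statistics for a string column. The sketch assumes (this is an assumption, not something confirmed in this thread) that string statistics keep the lowest and highest values themselves as bounds; if so, a batch of large HTML pages keeps two entire pages in its stats row. `StringColumnStats` here is a hypothetical simplification, not Spark's class.

```scala
object ColumnStatsSketch {
  // Hypothetical per-batch stats for a string column: the lower and upper
  // bounds are full copies of the smallest and largest values seen.
  final class StringColumnStats {
    private var lower: Option[String] = None
    private var upper: Option[String] = None
    private var count = 0L

    def gather(value: String): Unit = {
      count += 1
      if (lower.forall(l => value.compareTo(l) < 0)) lower = Some(value)
      if (upper.forall(u => value.compareTo(u) > 0)) upper = Some(value)
    }

    // Characters held by the stats themselves (both bounds together).
    def retainedChars: Int =
      lower.map(_.length).getOrElse(0) + upper.map(_.length).getOrElse(0)
    def bounds: (Option[String], Option[String]) = (lower, upper)
    def rowCount: Long = count
  }

  def main(args: Array[String]): Unit = {
    val stats = new StringColumnStats
    // Three fake "HTML pages" of 50,013 characters each.
    Seq("a", "m", "z").foreach(c => stats.gather("<html>" + c * 50000 + "</html>"))
    println(s"rows=${stats.rowCount}, chars kept in stats=${stats.retainedChars}")
  }
}
```

Here the "summary" for three rows still holds 100,026 characters, i.e. two full pages, which would explain why page contents could show up wherever these stats rows are shipped or retained.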
[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joao Duarte updated SPARK-17381:

Description:
I am running a Spark Streaming application from a Kinesis stream. After some hours of running, it runs out of memory. From a driver heap dump I found two problems:

1) A huge number of org.apache.spark.sql.execution.ui.SQLTaskMetrics objects (it seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192).

To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak, I just needed to run the code below:
{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  val toyDF = streamInfo.map(_ => (1, "data", "more data "))
    .toDF("Num", "Data", "MoreData")
  toyDF.agg(sum("Num")).first().get(0)
})
{code}

2) A huge amount of Array[Byte] (9 GB+).

After some analysis, I noticed that most of the Array[Byte] instances were being referenced by objects that were in turn referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text loaded in the executors, so they should never be in the driver at all!

I still could not replicate the second problem with simple code (the original was complex, with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates", which, as I understand it, will be sent to the driver for reporting.

To be more precise, one of the taskRunners in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. In my case the path is: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not identical) to the data I see when I take a driver heap dump.

I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed, I would have less of this undesirable data in the driver and could run my streaming app for long periods, but I think there will always be some loss of performance.

was:
I am running a Spark Streaming application from a Kinesis stream. After some hours running it gets out of memory. After a driver heap dump I found two problems:
1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192);
To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to run the code below:
{code}
val dstream = ssc.union(kinesisStreams)
dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => {
  //load data
  val toyDF = streamInfo.map(_ => (1, "data", "more data "))
    .toDF("Num", "Data", "MoreData")
  toyDF.agg(sum("Num")).first().get(0)
})
{code}
2) huge amount of Array[Byte] (9Gb+)
After some analysis, I noticed that most of the Array[Byte] where being referenced by objects that were being referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that were loaded in the executors, so they should never be in the driver at all!
Still could not replicate the 2nd problem with a simple code (the original was complex with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates" which, as I understand, will be sent to the driver for reporting.
To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path would be in my case: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) to the data I see when I do a driver heap dump.
I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed I would have less of this undesirable data in the driver and I could run my streaming app for a long period of time, but I think there will always be some performance lost.
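The heartbeat path in the description (accumulator values of running tasks being gathered into "accumUpdates" and sent to the driver) can be modelled in a few lines. This is a deliberately simplified, hypothetical sketch: `Task`, `AccumUpdate`, `heartbeatPayload` and the accumulator names are invented for illustration and are not Spark's classes. It only shows that whatever an accumulator holds, including whole rows of text, travels with each heartbeat.

```scala
object HeartbeatSketch {
  // Hypothetical accumulator snapshot: a name plus whatever value it holds.
  final case class AccumUpdate(name: String, value: Any)

  // Hypothetical running task exposing its accumulators.
  final case class Task(taskId: Long, accums: Seq[AccumUpdate])

  // Gather every running task's accumulator values, as a heartbeat would.
  def heartbeatPayload(running: Seq[Task]): Seq[(Long, Seq[AccumUpdate])] =
    running.map(t => t.taskId -> t.accums)

  // Rough count of string characters carried by a payload.
  def stringChars(payload: Seq[(Long, Seq[AccumUpdate])]): Long =
    payload.flatMap(_._2).map(_.value).map {
      case s: String  => s.length.toLong
      case xs: Seq[_] => xs.collect { case s: String => s.length.toLong }.sum
      case _          => 0L
    }.sum

  def main(args: Array[String]): Unit = {
    val bigPage = "<html>" + "x" * 100000 + "</html>"
    val tasks = Seq(
      Task(1L, Seq(AccumUpdate("recordsRead", 42L))),
      // A collection-style accumulator holding a "row" that contains pages.
      Task(2L, Seq(AccumUpdate("batchStats", Seq(bigPage, bigPage))))
    )
    println(s"string chars per heartbeat: ${stringChars(heartbeatPayload(tasks))}")
  }
}
```

A numeric metric costs a few bytes per heartbeat, while a collection accumulator holding rows costs as much as the rows themselves, here about 200,000 characters, every time the heartbeat fires.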
[jira] [Comment Edited] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15459021#comment-15459021 ]

Joao Duarte edited comment on SPARK-17381 at 9/2/16 4:49 PM:

Hi Sean. Thanks for commenting. I set the priority to Blocker because I can't run the app in a production environment.

I am not explicitly adding accumulators in my code. I am trying to replicate the issue with simpler code that I can post here, but so far I have been unable to. Basically, for each row of the DStream I collect a lot of data from different sources, transform the data into a DataFrame and run some ML Pipelines. Again, I don't explicitly collect data or use accumulators.

I started stripping down the code by removing all ML Pipelines, and now I only have some functions that I run in RDD maps to collect the data I need (so it runs in the executors), transform that data, create a DataFrame and then perform an aggregation (just a dummy sum of a column). When I inspect the driver's heap, some of the data I loaded in the executors is there.

The driver's heap "path" that contains the "unwanted" data is objSQLTaskMetrics.accumulatorUpdates[2]._2, where:
- objSQLTaskMetrics is one of the several SQLTaskMetrics objects;
- _2 is of class Collections$UnmodifiableRandomAccessList with size 1, and contains the GenericInternalRow with the "unwanted" data.

was (Author: joaomaiaduarte):
Hi Sean. Thanks for commenting. I set to Blocker because I can't run the app in a production environment.
I am not explicitly adding accumulators in my code. I am trying to replicate the issue with a simpler code such that I can post it here, but was unable to do that so far. Basically, for each row of the Dstream I collect a lot of data from different sources, transform the data into a dataframe and run some ML Pipelines. Again, I don't explicitly collect data or use accumulators.
I started stripping down the code by removing all ML Pipelines and now I only have some functions that I run in RDD maps to collect the data I need (so, it runs in the executors), transform that data, create a dataframe and the perform an aggregation (just a dummy sum of a column). When I inspect the drivers heap, some data I loaded in the executors are there.
The driver's heap "path" that contains the "unwanted" data is objSQLTaskMetrics.accumulatorUpdates[2]._2 where:
-objSQLTaskMetrics is one of the several SQLTaskMetrics
- _2 class is Collections$UnmodifiableRandomAccessList with size 1 which contains the GenericInternalRow with the "unwanted" data.
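The Collections$UnmodifiableRandomAccessList in that heap path is the class `java.util.Collections.unmodifiableList` returns when it wraps a `RandomAccess` list such as `ArrayList`. A small sketch, assuming (as the class name suggests) that the accumulator exposes its contents through such a read-only snapshot, shows that the wrapper copies references rather than elements, so one large row stays reachable as long as the wrapper is retained. `FakeRow` and `snapshot` are hypothetical stand-ins, not Spark code.

```scala
import java.util.{ArrayList, Collections, List => JList}

object UnmodifiableListSketch {
  // Hypothetical stand-in for a GenericInternalRow holding a large value.
  final class FakeRow(val payload: String)

  // Read-only snapshot: a new ArrayList of the same element references,
  // wrapped so callers cannot modify it.
  def snapshot[T](items: JList[T]): JList[T] =
    Collections.unmodifiableList(new ArrayList[T](items))

  def main(args: Array[String]): Unit = {
    val list = new ArrayList[FakeRow]()
    val row = new FakeRow("<html>" + "x" * 100000 + "</html>")
    list.add(row)

    val view = snapshot(list)
    println(view.getClass.getName) // java.util.Collections$UnmodifiableRandomAccessList
    println(view.get(0) eq row)    // true: the same row object, not a copy
  }
}
```

Because only references are copied, a snapshot kept by a driver-side structure such as SQLTaskMetrics keeps the full row (and its page text) alive until that structure itself is released.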
[jira] [Commented] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15459021#comment-15459021 ]

Joao Duarte commented on SPARK-17381:

Hi Sean. Thanks for commenting. I set the priority to Blocker because I can't run the app in a production environment.

I am not explicitly adding accumulators in my code. I am trying to replicate the issue with simpler code that I can post here, but so far I have been unable to. Basically, for each row of the DStream I collect a lot of data from different sources, transform the data into a DataFrame and run some ML Pipelines. Again, I don't explicitly collect data or use accumulators.

I started stripping down the code by removing all ML Pipelines, and now I only have some functions that I run in RDD maps to collect the data I need (so it runs in the executors), transform that data, create a DataFrame and then perform an aggregation (just a dummy sum of a column). When I inspect the driver's heap, some of the data I loaded in the executors is there.

The driver's heap "path" that contains the "unwanted" data is objSQLTaskMetrics.accumulatorUpdates[2]._2, where:
- objSQLTaskMetrics is one of the several SQLTaskMetrics objects;
- _2 is of class Collections$UnmodifiableRandomAccessList with size 1, and contains the GenericInternalRow with the "unwanted" data.
[jira] [Updated] (SPARK-17381) Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-17381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joao Duarte updated SPARK-17381: Description: I am running a Spark Streaming application from a Kinesis stream. After some hours running it gets out of memory. After a driver heap dump I found two problems: 1) huge amount of org.apache.spark.sql.execution.ui.SQLTaskMetrics (It seems this was a problem before: https://issues.apache.org/jira/browse/SPARK-11192); To replicate the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak just needed to run the code below: {code} val dstream = ssc.union(kinesisStreams) dstream.foreachRDD((streamInfo: RDD[Array[Byte]]) => { //load data val toyDF = streamInfo.map(_ => (1, "data","more data " )) .toDF("Num", "Data", "MoreData" ) toyDF.agg(sum("Num")).first().get(0) } ) {code} 2) huge amount of Array[Byte] (9Gb+) After some analysis, I noticed that most of the Array[Byte] where being referenced by objects that were being referenced by SQLTaskMetrics. The strangest thing is that those Array[Byte] were basically text that were loaded in the executors, so they should never be in the driver at all! Still could not replicate the 2nd problem with a simple code (the original was complex with data coming from S3, DynamoDB and other databases). However, when I debug the application I can see that in Executor.scala, during reportHeartBeat(), the data that should not be sent to the driver is being added to "accumUpdates" which, as I understand, will be sent to the driver for reporting. To be more precise, one of the taskRunner in the loop "for (taskRunner <- runningTasks.values().asScala)" contains a GenericInternalRow with a lot of data that should not go to the driver. The path would be in my case: taskRunner.task.metrics.externalAccums[2]._list[0]. This data is similar (if not the same) to the data I see when I do a driver heap dump. 
I guess that if the org.apache.spark.sql.execution.ui.SQLTaskMetrics leak is fixed, less of this undesirable data would end up in the driver and I could run my streaming app for a long period of time, but I think there will always be some performance loss.

> Memory leak org.apache.spark.sql.execution.ui.SQLTaskMetrics
>
> Key: SPARK-17381
> URL: https://issues.apache.org/jira/browse/SPARK-17381
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Environment: EMR 5.0.0 (submitted as yarn-client)
> Java Version 1.8.0_101 (Oracle Corporation)
> Scala Version 2.11.8
> Problem also happens when I run locally with similar versions of java/scala.
> OS: Ubuntu