[ https://issues.apache.org/jira/browse/SPARK-1001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-1001.
------------------------------
    Resolution: Cannot Reproduce

> Memory leak when reading sequence file and then sorting
> -------------------------------------------------------
>
>                 Key: SPARK-1001
>                 URL: https://issues.apache.org/jira/browse/SPARK-1001
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 0.8.0
>            Reporter: Matthew Cheah
>              Labels: Hadoop, Memory
>
> Spark appears to build up a backlog of unreachable byte arrays when an RDD is constructed from a sequence file and that RDD is then sorted.
> I have a class that wraps a Java ArrayList and can be serialized and written to a Hadoop SequenceFile (i.e., it implements the Writable interface). Let's call it WritableDataRow. It takes a Java List as a constructor argument to wrap, and it also has a copy constructor.
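> For illustration, here is a minimal sketch of what such a wrapper might look like; the actual WritableDataRow is not included in this report, so the field layout and serialization format below are assumptions:
> {code}
> import java.io.{DataInput, DataOutput}
> import java.util.{ArrayList => JArrayList, List => JList}
> import org.apache.hadoop.io.{Text, Writable}
>
> // Hypothetical sketch of the wrapper described above, not the actual class.
> class WritableDataRow() extends Writable {
>   private val contents: JArrayList[String] = new JArrayList[String]()
>
>   // Wraps an existing list of fields.
>   def this(fields: JList[String]) = { this(); contents.addAll(fields) }
>
>   // Copy constructor, used to defensively copy re-used Hadoop Writable instances.
>   def this(other: WritableDataRow) = this(other.getContents())
>
>   def getContents(): JList[String] = contents
>
>   override def write(out: DataOutput): Unit = {
>     out.writeInt(contents.size())
>     val it = contents.iterator()
>     while (it.hasNext) Text.writeString(out, it.next())
>   }
>
>   override def readFields(in: DataInput): Unit = {
>     contents.clear()
>     val n = in.readInt()
>     var i = 0
>     while (i < n) { contents.add(Text.readString(in)); i += 1 }
>   }
> }
> {code}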
> Setup: 10 slaves launched via EC2 with 65.9GB RAM each; the dataset is 100GB of text, or 120GB in sequence file format (no compression is used to compact the bytes); the Hadoop cluster is backed by CDH4.2.0.
> First, building the RDD from a CSV and then sorting on index 1 works fine:
> {code}
> scala> import scala.collection.JavaConversions._ // Other imports here as well
> import scala.collection.JavaConversions._
>
> scala> val rddAsTextFile = sc.textFile("s3n://some-bucket/events-*.csv")
> rddAsTextFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14
>
> scala> val rddAsWritableDataRows = rddAsTextFile.map(x => new WritableDataRow(x.split("\\|").toList))
> rddAsWritableDataRows: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[2] at map at <console>:19
>
> scala> val rddAsKeyedWritableDataRows = rddAsWritableDataRows.map(x => (x.getContents().get(1).toString(), x));
> rddAsKeyedWritableDataRows: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MappedRDD[4] at map at <console>:22
>
> scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsKeyedWritableDataRows)
> orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@587acb54
>
> scala> orderedFunct.sortByKey(true).count(); // Actually triggers the computation, as stated in a different e-mail thread
> res0: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MapPartitionsRDD[8] at sortByKey at <console>:27
> {code}
> The above works without too many surprises. I then save it as a sequence file (using JavaPairRDD to make it easier to call saveAsHadoopFile(); this is how it's done in our Java-based application):
> {code}
> scala> val pairRDD = new JavaPairRDD(rddAsWritableDataRows.map(x => (NullWritable.get(), x)));
> pairRDD: org.apache.spark.api.java.JavaPairRDD[org.apache.hadoop.io.NullWritable,com.palantir.finance.datatable.server.spark.WritableDataRow] = org.apache.spark.api.java.JavaPairRDD@8d2e9d9
>
> scala> pairRDD.saveAsHadoopFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[NullWritable, WritableDataRow]]);
> …
> 2013-12-11 20:09:14,444 [main] INFO  org.apache.spark.SparkContext - Job finished: saveAsHadoopFile at <console>:26, took 1052.116712748 s
> {code}
> And now I want to read the RDD back from the sequence file and sort THAT. This is when I monitor Ganglia and "ps aux" and notice the memory usage climbing ridiculously:
> {code}
> scala> val rddAsSequenceFile = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2)); // Invokes copy constructor to get around re-use of writable objects
> rddAsSequenceFile: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[19] at map at <console>:19
>
> scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsSequenceFile.map(x => (x.getContents().get(1).toString(), x)))
> orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@6262a9a6
>
> scala> orderedFunct.sortByKey().count();
> {code}
> (On the necessity to copy writables from hadoop RDDs, see: https://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3ccaf_kkpzrq4otyqvwcoc6plaz9x9_sfo33u4ysatki5ptqoy...@mail.gmail.com%3E )
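> For readers who don't follow that link, the re-use issue it describes boils down to the following (a sketch only, assuming WritableDataRow has the copy constructor described above):
> {code}
> // Hadoop RecordReaders re-use one mutable Writable instance for every record, so an RDD
> // built directly from sequenceFile() can end up holding many references to that single
> // object whenever records are retained (cached, collected, or buffered for a sort).
> val raw = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah",
>   classOf[NullWritable], classOf[WritableDataRow])
>
> // Risky: every element may alias the same re-used Writable.
> // val aliased = raw.map(_._2).cache()
>
> // Safe: make a defensive copy per record, as done above with the copy constructor.
> val copied = raw.map { case (_, row) => new WritableDataRow(row) }
> {code}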
> I got a memory dump from one worker node but can't share it. I've attached a screenshot from YourKit. At the point where around 5GB of RAM is in use on the worker, 3GB of unreachable byte arrays have accumulated. Furthermore, they're all exactly the same size, and they appear to be the same size as most of the strongly reachable byte arrays. The strongly reachable byte arrays are referenced from output streams in the block output writers.
> Let me know if you require any more information. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
