[ https://issues.apache.org/jira/browse/SPARK-1001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-1001.
------------------------------
    Resolution: Cannot Reproduce

Memory leak when reading sequence file and then sorting
--------------------------------------------------------

                 Key: SPARK-1001
                 URL: https://issues.apache.org/jira/browse/SPARK-1001
             Project: Spark
          Issue Type: Bug
          Components: Shuffle, Spark Core
    Affects Versions: 0.8.0
            Reporter: Matthew Cheah
              Labels: Hadoop, Memory

Spark appears to build up a backlog of unreachable byte arrays when an RDD is constructed from a sequence file and that RDD is then sorted.

I have a class that wraps a Java ArrayList and that can be serialized and written to a Hadoop SequenceFile (i.e. it implements the Writable interface). Let's call it WritableDataRow. It takes a Java List as the argument to wrap, and it also has a copy constructor. (A sketch of such a wrapper appears after the first code block below.)

Setup: 10 slaves launched via EC2 with 65.9 GB RAM each; the dataset is 100 GB of text, or 120 GB in sequence file format (no compression is used to compact the bytes); CDH4.2.0-backed Hadoop cluster.

First, building the RDD from a CSV and then sorting on index 1 works fine:

{code}
scala> import scala.collection.JavaConversions._ // Other imports here as well
import scala.collection.JavaConversions._

scala> val rddAsTextFile = sc.textFile("s3n://some-bucket/events-*.csv")
rddAsTextFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> val rddAsWritableDataRows = rddAsTextFile.map(x => new WritableDataRow(x.split("\\|").toList))
rddAsWritableDataRows: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[2] at map at <console>:19

scala> val rddAsKeyedWritableDataRows = rddAsWritableDataRows.map(x => (x.getContents().get(1).toString(), x));
rddAsKeyedWritableDataRows: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MappedRDD[4] at map at <console>:22

scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsKeyedWritableDataRows)
orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@587acb54

scala> orderedFunct.sortByKey(true).count(); // Actually triggers the computation, as stated in a different e-mail thread
res0: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MapPartitionsRDD[8] at sortByKey at <console>:27
{code}

The above works without too many surprises.
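For reference, here is a minimal sketch of what a WritableDataRow-like wrapper could look like. The real class is not included in the report, so the field layout, the on-disk format, and the Serializable mixin below are assumptions; the only points taken from the report are that it wraps a java.util.List, implements Writable, and has a copy constructor:

{code}
import java.io.{DataInput, DataOutput}
import java.util.{ArrayList => JArrayList, List => JList}

import org.apache.hadoop.io.{Text, Writable}

// Hypothetical sketch of the wrapper described in the report.
class WritableDataRow() extends Writable with Serializable {
  private var contents: JList[String] = new JArrayList[String]()

  // Wrap an existing list of column values (e.g. the fields of one CSV row).
  def this(values: JList[String]) = { this(); contents = values }

  // Copy constructor: Hadoop input formats re-use Writable instances, so callers
  // copy each record before holding on to it.
  def this(other: WritableDataRow) = {
    this()
    contents = new JArrayList[String](other.getContents())
  }

  def getContents(): JList[String] = contents

  // Assumed on-disk format: a count followed by each field as a Hadoop Text string.
  override def write(out: DataOutput): Unit = {
    out.writeInt(contents.size())
    for (i <- 0 until contents.size()) Text.writeString(out, contents.get(i))
  }

  override def readFields(in: DataInput): Unit = {
    val n = in.readInt()
    contents = new JArrayList[String](n)
    for (_ <- 0 until n) contents.add(Text.readString(in))
  }
}
{code}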
I then save it as a sequence file (using JavaPairRDD as a way to more easily call saveAsHadoopFile(); this is how it's done in our Java-based application):

{code}
scala> val pairRDD = new JavaPairRDD(rddAsWritableDataRows.map(x => (NullWritable.get(), x)));
pairRDD: org.apache.spark.api.java.JavaPairRDD[org.apache.hadoop.io.NullWritable,com.palantir.finance.datatable.server.spark.WritableDataRow] = org.apache.spark.api.java.JavaPairRDD@8d2e9d9

scala> pairRDD.saveAsHadoopFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[NullWritable, WritableDataRow]]);
…
2013-12-11 20:09:14,444 [main] INFO org.apache.spark.SparkContext - Job finished: saveAsHadoopFile at <console>:26, took 1052.116712748 s
{code}

Now I want to read the RDD back from the sequence file and sort THAT, and this is when monitoring Ganglia and "ps aux" shows the memory usage climbing ridiculously:

{code}
scala> val rddAsSequenceFile = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2)); // Invokes the copy constructor to get around re-use of Writable objects
rddAsSequenceFile: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[19] at map at <console>:19

scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsSequenceFile.map(x => (x.getContents().get(1).toString(), x)))
orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@6262a9a6

scala> orderedFunct.sortByKey().count();
{code}

(On the necessity to copy Writables out of Hadoop RDDs, see the sketch below and https://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3ccaf_kkpzrq4otyqvwcoc6plaz9x9_sfo33u4ysatki5ptqoy...@mail.gmail.com%3E )

I got a memory dump from one worker node but can't share it; I've attached a screenshot from YourKit instead. At the point where around 5 GB of RAM is in use on the worker, 3 GB of unreachable byte arrays have accumulated. Furthermore, they are all exactly the same size, and they appear to be the same size as most of the byte arrays that are still strongly reachable. The strongly reachable byte arrays are referenced from output streams in the block output writers.

Let me know if you require any more information. Thanks.
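For readers unfamiliar with the Writable re-use issue referenced above, here is a contrast sketch using the same paths and class names as the transcript (the "aliased" variant is added here only for illustration and is not part of the report):

{code}
import org.apache.hadoop.io.NullWritable

// Hadoop record readers re-use a single Writable instance for every record, so an RDD
// built directly on the raw values can end up with many references to one mutated
// object once records are buffered (for example during a sort or a cache).

// Risky: all elements may alias the same re-used WritableDataRow instance.
val aliased = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah",
  classOf[NullWritable], classOf[WritableDataRow]).map(_._2)

// Defensive copy, as in the transcript above: each record gets its own object.
val copied = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah",
  classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2))
{code}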