Hi, The error doesn't occur during saveAsTextFile but rather during the groupByKey as far as I can tell. We strongly urge users to not use groupByKey if they don't have to. What I would suggest is the following work-around: sc.textFile(baseFile)).map { line => val fields = line.split("\t") (fields(11), fields(6)) // extract (month, user_id) }.distinct().countByKey()
instead Best, Burak ----- Original Message ----- From: "SK" <skrishna...@gmail.com> To: u...@spark.incubator.apache.org Sent: Tuesday, August 26, 2014 12:38:00 PM Subject: OutofMemoryError when generating output Hi, I have the following piece of code that I am running on a cluster with 10 nodes with 2GB memory per node. The tasks seem to complete, but at the point where it is generating output (saveAsTextFile), the program freezes after some time and reports an out of memory error (error transcript attached below). I also tried using collect() and printing the output to console instead of a file, but got the same error. The program reads some logs for a month and extracts the number of unique users during the month. The reduced output is not very large, so not sure why the memory error occurs. I would appreciate any help in fixing this memory error to get the output. Thanks. def main (args: Array[String]) { val conf = new SparkConf().setAppName("App") val sc = new SparkContext(conf) // get the number of users per month val user_time = sc.union(sc.textFile(baseFile)) .map(line => { val fields = line.split("\t") (fields(11), fields(6)) }) // extract (month, user_id) .groupByKey // group by month as the key .map(g=> (g._1, g._2.toSet.size)) // get the unique id count per month // .collect() // user_time.foreach(f => println(f)) user_time.map(f => "%s, %s".format(f._1, f._2)).saveAsTextFile("app_output") sc.stop() } 14/08/26 15:21:15 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:121) at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/OutofMemoryError-when-generating-output-tp12847.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org