Hi folks, I have a strange issue. I'm trying to read a 7 GB file and do fairly
simple stuff with it:

I can read the file and do simple operations on it. However, I'd prefer to
increase the number of partitions in preparation for more memory-intensive
operations (I'm happy to wait; I just need the job to complete).
Repartitioning seems to cause an OOM for me.
Could someone shed light on, or speculate about, why this happens? I thought
we repartition to a higher count precisely to relieve memory pressure.

I'm using Spark 1.4.1 on CDH4, if that makes a difference.
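
One thing I've been considering instead (untested sketch; sc and sqlContext
are the spark-shell built-ins, and the 64 MB split size is an arbitrary guess,
not a recommendation) is lowering the max input split size so the Parquet scan
itself yields more partitions, avoiding the full shuffle that repartition does:

// Untested sketch: ask the scan for more, smaller splits instead of shuffling.
// 64 MB is a made-up value; splits also can't be smaller than a Parquet row
// group, so this may do nothing if the row groups themselves are large.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024)
val res2 = sqlContext.parquetFile(lst: _*).where($"customer_id" === lit(254))
res2.rdd.partitions.size  // hoping for more than the 59 I see now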

This works:

val res2 = sqlContext.parquetFile(lst:_*).where($"customer_id"===lit(254))
res2.count
res1: Long = 77885925

scala> res2.explain
== Physical Plan ==
Filter (customer_id#314 = 254)
 PhysicalRDD [....4], MapPartitionsRDD[11] at

scala> res2.rdd.partitions.size
res3: Int = 59



This doesn't:

scala> res2.repartition(60).count
[Stage 2:>                                                        (1 +
45) / 59]15/09/05 10:17:21 WARN TaskSetManager: Lost task 2.0 in stage
2.0 (TID 62, fqdn): java.lang.OutOfMemoryError: Java heap space
        at 
parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:729)
        at 
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
        at 
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
        at 
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
        at 
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
        at 
org.apache.spark.sql.sources.SqlNewHadoopRDD$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
        at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:207)
        at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
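
The only workaround I can think of (untested sketch; "/tmp/customer_254" is a
made-up path) is to persist the filtered subset without any shuffle, which
should have the same memory profile as the count that succeeds, and then
repartition the much smaller result on re-read:

// Untested sketch: scan + write involves no shuffle, so it should behave like
// the count that works; the repartition then only shuffles the filtered rows.
val filtered = sqlContext.parquetFile(lst: _*).where($"customer_id" === lit(254))
filtered.write.parquet("/tmp/customer_254")  // made-up output path
val small = sqlContext.read.parquet("/tmp/customer_254").repartition(60)
small.count

But I'd still like to understand why the straight repartition blows up inside
the Parquet reader.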
