As a workaround, can you set the number of partitions higher in the sc.textFile call?
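For instance, something like the rough sketch below, based on your snippet (the S3 path stays elided as in your mail, and 10000 is just an illustrative value; the second argument to sc.textFile is only a minimum hint, so the actual split count depends on the input format). Note also that coalescing down to 1000 partitions can merge splits back into blocks larger than 2GB, so you may want to raise or drop the coalesce as well:

  import org.apache.spark.storage.StorageLevel

  // Ask for more, smaller input splits so no single cached block exceeds 2GB.
  val rdd1 = sc.textFile("*******", 10000)   // second arg = minPartitions
  rdd1.persist(StorageLevel.DISK_ONLY_2)     // replication factor 2, as before
  rdd1.foreachPartition { iter => }          // one pass over the data to download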
Cheers

On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jega...@gmail.com> wrote:

> Hi All,
>
> I am facing the below exception when the size of the file being read in a
> partition is above 2GB. This is apparently because of Java's limitation on
> memory-mapped files: it supports mapping only files up to 2GB.
>
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>   at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> My use case is to read files from S3 and do some processing. I am caching
> the data as below in order to avoid SocketTimeoutExceptions from another
> library I am using for the processing.
>
>   val rdd1 = sc.textFile("*******").coalesce(1000)
>   rdd1.persist(DISK_ONLY_2)           // replication factor 2
>   rdd1.foreachPartition { iter => }   // one pass over the data to download
>
> The 3rd line fails with the above error when a partition contains a file
> larger than 2GB.
>
> Do you think this needs to be fixed in Spark? One idea may be to use a
> wrapper class (something called BigByteBuffer) which keeps an array of
> ByteBuffers and tracks the index of the buffer currently being read, etc.
> Below is the modified DiskStore.scala:
>
>   private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
>     val channel = new RandomAccessFile(file, "r").getChannel
>     Utils.tryWithSafeFinally {
>       if (length < minMemoryMapBytes) {
>         // For small files, read directly rather than memory map
>       } else {
>         // TODO Create a BigByteBuffer
>       }
>     } {
>       channel.close()
>     }
>   }
>
>   class BigByteBuffer extends ByteBuffer {
>     val buffers: Array[ByteBuffer]
>     var currentIndex = 0
>
>     ... // Other methods
>   }
>
> Please let me know if there is any other workaround for this. Thanks for
> your time.
>
> Regards,
> Jegan
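For reference, one wrinkle with the proposal: java.nio.ByteBuffer's constructor is package-private, so it cannot be extended outside java.nio; the wrapper would probably have to hold the chunks rather than extend ByteBuffer. A minimal hypothetical sketch, with illustrative names only (this is not existing Spark code):

  import java.nio.ByteBuffer

  // Hypothetical chunked buffer: presents an array of <=2GB ByteBuffers as one
  // logical buffer, advancing an index as each chunk is drained.
  class BigByteBuffer(val chunks: Array[ByteBuffer]) {
    private var currentIndex = 0

    /** Total bytes left across all chunks (drained chunks contribute 0). */
    def remaining: Long = chunks.iterator.map(_.remaining.toLong).sum

    /** Read one byte, moving to the next chunk when the current one is empty. */
    def get(): Byte = {
      while (currentIndex < chunks.length - 1 && !chunks(currentIndex).hasRemaining)
        currentIndex += 1
      chunks(currentIndex).get() // throws BufferUnderflowException when fully drained
    }
  }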