Hi All,

I am facing the below exception when the size of the file being read in a partition is above 2GB. This is apparently because of Java's limitation on memory-mapped files: FileChannel.map can map at most Integer.MAX_VALUE bytes (about 2GB) in a single mapped buffer.
Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

My use case is to read files from S3 and do some processing on them. I am caching the data as below in order to avoid SocketTimeoutExceptions from another library I am using for the processing:

    val rdd1 = sc.textFile("*******").coalesce(1000)
    rdd1.persist(StorageLevel.DISK_ONLY_2)   // replication factor 2
    rdd1.foreachPartition { iter => }        // one pass over the data to download it

The 3rd line fails with the above error when a partition contains a file larger than 2GB.

Do you think this needs to be fixed in Spark? One idea is to use a wrapper class (something like BigByteBuffer) which keeps an array of ByteBuffers and tracks the index of the buffer currently being read. Below is the modified DiskStore.scala:

    private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
        // For small files, read directly rather than memory map
        if (length < minMemoryMapBytes) {
          // ... read the region into a ByteBuffer directly ...
        } else {
          // TODO: create a BigByteBuffer instead of a single channel.map call
        }
      } {
        channel.close()
      }
    }

    class BigByteBuffer extends ByteBuffer {
      val buffers: Array[ByteBuffer]
      var currentIndex = 0
      ... // Other methods
    }

Please let me know if there is any other work-around for the same. Thanks for your time.

Regards,
Jegan
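P.S. For concreteness, here is a rough, untested sketch of the kind of wrapper I have in mind, written as a standalone class for illustration. One caveat: java.nio.ByteBuffer's constructors are package-private, so a class outside java.nio cannot actually extend ByteBuffer; the sketch therefore exposes its own small read API instead of subclassing, and the names used here (BigByteBuffer, current, etc.) are only illustrative. It maps the requested region of the file as a series of chunks, each at most Integer.MAX_VALUE bytes, and reads across chunk boundaries.

    import java.io.{File, RandomAccessFile}
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel

    class BigByteBuffer(file: File, offset: Long, length: Long) {
      require(length > 0, "length must be positive")

      private val channel = new RandomAccessFile(file, "r").getChannel
      private val chunkSize: Long = Integer.MAX_VALUE.toLong

      // One read-only mapped buffer per chunk of at most 2GB,
      // together covering [offset, offset + length).
      val buffers: Array[ByteBuffer] = {
        val numChunks = ((length + chunkSize - 1) / chunkSize).toInt
        Array.tabulate[ByteBuffer](numChunks) { i =>
          val start = offset + i * chunkSize
          val size = math.min(chunkSize, offset + length - start)
          channel.map(FileChannel.MapMode.READ_ONLY, start, size)
        }
      }

      // Index of the chunk currently being read.
      private var currentIndex = 0

      // Advance past fully drained chunks and return the active one.
      private def current(): ByteBuffer = {
        while (currentIndex < buffers.length - 1 && !buffers(currentIndex).hasRemaining) {
          currentIndex += 1
        }
        buffers(currentIndex)
      }

      def hasRemaining: Boolean = current().hasRemaining

      // Read the next byte, crossing chunk boundaries transparently.
      def get(): Byte = current().get()

      def close(): Unit = channel.close()
    }

If something along these lines is acceptable, the else branch of getBytes above could return such a chunked handle for regions larger than 2GB rather than calling channel.map once for the whole region.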