Re: IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I meant to say: just copy everything to a local HDFS, and then don't use caching.
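A rough sketch of that flow (the S3 and HDFS paths below are only placeholders, and the one-time copy could equally be done with distcp outside Spark):

    // One-time copy: stream the files from S3 straight into HDFS, without persist().
    sc.textFile("s3n://bucket/input/*")
      .saveAsTextFile("hdfs:///staging/input")

    // All further processing reads the HDFS copy; no caching is needed,
    // so no single >2GB block ever has to be memory-mapped.
    val rdd1 = sc.textFile("hdfs:///staging/input").coalesce(1000)
    rdd1.foreachPartition { iter =>
      // process each partition here
    }

Writing the copy out with saveAsTextFile also reshards the data into one part-file per partition, so the HDFS copy is unlikely to contain a single multi-GB file in the first place.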
Re: IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I am sorry, I didn't completely understand. Are you suggesting copying the files from S3 to HDFS? That is effectively what I am doing: I read the files with Spark and persist them locally.

Or did you mean that the producer should write the files directly to HDFS instead of S3? I am not sure I can do that at this point either.

Please clarify if I misunderstood what you meant.

Thanks,
Jegan
Re: IllegalArgumentException: Size exceeds Integer.MAX_VALUE
You can write the data to local HDFS (or local disk) and just load it from there.
Re: IllegalArgumentException: Size exceeds Integer.MAX_VALUE
Thanks for your suggestion, Ted.

Unfortunately, at this point I cannot go beyond 1000 partitions. I am writing this data to BigQuery, which limits each table to 1,000 load jobs per day (among other quotas), and I currently create one load job per partition. Is there any other work-around?

Thanks again.

Regards,
Jegan
Re: IllegalArgumentException: Size exceeds Integer.MAX_VALUE
As a workaround, can you set the number of partitions higher in the sc.textFile method?

Cheers
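P.S. Concretely, something along these lines (the path and partition count here are only placeholders, and this helps only if the underlying files are splittable, i.e. not gzip-compressed):

    // Ask Spark/Hadoop for more, smaller input splits so that no
    // cached partition approaches the 2GB memory-mapping limit.
    val rdd1 = sc.textFile("s3n://bucket/input/*", minPartitions = 5000)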
IllegalArgumentException: Size exceeds Integer.MAX_VALUE
Hi All,

I am facing the exception below when the size of the file being read in a partition is above 2GB. This is apparently due to Java's limitation on memory-mapped files: FileChannel.map can map at most 2GB (Integer.MAX_VALUE bytes) at a time.

Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
    at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

My use case is to read the files from S3 and do some processing. I cache the data as shown below, in order to avoid SocketTimeoutExceptions from another library I am using for the processing.

    val rdd1 = sc.textFile("***").coalesce(1000)
    rdd1.persist(DISK_ONLY_2)            // replication factor 2
    rdd1.foreachPartition { iter => }    // one pass over the data to download it

The third line fails with the above error whenever a partition contains a file larger than 2GB.

Do you think this needs to be fixed in Spark? One idea would be a wrapper class (something like a BigByteBuffer) that keeps an array of ByteBuffers and tracks the index of the buffer currently being read. Below is a sketch of the modified DiskStore.scala.

    private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
        if (length < minMemoryMapBytes) {
          // For small files, read the bytes directly rather than memory-mapping
        } else {
          // TODO: create a BigByteBuffer spanning several mapped regions
        }
      } {
        channel.close()
      }
    }

    class BigByteBuffer extends ByteBuffer {
      val buffers: Array[ByteBuffer]
      var currentIndex = 0

      ... // Other methods
    }

Please let me know if there is any other work-around for this. Thanks for your time.

Regards,
Jegan
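P.S. To make the BigByteBuffer idea above a bit more concrete: java.nio.ByteBuffer has a package-private constructor, so it cannot actually be subclassed outside java.nio; a real implementation would have to wrap a sequence of buffers rather than extend ByteBuffer. A rough, illustrative sketch only (all names here are made up, not proposed Spark code):

    import java.nio.ByteBuffer

    // Wraps several ByteBuffers (each at most 2GB) and supports sequential
    // reads that cross chunk boundaries transparently.
    class ChunkedBuffer(chunks: Array[ByteBuffer]) {
      private var currentIndex = 0

      // Total number of unread bytes across all chunks.
      def remaining: Long = chunks.drop(currentIndex).map(_.remaining.toLong).sum

      // Copy up to dst.remaining() bytes into dst, advancing to the next
      // chunk whenever the current one is exhausted. Returns bytes copied.
      def read(dst: ByteBuffer): Int = {
        var copied = 0
        while (dst.hasRemaining && currentIndex < chunks.length) {
          val src = chunks(currentIndex)
          if (!src.hasRemaining) {
            currentIndex += 1
          } else {
            val n = math.min(dst.remaining, src.remaining)
            val slice = src.duplicate()
            slice.limit(slice.position() + n)
            dst.put(slice)
            src.position(src.position() + n)
            copied += n
          }
        }
        copied
      }
    }

DiskStore.getBytes could then map a >2GB file as several MappedByteBuffer regions of at most Integer.MAX_VALUE bytes each and hand them to such a wrapper, instead of issuing a single map() call.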