Thanks for your suggestion, Ted. Unfortunately, I cannot go beyond 1000 partitions at this point. I am writing this data to BigQuery, which limits a table to 1000 load jobs per day, and I currently create one load job per partition. Is there any other workaround?
Thanks again.

Regards,
Jegan

On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> As a workaround, can you set the number of partitions higher in the
> sc.textFile method?
>
> Cheers
>
> On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jega...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am facing the exception below when the size of the file being read in a
>> partition is above 2GB. This is apparently because of Java's limitation on
>> memory-mapped files: a single mapping cannot exceed Integer.MAX_VALUE
>> bytes (about 2GB).
>>
>> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>     at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>     at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>>     at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> My use case is to read the files from S3 and do some processing. I am
>> caching the data as shown below in order to avoid SocketTimeoutExceptions
>> from another library I am using for the processing.
>>
>> val rdd1 = sc.textFile("*******").coalesce(1000)
>> rdd1.persist(DISK_ONLY_2) // replication factor 2
>> rdd1.foreachPartition { iter => } // one pass over the data to download
>>
>> The 3rd line fails with the above error when a partition contains a file
>> larger than 2GB.
>>
>> Do you think this needs to be fixed in Spark? One idea is to use a wrapper
>> class (something called BigByteBuffer) which keeps an array of ByteBuffers
>> and tracks the index of the buffer currently being read. Below is the
>> modified DiskStore.scala.
>>
>> private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
>>   val channel = new RandomAccessFile(file, "r").getChannel
>>   Utils.tryWithSafeFinally {
>>     // For small files, directly read rather than memory map
>>     if (length < minMemoryMapBytes) {
>>       // Map small file in Memory
>>     } else {
>>       // TODO Create a BigByteBuffer
>>     }
>>   } {
>>     channel.close()
>>   }
>> }
>>
>> class BigByteBuffer extends ByteBuffer {
>>   val buffers: Array[ByteBuffer]
>>   var currentIndex = 0
>>
>>   ... // Other methods
>> }
>>
>> Please let me know if there is any other workaround for this. Thanks
>> for your time.
>>
>> Regards,
>> Jegan
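
For illustration, here is a minimal sketch of the "array of ByteBuffers" idea from the original message. It is not Spark code: `ChunkedByteBuffer`, its `map` method, and the `chunkSize` parameter are hypothetical names chosen for this example. It uses composition rather than `extends ByteBuffer`, because `java.nio.ByteBuffer` cannot be subclassed outside its own package. Each region is mapped separately so that no single `FileChannel.map` call exceeds Integer.MAX_VALUE bytes.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

/** Hypothetical helper (not a Spark API): a read-only view over a file region
  * backed by several memory mappings of at most `chunkSize` bytes each. */
final class ChunkedByteBuffer(private val chunks: Array[ByteBuffer], chunkSize: Int) {

  /** Total number of bytes covered by all chunks. */
  val size: Long = chunks.map(_.remaining().toLong).sum

  def numChunks: Int = chunks.length

  /** Absolute read: pick the chunk by integer division, then index inside it. */
  def get(pos: Long): Byte = {
    require(pos >= 0 && pos < size, s"position $pos out of range [0, $size)")
    chunks((pos / chunkSize).toInt).get((pos % chunkSize).toInt)
  }
}

object ChunkedByteBuffer {

  /** Maps `length` bytes of `file` starting at `offset`, one mapping per chunk.
    * The default chunk size is the largest value a single mapping accepts. */
  def map(file: File, offset: Long, length: Long,
          chunkSize: Int = Int.MaxValue): ChunkedByteBuffer = {
    require(chunkSize > 0, "chunkSize must be positive")
    require(offset >= 0 && length >= 0, "offset and length must be non-negative")
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      val numChunks = ((length + chunkSize - 1) / chunkSize).toInt
      val chunks = Array.tabulate[ByteBuffer](numChunks) { i =>
        val start = offset + i.toLong * chunkSize
        val len = math.min(chunkSize.toLong, offset + length - start)
        channel.map(MapMode.READ_ONLY, start, len)
      }
      new ChunkedByteBuffer(chunks, chunkSize)
    } finally {
      // A mapping stays valid after the channel that created it is closed.
      channel.close()
    }
  }
}
```

Something like `ChunkedByteBuffer.map(file, offset, length)` could then stand in for the TODO branch, although `getBytes` returns `Option[ByteBuffer]`, so its callers would presumably need to change to accept a chunked type. That part is an assumption and is not shown here.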