You can write the data to local HDFS (or local disk) and just load it from there.
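For example, something along these lines (an untested sketch; the staging path is just a placeholder):

val rdd1 = sc.textFile("*******").coalesce(1000)
rdd1.saveAsTextFile("hdfs:///tmp/staging")      // write once instead of persist(DISK_ONLY_2)
val staged = sc.textFile("hdfs:///tmp/staging") // load it back; no memory-mapping involved
// run the processing / BigQuery load against `staged`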
On Mon, Oct 5, 2015 at 4:37 PM, Jegan <jega...@gmail.com> wrote:

> Thanks for your suggestion, Ted.
>
> Unfortunately, at this point I cannot go beyond 1000 partitions. I am
> writing this data to BigQuery, which has a limit of 1000 load jobs per
> table per day, and I currently create one load job per partition. Is
> there any other work-around?
>
> Thanks again.
>
> Regards,
> Jegan
>
> On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> As a workaround, can you set the number of partitions higher in the
>> sc.textFile method?
>>
>> Cheers
>>
>> On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jega...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am facing the exception below when the size of the file being read
>>> in a partition is above 2GB. This is apparently due to Java's
>>> limitation on memory-mapped files: a single mapping cannot exceed
>>> 2GB (Integer.MAX_VALUE bytes).
>>>
>>> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>>>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>>>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>>   at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>>>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>>>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>>>   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
>>>
>>> My use case is to read the files from S3 and do some processing. I am
>>> caching the data as below in order to avoid SocketTimeoutExceptions from
>>> another library I am using for the processing.
>>>
>>> val rdd1 = sc.textFile("*******").coalesce(1000)
>>> rdd1.persist(DISK_ONLY_2)         // replication factor 2
>>> rdd1.foreachPartition { iter => } // one pass over the data to download
>>>
>>> The 3rd line fails with the above error when a partition contains a
>>> file of more than 2GB.
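>>>
>>> A possible mitigation might be to persist before coalescing, so that
>>> each cached block stays well under 2GB, and coalesce only for the write
>>> stage (an untested sketch; 5000 is an arbitrary partition count and
>>> assumes the input splits evenly):
>>>
>>> val rdd1 = sc.textFile("*******", 5000) // more, smaller partitions
>>> rdd1.persist(DISK_ONLY_2)               // blocks are cached at the finer granularity
>>> rdd1.foreachPartition { iter => }       // one pass to materialize the cache
>>> val forLoad = rdd1.coalesce(1000)       // only the load stage is capped at 1000 partitions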
>>>
>>> Do you think this needs to be fixed in Spark? One idea may be to use a
>>> wrapper class (something like BigByteBuffer) which keeps an array of
>>> ByteBuffers and tracks the index of the buffer currently being read.
>>> Below is a sketch of the modified DiskStore.scala:
>>>
>>> private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
>>>   val channel = new RandomAccessFile(file, "r").getChannel
>>>   Utils.tryWithSafeFinally {
>>>     if (length < minMemoryMapBytes) {
>>>       // For small blocks, read the bytes directly instead of memory-mapping
>>>     } else {
>>>       // TODO: map the file in chunks of at most 2GB and wrap them in a BigByteBuffer
>>>     }
>>>   } {
>>>     channel.close()
>>>   }
>>> }
>>>
>>> class BigByteBuffer extends ByteBuffer {
>>>   val buffers: Array[ByteBuffer]
>>>   var currentIndex = 0 // index of the buffer currently being read
>>>
>>>   ... // Other methods
>>> }
>>>
>>> Please let me know if there is any other work-around. Thanks for your
>>> time.
>>>
>>> Regards,
>>> Jegan
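For illustration, here is a self-contained sketch of the chunked-mapping idea behind BigByteBuffer (independent of Spark's internals; the object and method names are made up):

import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Maps a file of arbitrary size as a sequence of read-only mappings,
// each at most Int.MaxValue bytes, working around the 2GB-per-mapping limit.
object ChunkedMapper {
  private val ChunkSize: Long = Int.MaxValue.toLong

  def mapFile(path: String): Array[MappedByteBuffer] = {
    val channel = new RandomAccessFile(path, "r").getChannel
    try {
      val size = channel.size()
      val numChunks = ((size + ChunkSize - 1) / ChunkSize).toInt
      Array.tabulate(numChunks) { i =>
        val offset = i.toLong * ChunkSize
        val length = math.min(ChunkSize, size - offset)
        channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
      }
    } finally {
      channel.close() // the mappings remain valid after the channel is closed
    }
  }
}

A BigByteBuffer-style reader would then advance currentIndex to the next buffer whenever the current one is exhausted.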