I am sorry, I didn't understand that completely. Are you suggesting copying the files from S3 to HDFS? Actually, that is what I am doing: I am reading the files using Spark and persisting them locally.
Or did you actually mean to ask the producer to write the files directly to HDFS instead of S3? I am not sure I can do that right now either. Please clarify if I misunderstood what you meant.

Thanks,
Jegan

On Mon, Oct 5, 2015 at 4:42 PM, Reynold Xin <r...@databricks.com> wrote:

> You can write the data to local HDFS (or local disk) and just load it from there.
>
> On Mon, Oct 5, 2015 at 4:37 PM, Jegan <jega...@gmail.com> wrote:
>
>> Thanks for your suggestion, Ted.
>>
>> Unfortunately, at this point I cannot go beyond 1000 partitions. I am writing
>> this data to BigQuery, which limits a table to 1000 load jobs per day, and I
>> currently create one load job per partition. Is there any other work-around?
>>
>> Thanks again.
>>
>> Regards,
>> Jegan
>>
>> On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> As a workaround, can you set the number of partitions higher in the
>>> sc.textFile method?
>>>
>>> Cheers
>>>
>>> On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jega...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am facing the exception below when the size of the file being read in a
>>>> partition is above 2GB. This is apparently because of Java's limitation on
>>>> memory-mapped files: a single mapping cannot exceed Integer.MAX_VALUE bytes
>>>> (about 2GB).
>>>>
>>>> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>>>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>>>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>>>   at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>>>>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>>>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>>>>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>>>   at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>>>>   at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>>>>   at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>>>>   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> My use case is to read the files from S3 and do some processing. I am
>>>> caching the data as shown below in order to avoid SocketTimeoutExceptions
>>>> from another library I am using for the processing.
>>>>
>>>> val rdd1 = sc.textFile("*******").coalesce(1000)
>>>> rdd1.persist(DISK_ONLY_2) // replication factor 2
>>>> rdd1.foreachPartition { iter => } // one pass over the data to download
>>>>
>>>> The 3rd line fails with the above error when a partition contains a file
>>>> larger than 2GB.
>>>>
>>>> Do you think this needs to be fixed in Spark? One idea may be to use a
>>>> wrapper class (something called BigByteBuffer) which keeps an array of
>>>> ByteBuffers and tracks the index of the buffer currently being read.
>>>> Below is a sketch of the modified DiskStore.scala:
>>>>
>>>> private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
>>>>   val channel = new RandomAccessFile(file, "r").getChannel
>>>>   Utils.tryWithSafeFinally {
>>>>     if (length < minMemoryMapBytes) {
>>>>       // For small files, read directly rather than memory-map
>>>>     } else {
>>>>       // TODO: build a BigByteBuffer from multiple mappings of at most 2GB each
>>>>     }
>>>>   } {
>>>>     channel.close()
>>>>   }
>>>> }
>>>>
>>>> class BigByteBuffer extends ByteBuffer {
>>>>   val buffers: Array[ByteBuffer]
>>>>   var currentIndex = 0
>>>>
>>>>   ... // Other methods
>>>> }
>>>>
>>>> Please let me know if there is any other work-around for the same. Thanks
>>>> for your time.
>>>>
>>>> Regards,
>>>> Jegan
>>>>
>>>
>>
>
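
For what it's worth, here is a rough, self-contained sketch of the BigByteBuffer idea quoted above. It is only an illustration under my own assumptions (the class and member names, the chunk size, and the read methods), not a working Spark patch. In particular, java.nio.ByteBuffer cannot be subclassed outside the java.nio package, so the sketch wraps an array of mappings rather than extending ByteBuffer:

    import java.io.{File, RandomAccessFile}
    import java.nio.MappedByteBuffer
    import java.nio.channels.FileChannel

    // Sketch only: map a large file as a sequence of chunks no larger than
    // Integer.MAX_VALUE bytes and read across them in order.
    class BigByteBuffer(file: File, chunkSize: Long = Int.MaxValue.toLong) {
      private val channel = new RandomAccessFile(file, "r").getChannel

      // One read-only mapping per chunk; each mapping stays under the 2GB limit.
      val buffers: Array[MappedByteBuffer] = {
        val numChunks = ((channel.size() + chunkSize - 1) / chunkSize).toInt
        Array.tabulate(numChunks) { i =>
          val start = i.toLong * chunkSize
          val len   = math.min(chunkSize, channel.size() - start)
          channel.map(FileChannel.MapMode.READ_ONLY, start, len)
        }
      }

      private var currentIndex = 0 // chunk currently being read

      def hasRemaining: Boolean = buffers.drop(currentIndex).exists(_.hasRemaining)

      // Callers should check hasRemaining first, as with a plain ByteBuffer.
      def get(): Byte = {
        while (!buffers(currentIndex).hasRemaining) currentIndex += 1
        buffers(currentIndex).get()
      }

      def close(): Unit = channel.close()
    }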
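
In case it helps anyone else hitting this, Reynold's suggestion of loading from local HDFS could look roughly like the snippet below. The staging path and S3 URI are hypothetical placeholders, and it assumes there is enough HDFS (or local disk) space for a full copy; working from the staged copy instead of persisting through the block manager means DiskStore never has to memory-map a block larger than 2GB:

    // Sketch only: the HDFS path and S3 URI below are hypothetical placeholders.
    val staged = "hdfs:///tmp/staged-input"

    // Stage the S3 data once on HDFS (or local disk).
    sc.textFile("s3n://my-bucket/input").saveAsTextFile(staged)

    // Work from the staged copy; with no persist(DISK_ONLY_2), the 2GB
    // memory-mapping limit in DiskStore is never hit.
    val rdd1 = sc.textFile(staged).coalesce(1000)
    rdd1.foreachPartition { iter => /* one load job per partition, as before */ }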