You can write the data to local hdfs (or local disk) and just load it from

On Mon, Oct 5, 2015 at 4:37 PM, Jegan wrote:

> Thanks for your suggestion, Ted.
> Unfortunately, at this point in time I cannot go beyond 1000 partitions. I
> am writing this data to BigQuery, which has a limit of 1000 jobs per day
> per table (one of their quota limits), and I currently create one load job
> per partition. Is there any other workaround?
> Thanks again.
> Regards,
> Jegan
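One possible way around the 1000-job ceiling, offered only as a hypothetical sketch: keep more Spark partitions than load jobs and let each load job cover the output files of several partitions. This assumes each partition writes its own file and that one load job can be given several source files (BigQuery load jobs take a list of source URIs). The object and method names are made up, and no BigQuery or Spark API is called; it only shows the grouping arithmetic.

```scala
// Hypothetical helper, not a Spark or BigQuery API: splits partition indices
// 0 until numPartitions into at most maxJobs contiguous, near-equal groups.
// Each group is meant to become one load job over that group's output files.
object LoadJobBatching {
  def groupPartitions(numPartitions: Int, maxJobs: Int): IndexedSeq[Range] = {
    require(numPartitions >= 0, "numPartitions must be non-negative")
    require(maxJobs > 0, "maxJobs must be positive")
    val jobs = math.min(numPartitions, maxJobs)
    (0 until jobs).map { j =>
      // Long arithmetic avoids Int overflow in j * numPartitions.
      val start = (j.toLong * numPartitions / jobs).toInt
      val end = ((j + 1).toLong * numPartitions / jobs).toInt
      start until end
    }
  }
}
```

For instance, `groupPartitions(1537, 1000)` returns 1000 ranges of one or two partition indices each, covering `0 until 1537` exactly once.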
> On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu wrote:
>> As a workaround, can you set the number of partitions higher in the
>> sc.textFile method (its minPartitions argument)?
>> Cheers
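The arithmetic behind this suggestion, as a minimal sketch assuming partitions of roughly equal size: each cached partition is read back from disk as a single ByteBuffer, so it has to stay at or below Integer.MAX_VALUE bytes. `PartitionMath`, `requiredPartitions` and `capacity` are hypothetical names, not Spark APIs.

```scala
// Hypothetical helper, not a Spark API: how many roughly equal partitions are
// needed so that no cached partition (one block on disk) exceeds a byte limit.
object PartitionMath {
  // FileChannel.map, and a single ByteBuffer, top out at Integer.MAX_VALUE bytes (about 2 GB).
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  def requiredPartitions(totalBytes: Long, maxBytesPerPartition: Long = MaxBlockBytes): Int = {
    require(totalBytes >= 0, "totalBytes must be non-negative")
    require(maxBytesPerPartition > 0, "maxBytesPerPartition must be positive")
    // Ceiling division written to avoid overflowing totalBytes + maxBytesPerPartition.
    val whole = totalBytes / maxBytesPerPartition
    val needed = if (totalBytes % maxBytesPerPartition == 0) whole else whole + 1
    math.max(1L, needed).toInt
  }

  // The most data n evenly sized partitions can hold without any of them exceeding the limit.
  def capacity(numPartitions: Int, maxBytesPerPartition: Long = MaxBlockBytes): Long =
    numPartitions.toLong * maxBytesPerPartition
}
```

With 3 TiB of input, `requiredPartitions(3L << 40)` gives 1537, above the 1000-partition ceiling, while `capacity(1000)` is 2,147,483,647,000 bytes (about 2.1 TB); skewed inputs need extra headroom. A count like this would go in the second argument of `sc.textFile(path, minPartitions)`, which Spark treats as a minimum, and a later `coalesce(1000)` would merge the partitions back down to 1000.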
>> On Mon, Oct 5, 2015 at 3:31 PM, Jegan wrote:
>>> Hi All,
>>> I am facing the exception below when the size of the file being read in
>>> a partition is above 2 GB. This is apparently due to Java's limitation on
>>> memory-mapped files: FileChannel.map can map at most Integer.MAX_VALUE
>>> bytes (about 2 GB) in a single call.
>>> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>>     at ...
>>>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>>     at ...
>>>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>     at ...
>>>     at org.apache.spark.executor.Executor$
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>     at java.util.concurrent.ThreadPoolExecutor$
>>>     at ...
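The limit behind this trace can be reproduced without Spark or a 2 GB file. FileChannel.map documents that its size argument must be no greater than Integer.MAX_VALUE, so asking for one byte more should be rejected with IllegalArgumentException before anything is mapped; an empty temporary file is enough. `MapLimitDemo` and `oversizedMapError` are illustrative names only.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

// Illustrative check (names are made up): FileChannel.map requires
// size <= Integer.MAX_VALUE, so requesting one byte more should fail with
// IllegalArgumentException before anything is mapped.
object MapLimitDemo {
  def oversizedMapError(): Option[IllegalArgumentException] = {
    val file = File.createTempFile("map-limit", ".bin") // empty file is enough
    val raf = new RandomAccessFile(file, "r")
    try {
      raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0L, Int.MaxValue.toLong + 1)
      None
    } catch {
      case e: IllegalArgumentException => Some(e)
    } finally {
      raf.close()
      file.delete()
    }
  }
}
```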
>>> My use case is to read the files from S3 and do some processing. I am
>>> caching the data as shown below to avoid SocketTimeoutExceptions from
>>> another library I am using for the processing.
>>> import org.apache.spark.storage.StorageLevel
>>> val rdd1 = sc.textFile("*******").coalesce(1000)
>>> rdd1.persist(StorageLevel.DISK_ONLY_2) // on disk, replicated on two nodes
>>> rdd1.foreachPartition { iter => } // one pass over the data to download and cache it
>>> The third line fails with the above error when a partition contains a file
>>> larger than 2 GB.
>>> Do you think this needs to be fixed in Spark? One idea might be to use
>>> a wrapper class (something like a BigByteBuffer) that keeps an array of
>>> ByteBuffers along with the index of the buffer currently being read.
>>> Below is the modified DiskStore.scala.
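>>> private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
>>>   val channel = new RandomAccessFile(file, "r").getChannel
>>>   Utils.tryWithSafeFinally {
>>>     // For small files, directly read rather than memory map
>>>     if (length < minMemoryMapBytes) {
>>>       // Read the small region into a heap buffer
>>>       val buf = ByteBuffer.allocate(length.toInt)
>>>       channel.position(offset)
>>>       while (buf.remaining() != 0) {
>>>         if (channel.read(buf) == -1) {
>>>           throw new IOException(s"Reached EOF before filling buffer: offset=$offset, file=$file")
>>>         }
>>>       }
>>>       buf.flip()
>>>       Some(buf)
>>>     } else {
>>>       // TODO Create a BigByteBuffer: FileChannel.map accepts at most
>>>       // Integer.MAX_VALUE bytes per call, so the region must be mapped in chunks
>>>       ???
>>>     }
>>>   } {
>>>     channel.close()
>>>   }
>>> }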
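>>> // ByteBuffer's constructors are package-private, so this has to be a
>>> // wrapper rather than a subclass
>>> class BigByteBuffer(val buffers: Array[ByteBuffer]) {
>>>   var currentIndex = 0 // index of the buffer currently being read
>>>   // ... other methods
>>> }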
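A standalone sketch of the BigByteBuffer idea, under stated assumptions: `ChunkedMappedFile` and its methods are hypothetical names, the file is mapped read-only in chunks of at most `chunkSize` bytes (defaulting to Int.MaxValue, the FileChannel.map ceiling), and it wraps an array of MappedByteBuffers rather than extending ByteBuffer, which cannot be subclassed outside java.nio. It is not wired into DiskStore; code that expects a single ByteBuffer would still have to change.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Hypothetical wrapper in the spirit of BigByteBuffer (not Spark code): maps a file
// read-only as consecutive chunks of at most chunkSize bytes and addresses them
// with Long positions instead of a single Int-indexed buffer.
final class ChunkedMappedFile(file: File, chunkSize: Int = Int.MaxValue) {
  require(chunkSize > 0, "chunkSize must be positive")

  val sizeInBytes: Long = file.length()

  val buffers: Array[MappedByteBuffer] = {
    val raf = new RandomAccessFile(file, "r")
    try {
      val channel = raf.getChannel
      val count = ((sizeInBytes + chunkSize - 1) / chunkSize).toInt
      Array.tabulate(count) { i =>
        val start = i.toLong * chunkSize
        val size = math.min(chunkSize.toLong, sizeInBytes - start)
        // A mapping stays valid after its channel is closed.
        channel.map(FileChannel.MapMode.READ_ONLY, start, size)
      }
    } finally raf.close()
  }

  // Index of the buffer currently being read sequentially.
  private var currentIndex = 0

  // Random access: choose the chunk, then the offset inside it.
  def get(position: Long): Byte = {
    require(position >= 0 && position < sizeInBytes, s"position $position out of range")
    buffers((position / chunkSize).toInt).get((position % chunkSize).toInt)
  }

  // Sequential access: skip exhausted chunks, then report whether a byte remains.
  def hasNext: Boolean = {
    while (currentIndex < buffers.length && !buffers(currentIndex).hasRemaining)
      currentIndex += 1
    currentIndex < buffers.length
  }

  def next(): Byte = {
    if (!hasNext) throw new NoSuchElementException("end of file")
    buffers(currentIndex).get()
  }
}
```

For example, a 10-byte file mapped with `chunkSize = 4` is held in three buffers of 4, 4 and 2 bytes, and `get(5)` reads offset 1 of the second buffer.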
>>> Please let me know if there is any other workaround. Thanks for your
>>> time.
>>> Regards,
>>> Jegan
