I meant to say: just copy everything to local HDFS, and then don't use
caching ...
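
For example, something along these lines (an untested sketch; the paths are
just placeholders):

// One pass copies the S3 input onto the cluster's HDFS.
val fromS3 = sc.textFile("s3n://your-bucket/input")
fromS3.saveAsTextFile("hdfs:///tmp/staged-input")

// Downstream work then reads the HDFS copy directly -- no persist(DISK_ONLY_2)
// pass is needed, and HDFS splits uncompressed text at block boundaries, so no
// single partition has to memory-map a multi-GB blob.
val staged = sc.textFile("hdfs:///tmp/staged-input")
staged.foreachPartition { iter => /* processing */ }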


On Mon, Oct 5, 2015 at 4:52 PM, Jegan <jega...@gmail.com> wrote:

> I am sorry, I didn't understand it completely. Are you suggesting to copy
> the files from S3 to HDFS? Actually, that is what I am doing. I am reading
> the files using Spark and persisting it locally.
>
> Or did you actually mean to ask the producer to write the files directly
> to HDFS instead of S3? I am not sure I can do this now either.
>
> Please clarify if I misunderstood what you meant.
>
> Thanks,
> Jegan
>
> On Mon, Oct 5, 2015 at 4:42 PM, Reynold Xin <r...@databricks.com> wrote:
>
>> You can write the data to local HDFS (or local disk) and just load it
>> from there.
>>
>>
>> On Mon, Oct 5, 2015 at 4:37 PM, Jegan <jega...@gmail.com> wrote:
>>
>>> Thanks for your suggestion Ted.
>>>
>>> Unfortunately, at this point I cannot go beyond 1000 partitions. I am
>>> writing this data to BigQuery, which has a limit of 1000 load jobs per day
>>> per table, and I currently create one load job per partition. Is there any
>>> other workaround?
>>>
>>> Thanks again.
>>>
>>> Regards,
>>> Jegan
>>>
>>> On Mon, Oct 5, 2015 at 3:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> As a workaround, can you set the number of partitions higher in the
>>>> sc.textFile method?
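>>>>
>>>> For example (untested; the path and the partition count are just
>>>> placeholders, and this only helps if the input files are splittable):
>>>>
>>>> val rdd = sc.textFile("s3n://your-bucket/input", minPartitions = 5000)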
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Oct 5, 2015 at 3:31 PM, Jegan <jega...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am facing the below exception when the size of the file being read
>>>>> in a partition is above 2GB. This is apparently because of Java's
>>>>> limitation on memory-mapped files: FileChannel.map can map at most
>>>>> Integer.MAX_VALUE bytes (about 2GB) at a time.
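>>>>>
>>>>> To illustrate the JVM limitation outside of Spark (the path is just a
>>>>> placeholder):
>>>>>
>>>>> import java.io.RandomAccessFile
>>>>> import java.nio.channels.FileChannel.MapMode
>>>>>
>>>>> val channel = new RandomAccessFile("/path/to/large-file", "r").getChannel
>>>>> // Throws "java.lang.IllegalArgumentException: Size exceeds
>>>>> // Integer.MAX_VALUE" whenever channel.size() > Int.MaxValue:
>>>>> val mapped = channel.map(MapMode.READ_ONLY, 0, channel.size())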
>>>>>
>>>>> Caused by: java.lang.IllegalArgumentException: Size exceeds
>>>>> Integer.MAX_VALUE
>>>>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>>>>     at
>>>>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>>>>     at
>>>>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>>>>>     at
>>>>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1207)
>>>>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>>>>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>>>>>     at
>>>>> org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:102)
>>>>>     at
>>>>> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:791)
>>>>>     at
>>>>> org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
>>>>>     at
>>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
>>>>>     at
>>>>> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>>>>>     at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>     at
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>     at
>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>>     at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>     at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My use case is to read the files from S3 and do some processing. I am
>>>>> caching the data like below in order to avoid SocketTimeoutExceptions from
>>>>> another library I am using for the processing.
>>>>>
>>>>> val rdd1 = sc.textFile("*******").coalesce(1000)
>>>>> rdd1.persist(DISK_ONLY_2) // replication factor 2
>>>>> rdd1.foreachPartition { iter => } // one pass over the data to download
>>>>>
>>>>> The 3rd line fails with the above error when a partition contains a
>>>>> file larger than 2GB.
>>>>>
>>>>> Do you think this needs to be fixed in Spark? One idea may be to use a
>>>>> wrapper class (something like a BigByteBuffer) which keeps an array of
>>>>> ByteBuffers along with the index of the buffer currently being read.
>>>>> Below is a sketch of the modified DiskStore.scala.
>>>>>
>>>>> private def getBytes(file: File, offset: Long, length: Long):
>>>>>     Option[ByteBuffer] = {
>>>>>   val channel = new RandomAccessFile(file, "r").getChannel
>>>>>   Utils.tryWithSafeFinally {
>>>>>     if (length < minMemoryMapBytes) {
>>>>>       // Small blocks: read directly into a heap buffer rather than
>>>>>       // memory-mapping, as the current code already does
>>>>>     } else {
>>>>>       // TODO Large blocks: build a BigByteBuffer out of several
>>>>>       // channel.map() regions of at most Integer.MAX_VALUE bytes each,
>>>>>       // instead of a single map() call over the whole length
>>>>>     }
>>>>>   } {
>>>>>     channel.close()
>>>>>   }
>>>>> }
>>>>>
>>>>> // Note: java.nio.ByteBuffer cannot be subclassed outside java.nio (its
>>>>> // constructors are package-private), so this would be a plain wrapper
>>>>> // around an array of buffers rather than a ByteBuffer subclass.
>>>>> class BigByteBuffer(val buffers: Array[ByteBuffer]) {
>>>>>   var currentIndex = 0
>>>>>
>>>>>   ... // get()/remaining()/etc. delegate to buffers(currentIndex) and
>>>>>       // advance currentIndex as each underlying buffer is exhausted
>>>>> }
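>>>>>
>>>>> For illustration only, the sequential read path of such a wrapper might
>>>>> look roughly like this (hypothetical toy code, not wired into the
>>>>> DiskStore API):
>>>>>
>>>>> import java.nio.ByteBuffer
>>>>>
>>>>> // Serves bytes from the current chunk and moves to the next one as each
>>>>> // chunk is exhausted; every chunk stays under the 2GB mapping limit.
>>>>> class ChunkedBuffer(chunks: Array[ByteBuffer]) {
>>>>>   private var idx = 0
>>>>>
>>>>>   def hasRemaining: Boolean = {
>>>>>     while (idx < chunks.length && !chunks(idx).hasRemaining) idx += 1
>>>>>     idx < chunks.length
>>>>>   }
>>>>>
>>>>>   def get(): Byte = {
>>>>>     if (!hasRemaining) throw new java.nio.BufferUnderflowException
>>>>>     chunks(idx).get()
>>>>>   }
>>>>>
>>>>>   // Total bytes left across all chunks (a Long, so it can exceed 2GB).
>>>>>   def remaining: Long = chunks.iterator.drop(idx).map(_.remaining.toLong).sum
>>>>> }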
>>>>>
>>>>> Please let me know if there is any other workaround for this. Thanks
>>>>> for your time.
>>>>>
>>>>> Regards,
>>>>> Jegan
>>>>>
>>>>
>>>>
>>>
>>
>
