Hi Mridul,

Do you think you'll keep working on this, or should it get picked up by
others?  It looks like a lot of work was put into LargeByteBuffer, and it
seems promising.

thanks,
Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan <mri...@gmail.com>
wrote:

> That is fairly out of date (we used to run some of our jobs on it, but
> that is forked off 1.1, actually).
>
> Regards
> Mridul
>
>
> On Tuesday, February 3, 2015, Imran Rashid <iras...@cloudera.com> wrote:
>
>> Thanks for the explanations, that makes sense.  For the record, it looks
>> like this was worked on a while back (and maybe the work is even close to
>> a solution?)
>>
>> https://issues.apache.org/jira/browse/SPARK-1476
>>
>> and perhaps an independent solution was worked on here?
>>
>> https://issues.apache.org/jira/browse/SPARK-1391
>>
>>
>> On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin <r...@databricks.com> wrote:
>>
>> > cc dev list
>> >
>> >
>> > How are you saving the data? There are two relevant 2GB limits:
>> >
>> > 1. Caching
>> >
>> > 2. Shuffle
>> >
>> >
>> > For caching, a partition is turned into a single block.
>> >
>> > For shuffle, each map partition is partitioned into R blocks, where R =
>> > number of reduce tasks. It is unlikely that a shuffle block exceeds 2GB,
>> > although it can still happen.
>> >
>> > I think the 2nd problem is easier to fix than the 1st, because we can
>> > handle that in the network transport layer. It would require us to divide
>> > the transfer of a very large block into multiple smaller blocks.
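
The idea of dividing one large transfer into smaller pieces can be sketched in plain Scala as below. This is only an illustration and not how Spark's transport layer is implemented; `ChunkRanges`, `chunkRanges`, and the `maxChunk` parameter are hypothetical names introduced for the example.

```scala
object ChunkRanges {
  // Hypothetical helper: split a block of `totalBytes` bytes into
  // (offset, length) ranges, each at most `maxChunk` bytes long, so that
  // every individual piece fits in an Int-sized buffer.
  def chunkRanges(totalBytes: Long, maxChunk: Int): Seq[(Long, Int)] = {
    require(totalBytes >= 0, "totalBytes must be non-negative")
    require(maxChunk > 0, "maxChunk must be positive")
    (0L until totalBytes by maxChunk.toLong).map { offset =>
      // The last range may be shorter than maxChunk.
      val length = math.min(maxChunk.toLong, totalBytes - offset).toInt
      (offset, length)
    }
  }
}
```

For instance, `ChunkRanges.chunkRanges(5000000000L, 1 << 30)` yields five ranges, four of 1 GiB each and a shorter final one.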
>> >
>> >
>> >
>> > On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid <iras...@cloudera.com> wrote:
>> >
>> >> Michael,
>> >>
>> >> You are right, there is definitely some limit at 2GB.  Here is a trivial
>> >> example to demonstrate it:
>> >>
>> >> import org.apache.spark.storage.StorageLevel
>> >> // A single partition holding 1e6 arrays of 5e3 bytes each (about 5GB), persisted to disk
>> >> val d = sc.parallelize(1 to 1e6.toInt, 1).map { i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
>> >> d.count()
>> >>
>> >> It gives the same error you are observing.  I was under the same
>> >> impression as Sean about the limits only being on blocks, not
>> >> partitions, but clearly that isn't the case here.
>> >>
>> >> I don't know the whole story yet, but I just wanted to at least let you
>> >> know you aren't crazy :)
>> >> At the very least this suggests that you might need to make smaller
>> >> partitions for now.
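
A back-of-the-envelope check of the example above, and of the suggestion to use smaller partitions, might look like the following. It is a rough sketch in plain Scala: `PartitionSizing` and `minPartitions` are hypothetical names, and the estimate counts only the raw array payload, so the block Spark actually writes is probably somewhat larger.

```scala
object PartitionSizing {
  // FileChannel.map rejects sizes above Integer.MAX_VALUE (2^31 - 1 bytes).
  val MaxMappableBytes: Long = Int.MaxValue.toLong

  // Payload of the example: 1e6 arrays of 5e3 bytes each, all in one partition.
  // Serialization overhead is ignored, so this is a lower bound.
  val examplePayloadBytes: Long = 1e6.toLong * 5e3.toLong

  // Hypothetical helper: smallest partition count that keeps the *average*
  // partition at or below maxBytesPerPartition. Skewed data may need more.
  def minPartitions(totalBytes: Long, maxBytesPerPartition: Long): Int = {
    require(totalBytes >= 0 && maxBytesPerPartition > 0)
    val n = (totalBytes + maxBytesPerPartition - 1) / maxBytesPerPartition
    math.max(1L, n).toInt
  }
}
```

Here `examplePayloadBytes` is 5,000,000,000, well above `MaxMappableBytes`, and `minPartitions(examplePayloadBytes, MaxMappableBytes)` gives 3 as the bare minimum; in practice one would likely choose far more partitions than that to leave headroom.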
>> >>
>> >> Imran
>> >>
>> >>
>> >> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <
>> >> m_albert...@yahoo.com.invalid> wrote:
>> >>
>> >>> Greetings!
>> >>>
>> >>> Thanks for the response.
>> >>>
>> >>> Below is an example of the exception I saw.
>> >>> I'd rather not post code at the moment, so I realize it is completely
>> >>> unreasonable to ask for a diagnosis.
>> >>> However, I will say that adding a "partitionBy()" was the last change
>> >>> before this error appeared.
>> >>>
>> >>>
>> >>> Thanks for your time and any thoughts you might have.
>> >>>
>> >>> Sincerely,
>> >>>  Mike
>> >>>
>> >>>
>> >>>
>> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> >>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
>> >>> failure: Lost task 4.3 in stage 5.0 (TID 6012,
>> >>> ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
>> >>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> >>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>> >>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>> >>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>> >>>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>> >>>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>> >>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> >>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> >>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> >>>     at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> >>>
>> >>>
>> >>>   ------------------------------
>> >>>  *From:* Sean Owen <so...@cloudera.com>
>> >>> *To:* Michael Albert <m_albert...@yahoo.com>
>> >>> *Cc:* "u...@spark.apache.org" <u...@spark.apache.org>
>> >>> *Sent:* Monday, February 2, 2015 10:13 PM
>> >>> *Subject:* Re: 2GB limit for partitions?
>>
>> >>>
>> >>> The limit is on blocks, not partitions. Partitions have many blocks.
>> >>>
>> >>> It sounds like you are creating very large values in memory, but I'm
>> >>> not sure given your description. You will run into problems if a
>> >>> single object is more than 2GB, of course. More of the stack trace
>> >>> might show what is mapping that much memory.
>> >>>
>> >>> If you simply want data into 1000 files it's a lot simpler. Just
>> >>> repartition into 1000 partitions and save the data. If you need more
>> >>> control over what goes into which partition, use a Partitioner, yes.
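
The key-to-partition mapping that a custom Partitioner would perform can be sketched in plain Scala as follows. In Spark this logic would normally live in the `getPartition(key: Any): Int` method of a class extending `org.apache.spark.Partitioner`; the `FilePartitioning` object and `partitionFor` function here are hypothetical names used only for illustration, and the hash-modulo scheme is an assumption rather than anything prescribed in this thread.

```scala
object FilePartitioning {
  // Map an arbitrary key to a partition index in [0, numPartitions).
  // The % operator can return a negative value for negative hash codes,
  // so the result is shifted back into range.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }
  }
}
```

If the keys are already file indices from 0 to 999, `partitionFor(k, 1000)` returns `k` unchanged, so each output file would correspond to exactly one partition.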
>> >>>
>> >>>
>> >>>
>> >>> On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
>> >>> <m_albert...@yahoo.com.invalid> wrote:
>> >>> > Greetings!
>> >>> >
>> >>> > SPARK-1476 says that there is a 2G limit for "blocks".
>> >>> > Is this the same as a 2G limit for partitions (or approximately so)?
>> >>> >
>> >>> >
>> >>> > What I had been attempting to do is the following.
>> >>> > 1) Start with a moderately large data set (currently about 100GB, but
>> >>> > growing).
>> >>> > 2) Create about 1,000 files (yes, files) each representing a subset of
>> >>> > the data.
>> >>> >
>> >>> > The current attempt I am working on is something like this.
>> >>> > 1) Do a "map" whose output key indicates which of the 1,000 files it
>> >>> > will go into and whose value is what I will want to stick into the file.
>> >>> > 2) Partition the data and use the body of mapPartition to open a file
>> >>> > and save the data.
>> >>> >
>> >>> > My apologies, this is actually embedded in a bigger mess, so I won't
>> >>> > post it.
>> >>> >
>> >>> > However, I get errors telling me that there is an
>> >>> > "IllegalArgumentException: Size exceeds Integer.MAX_VALUE", with
>> >>> > sun.nio.ch.FileChannelImpl.map at the top of the stack.  This leads me
>> >>> > to think that I have hit the limit on partition and/or block size.
>> >>> >
>> >>> > Perhaps this is not a good way to do it?
>> >>> >
>> >>> > I suppose I could run 1,000 passes over the data, each time collecting
>> >>> > the output for one of my 1,000 final files, but that seems likely to be
>> >>> > painfully slow to run.
>> >>> >
>> >>> > Am I missing something?
>> >>> >
>> >>> > Admittedly, this is an odd use case....
>> >>> >
>> >>> > Thanks!
>> >>> >
>> >>> > Sincerely,
>> >>> >  Mike Albert
>> >>>
>> >>>