cc dev list

How are you saving the data? There are two relevant 2GB limits:

1. Caching

2. Shuffle


For caching, a partition is turned into a single block.

For shuffle, each map partition is split into R blocks, where R is the
number of reduce tasks. A shuffle block larger than 2GB is therefore
unlikely, although it can still happen.
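
To make the arithmetic concrete, here is a rough sketch of the two cases. The object and method names are made up for illustration and are not Spark APIs, and the shuffle estimate assumes keys are spread evenly across reduce tasks, which real data often is not.

```scala
// Hypothetical helpers for illustration only; none of these are Spark APIs.
object BlockSizeEstimate {
  // The limit reported in the stack trace: Integer.MAX_VALUE bytes (~2GB).
  val MaxBlockBytes: Long = Int.MaxValue.toLong

  // Caching: one partition becomes one block, so the block is as large
  // as the partition itself.
  def cachedBlockBytes(partitionBytes: Long): Long = partitionBytes

  // Shuffle: one map partition is split into R blocks. Assumption: keys are
  // evenly distributed, so this is an average, not an upper bound.
  def avgShuffleBlockBytes(mapPartitionBytes: Long, numReduceTasks: Int): Long = {
    require(numReduceTasks > 0, "need at least one reduce task")
    mapPartitionBytes / numReduceTasks
  }

  // Smallest partition count that keeps evenly sized partitions at or
  // under the limit (ceiling division).
  def minPartitions(totalBytes: Long): Int = {
    require(totalBytes >= 0, "size must be non-negative")
    ((totalBytes + MaxBlockBytes - 1) / MaxBlockBytes).toInt
  }
}
```

For the example quoted below (10^6 arrays of 5,000 bytes in a single partition, about 5GB before any serialization overhead), minPartitions(5000000000L) gives 3, while the same data shuffled to 1,000 reduce tasks would average about 5MB per shuffle block.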

I think the 2nd problem is easier to fix than the 1st, because we can
handle that in the network transport layer. It'd require us to divide the
transfer of a very large block into multiple smaller blocks.
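
As a sketch of what "multiple smaller blocks" could look like (this is not existing Spark code; the Chunk type, the split method, and the chunk size are all assumptions for illustration), the sender would describe one large block as a list of offset/length ranges, each small enough to fit in an Int-sized buffer:

```scala
// Hypothetical sketch; not part of Spark's network transport layer.
object ChunkedTransfer {
  // One piece of a large block: where it starts and how many bytes it holds.
  final case class Chunk(offset: Long, length: Int)

  // Split a block of blockBytes into consecutive chunks of at most
  // maxChunkBytes each. The last chunk holds whatever remains.
  def split(blockBytes: Long, maxChunkBytes: Int): Seq[Chunk] = {
    require(blockBytes >= 0, "block size must be non-negative")
    require(maxChunkBytes > 0, "chunk size must be positive")
    (0L until blockBytes by maxChunkBytes.toLong).map { offset =>
      val remaining = blockBytes - offset
      Chunk(offset, math.min(maxChunkBytes.toLong, remaining).toInt)
    }
  }
}
```

With a 5GB block and 1GiB chunks, ChunkedTransfer.split(5000000000L, 1 << 30) yields five chunks, each of which could be mapped or sent on its own without exceeding Integer.MAX_VALUE.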



On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid <iras...@cloudera.com> wrote:

> Michael,
>
> you are right, there is definitely some limit at 2GB.  Here is a trivial
> example to demonstrate it:
>
> import org.apache.spark.storage.StorageLevel
> // 1e6 arrays of 5e3 bytes each, all in a single partition (about 5GB)
> val d = sc.parallelize(1 to 1e6.toInt, 1).map { i =>
>   new Array[Byte](5e3.toInt)
> }.persist(StorageLevel.DISK_ONLY)
> d.count()
>
> It gives the same error you are observing.  I was under the same
> impression as Sean about the limits only being on blocks, not partitions --
> but clearly that isn't the case here.
>
> I don't know the whole story yet, but I just wanted to at least let you
> know you aren't crazy :)
> At the very least this suggests that you might need to make smaller
> partitions for now.
>
> Imran
>
>
> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <
> m_albert...@yahoo.com.invalid> wrote:
>
>> Greetings!
>>
>> Thanks for the response.
>>
>> Below is an example of the exception I saw.
>> I'd rather not post code at the moment, so I realize it is completely
>> unreasonable to ask for a diagnosis.
>> However, I will say that adding a "partitionBy()" was the last change
>> before this error appeared.
>>
>>
>> Thanks for your time and any thoughts you might have.
>>
>> Sincerely,
>>  Mike
>>
>>
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
>> failure: Lost task 4.3 in stage 5.0 (TID 6012,
>> ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>     at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>
>>
>>   ------------------------------
>>  *From:* Sean Owen <so...@cloudera.com>
>> *To:* Michael Albert <m_albert...@yahoo.com>
>> *Cc:* "user@spark.apache.org" <user@spark.apache.org>
>> *Sent:* Monday, February 2, 2015 10:13 PM
>> *Subject:* Re: 2GB limit for partitions?
>>
>> The limit is on blocks, not partitions. Partitions have many blocks.
>>
>> It sounds like you are creating very large values in memory, but I'm
>> not sure given your description. You will run into problems if a
>> single object is more than 2GB, of course. More of the stack trace
>> might show what is mapping that much memory.
>>
>> If you simply want data into 1000 files it's a lot simpler. Just
>> repartition into 1000 partitions and save the data. If you need more
>> control over what goes into which partition, use a Partitioner, yes.
>>
>>
>>
>> On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
>> <m_albert...@yahoo.com.invalid> wrote:
>> > Greetings!
>> >
>> > SPARK-1476 says that there is a 2G limit for "blocks".
>> > Is this the same as a 2G limit for partitions (or approximately so?)?
>> >
>> >
>> > What I had been attempting to do is the following.
>> > 1) Start with a moderately large data set (currently about 100GB, but
>> > growing).
>> > 2) Create about 1,000 files (yes, files), each representing a subset of
>> > the data.
>> >
>> > The current attempt I am working on is something like this.
>> > 1) Do a "map" whose output key indicates which of the 1,000 files it
>> > will go into and whose value is what I will want to stick into the file.
>> > 2) Partition the data and use the body of mapPartitions to open a file
>> > and save the data.
>> >
>> > My apologies, this is actually embedded in a bigger mess, so I won't
>> > post it.
>> >
>> > However, I get errors telling me that there is an
>> > "IllegalArgumentException: Size exceeds Integer.MAX_VALUE", with
>> > sun.nio.ch.FileChannelImpl.map at the top of the stack.  This leads me
>> > to think that I have hit the limit on partition and/or block size.
>> >
>> > Perhaps this is not a good way to do it?
>> >
>> > I suppose I could run 1,000 passes over the data, each time collecting
>> > the output for one of my 1,000 final files, but that seems likely to be
>> > painfully slow to run.
>> >
>> > Am I missing something?
>> >
>> > Admittedly, this is an odd use case....
>> >
>> > Thanks!
>> >
>> > Sincerely,
>> >  Mike Albert
>>
>>
>
