Thank you!
This is very helpful.
-Mike

      From: Aaron Davidson <ilike...@gmail.com>
 To: Imran Rashid <iras...@cloudera.com> 
Cc: Michael Albert <m_albert...@yahoo.com>; Sean Owen <so...@cloudera.com>; 
"user@spark.apache.org" <user@spark.apache.org> 
 Sent: Tuesday, February 3, 2015 6:13 PM
 Subject: Re: 2GB limit for partitions?
   
To be clear, there is no distinction between partitions and blocks for RDD 
caching (each RDD partition corresponds to 1 cache block). The distinction is 
important for shuffling, where by definition N partitions are shuffled into M 
partitions, creating N*M intermediate blocks. Each of these blocks must also be 
smaller than 2GB, but due to their number, this is an atypical scenario.
If you do

  sc.parallelize(1 to 1e6.toInt, 1)
    .map { i => new Array[Byte](5e3.toInt) }
    .repartition(1000)
    .count()

you should not see this error, as the 5GB initial partition was split into 1000
partitions of 5MB each during the shuffle.

On the other hand,

  sc.parallelize(1 to 1e6.toInt, 1)
    .map { i => new Array[Byte](5e3.toInt) }
    .repartition(1)
    .count()

may have the same error as Imran showed for caching, and for the same reason.


On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid <iras...@cloudera.com> wrote:

Michael,
you are right, there is definitely some limit at 2GB.  Here is a trivial 
example to demonstrate it:
  import org.apache.spark.storage.StorageLevel

  val d = sc.parallelize(1 to 1e6.toInt, 1)
    .map { i => new Array[Byte](5e3.toInt) }
    .persist(StorageLevel.DISK_ONLY)
  d.count()
It gives the same error you are observing.  I was under the same impression as 
Sean about the limits only being on blocks, not partitions -- but clearly that 
isn't the case here.
I don't know the whole story yet, but I just wanted to at least let you know
you aren't crazy :)

At the very least, this suggests that you might need to make smaller
partitions for now.
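For example, something along these lines (the partition count here is just for
illustration) keeps each partition well under 2GB:

  import org.apache.spark.storage.StorageLevel

  // same data as above, but spread across 100 partitions (~50MB each) instead of 1
  val d = sc.parallelize(1 to 1e6.toInt, 100)
    .map { i => new Array[Byte](5e3.toInt) }
    .persist(StorageLevel.DISK_ONLY)
  d.count()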
Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <m_albert...@yahoo.com.invalid> 
wrote:

Greetings!
Thanks for the response.
Below is an example of the exception I saw.

I'd rather not post code at the moment, so I realize it is completely
unreasonable to ask for a diagnosis. However, I will say that adding a
"partitionBy()" was the last change before this error appeared.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

      From: Sean Owen <so...@cloudera.com>
 To: Michael Albert <m_albert...@yahoo.com> 
Cc: "user@spark.apache.org" <user@spark.apache.org> 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
   
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want the data in 1,000 files, it's a lot simpler: just
repartition into 1,000 partitions and save the data. If you need more
control over what goes into which partition, then yes, use a Partitioner.
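Roughly, those two options look like this (a sketch only; "data" and
"keyedData" are placeholder RDD names and the output paths are made up):

  import org.apache.spark.Partitioner

  // simplest case: spread the data over 1,000 partitions and save it;
  // each partition becomes one part-* file in the output directory
  data.repartition(1000).saveAsTextFile("hdfs:///tmp/output")

  // for explicit control, key each record by its target file id (0 until 1000)
  // and route it to exactly that partition
  class FileIdPartitioner(n: Int) extends Partitioner {
    def numPartitions: Int = n
    def getPartition(key: Any): Int = key.asInstanceOf[Int]
  }

  keyedData                                   // an RDD[(Int, String)]
    .partitionBy(new FileIdPartitioner(1000))
    .values
    .saveAsTextFile("hdfs:///tmp/output-by-key")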



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
<m_albert...@yahoo.com.invalid> wrote:
> Greetings!
>
> SPARK-1476 says that there is a 2G limit for "blocks".
> Is this the same as a 2G limit for partitions (or approximately so)?
>
>
> What I had been attempting to do is the following.
> 1) Start with a moderately large data set (currently about 100GB, but
> growing).
> 2) Create about 1,000 files (yes, files) each representing a subset of the
> data.
>
> The current attempt I am working on is something like this.
> 1) Do a "map" whose output key indicates which of the 1,000 files it will go
> into and whose value is what I will want to stick into the file.
> 2) Partition the data and use the body of mapPartition to open a file and
> save the data.
>
> My apologies, this is actually embedded in a bigger mess, so I won't post
> it.
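> That said, the shape of it is roughly this (a sketch with made-up names, not
> the real code; fileIdFor() and formatRecord() stand in for my actual logic):
>
>   import org.apache.spark.HashPartitioner
>   import java.io.PrintWriter
>
>   records.map(r => (fileIdFor(r), formatRecord(r)))     // key = target file id
>     .partitionBy(new HashPartitioner(1000))
>     .mapPartitionsWithIndex { (idx, iter) =>
>       // write one output file per partition
>       val out = new PrintWriter(s"/mnt/output/part-$idx")
>       try iter.foreach { case (_, line) => out.println(line) }
>       finally out.close()
>       Iterator.single(idx)
>     }
>     .count()                                            // force the writes to run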
>
> However, I get errors telling me that there is an "IllegalArgumentException:
> Size exceeds Integer.MAX_VALUE", with sun.nio.ch.FileChannelImpl.map at the
> top of the stack.  This leads me to think that I have hit the limit on
> partition and/or block size.
>
> Perhaps this is not a good way to do it?
>
> I suppose I could run 1,000 passes over the data, each time collecting the
> output for one of my 1,000 final files, but that seems likely to be
> painfully slow to run.
>
> Am I missing something?
>
> Admittedly, this is an odd use case....
>
> Thanks!
>
> Sincerely,
>  Mike Albert
