Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:
import org.apache.spark.storage.StorageLevel

val d = sc.parallelize(1 to 1e6.toInt, 1)
  .map { i => new Array[Byte](5e3.toInt) }
  .persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing. I was under the same impression
as Sean about the limits only being on blocks, not partitions -- but clearly
that isn't the case here. I don't know the whole story yet, but I just wanted
to at least let you know you aren't crazy :) At the very least this suggests
that you might need to make smaller partitions for now.

Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert
<m_albert...@yahoo.com.invalid> wrote:

> Greetings!
>
> Thanks for the response.
>
> Below is an example of the exception I saw.
> I'd rather not post code at the moment, so I realize it is completely
> unreasonable to ask for a diagnosis.
> However, I will say that adding a "partitionBy()" was the last change
> before this error appeared.
>
> Thanks for your time and any thoughts you might have.
>
> Sincerely,
> Mike
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
> failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal):
> java.lang.RuntimeException: java.lang.IllegalArgumentException:
> Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>   at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>   at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>   at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>   at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>   at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
> ------------------------------
> *From:* Sean Owen <so...@cloudera.com>
> *To:* Michael Albert <m_albert...@yahoo.com>
> *Cc:* "user@spark.apache.org" <user@spark.apache.org>
> *Sent:* Monday, February 2, 2015 10:13 PM
> *Subject:* Re: 2GB limit for partitions?
>
> The limit is on blocks, not partitions. Partitions have many blocks.
>
> It sounds like you are creating very large values in memory, but I'm
> not sure given your description. You will run into problems if a
> single object is more than 2GB, of course. More of the stack trace
> might show what is mapping that much memory.
>
> If you simply want the data in 1,000 files, it's a lot simpler. Just
> repartition into 1,000 partitions and save the data. If you need more
> control over what goes into which partition, use a Partitioner, yes.
>
> On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
> <m_albert...@yahoo.com.invalid> wrote:
> > Greetings!
> >
> > SPARK-1476 says that there is a 2G limit for "blocks".
> > Is this the same as a 2G limit for partitions (or approximately so)?
> >
> > What I had been attempting to do is the following.
> > 1) Start with a moderately large data set (currently about 100GB, but
> > growing).
> > 2) Create about 1,000 files (yes, files), each representing a subset of
> > the data.
> >
> > The current attempt I am working on is something like this.
> > 1) Do a "map" whose output key indicates which of the 1,000 files it
> > will go into and whose value is what I will want to write into that file.
> > 2) Partition the data and use the body of mapPartitions to open a file
> > and save the data.
> >
> > My apologies, this is actually embedded in a bigger mess, so I won't
> > post it.
> >
> > However, I get errors telling me that there is an
> > "IllegalArgumentException: Size exceeds Integer.MAX_VALUE", with
> > sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me
> > to think that I have hit the limit on partition and/or block size.
> >
> > Perhaps this is not a good way to do it?
> >
> > I suppose I could run 1,000 passes over the data, each time collecting
> > the output for one of my 1,000 final files, but that seems likely to be
> > painfully slow.
> >
> > Am I missing something?
> >
> > Admittedly, this is an odd use case....
> >
> > Thanks!
> >
> > Sincerely,
> > Mike Albert
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
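
For reference, here is a minimal sketch of the Partitioner route Sean
describes, assuming the records are already keyed by a target file index in
[0, 1000); the class, variable, and path names below are illustrative, not
from Michael's actual code:

import org.apache.spark.Partitioner

// Routes each record to one of numPartitions output partitions.
// Assumes the key is the target file index in [0, numPartitions).
class FileIndexPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key.asInstanceOf[Int] % numPartitions
}

// keyed: RDD[(Int, String)] -- the Int key says which of the 1,000 files
// the value belongs in; how that key is computed is application-specific.
// keyed.partitionBy(new FileIndexPartitioner(1000))
//   .values
//   .saveAsTextFile("hdfs:///some/output/path")  // one part-NNNNN file per partition

Since saveAsTextFile writes each RDD partition as its own part file, keeping
each of the 1,000 subsets in its own partition produces the 1,000 files in a
single pass, and the per-partition (and hence per-block) size stays below the
2GB limit as long as no single subset itself exceeds it.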