Hi Mridul,
do you think you'll keep working on this, or should this get picked up by
others?  Looks like there was a lot of work put into LargeByteBuffer, seems
promising.

thanks,
Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan <mri...@gmail.com> wrote:

> That is fairly out of date (we used to run some of our jobs on it ... but
> that is forked off 1.1 actually).
>
> Regards,
> Mridul
>
>
> On Tuesday, February 3, 2015, Imran Rashid <iras...@cloudera.com> wrote:
>
>> Thanks for the explanations, makes sense.  For the record, looks like this
>> was worked on a while back (and maybe the work is even close to a
>> solution?)
>>
>> https://issues.apache.org/jira/browse/SPARK-1476
>>
>> and perhaps an independent solution was worked on here?
>>
>> https://issues.apache.org/jira/browse/SPARK-1391
>>
>>
>> On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin <r...@databricks.com> wrote:
>>
>> > cc dev list
>> >
>> > How are you saving the data? There are two relevant 2GB limits:
>> >
>> > 1. Caching
>> >
>> > 2. Shuffle
>> >
>> > For caching, a partition is turned into a single block.
>> >
>> > For shuffle, each map partition is partitioned into R blocks, where R =
>> > number of reduce tasks. It is unlikely a shuffle block is > 2G, although
>> > it can still happen.
>> >
>> > I think the 2nd problem is easier to fix than the 1st, because we can
>> > handle that in the network transport layer. It'd require us to divide
>> > the transfer of a very large block into multiple smaller blocks.
>> >
>> >
>> > On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid <iras...@cloudera.com> wrote:
>> >
>> >> Michael,
>> >>
>> >> you are right, there is definitely some limit at 2GB.  Here is a
>> >> trivial example to demonstrate it:
>> >>
>> >> import org.apache.spark.storage.StorageLevel
>> >> val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
>> >> d.count()
>> >>
>> >> It gives the same error you are observing.  I was under the same
>> >> impression as Sean about the limits only being on blocks, not
>> >> partitions -- but clearly that isn't the case here.
>> >>
>> >> I don't know the whole story yet, but I just wanted to at least let you
>> >> know you aren't crazy :)
>> >> At the very least this suggests that you might need to make smaller
>> >> partitions for now.
>> >>
>> >> Imran
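
To put numbers on that example: the single partition holds roughly 1e6 records
of ~5KB each, about 5GB, so the one block it becomes when cached is well past
Integer.MAX_VALUE bytes. A minimal sketch of the "smaller partitions"
workaround Imran mentions -- same data, just spread over enough partitions
that no single cached block approaches 2GB (the count of 100 is only an
illustrative choice, not something from this thread):

import org.apache.spark.storage.StorageLevel

// ~5GB of data spread over 100 partitions => roughly 50MB per cached block,
// comfortably under the ~2GB (Integer.MAX_VALUE bytes) per-block ceiling.
val d = sc.parallelize(1 to 1e6.toInt, 100)
  .map(i => new Array[Byte](5e3.toInt))
  .persist(StorageLevel.DISK_ONLY)
d.count()
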
>> >>
>> >> On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert <m_albert...@yahoo.com.invalid> wrote:
>> >>
>> >>> Greetings!
>> >>>
>> >>> Thanks for the response.
>> >>>
>> >>> Below is an example of the exception I saw.
>> >>> I'd rather not post code at the moment, so I realize it is completely
>> >>> unreasonable to ask for a diagnosis.
>> >>> However, I will say that adding a "partitionBy()" was the last change
>> >>> before this error appeared.
>> >>>
>> >>> Thanks for your time and any thoughts you might have.
>> >>>
>> >>> Sincerely,
>> >>> Mike
>> >>>
>> >>>
>> >>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> >>> due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
>> >>> failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal):
>> >>> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> >>>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>> >>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
>> >>>     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>> >>>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
>> >>>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
>> >>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> >>>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> >>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> >>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> >>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> >>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> >>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> >>>     at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> >>>
>> >>>
>> >>> ------------------------------
>> >>> *From:* Sean Owen <so...@cloudera.com>
>> >>> *To:* Michael Albert <m_albert...@yahoo.com>
>> >>> *Cc:* "u...@spark.apache.org" <u...@spark.apache.org>
>> >>> *Sent:* Monday, February 2, 2015 10:13 PM
>> >>> *Subject:* Re: 2GB limit for partitions?
>> >>>
>> >>> The limit is on blocks, not partitions. Partitions have many blocks.
>> >>>
>> >>> It sounds like you are creating very large values in memory, but I'm
>> >>> not sure given your description. You will run into problems if a
>> >>> single object is more than 2GB, of course. More of the stack trace
>> >>> might show what is mapping that much memory.
>> >>>
>> >>> If you simply want data in 1000 files it's a lot simpler. Just
>> >>> repartition into 1000 partitions and save the data. If you need more
>> >>> control over what goes into which partition, use a Partitioner, yes.
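
Sean's Partitioner suggestion, sketched minimally -- the class name and the
key convention below are illustrative, not something from this thread. The
idea is a partitioner that routes each record to one of the 1,000 output
partitions based on an integer file-index key, so each output partition
corresponds to exactly one file:

import org.apache.spark.Partitioner

// Assumes records are keyed by an Int in [0, numFiles) naming the target file.
class FileIndexPartitioner(numFiles: Int) extends Partitioner {
  override def numPartitions: Int = numFiles
  override def getPartition(key: Any): Int = key.asInstanceOf[Int] % numFiles
}

// Hypothetical usage, with `records: RDD[(Int, String)]` keyed by file index:
// records.partitionBy(new FileIndexPartitioner(1000))
//        .values
//        .saveAsTextFile("hdfs:///some/output/path")  // one part-file per partition
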
>> >>>
>> >>> On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert <m_albert...@yahoo.com.invalid> wrote:
>> >>> > Greetings!
>> >>> >
>> >>> > SPARK-1476 says that there is a 2G limit for "blocks".
>> >>> > Is this the same as a 2G limit for partitions (or approximately so)?
>> >>> >
>> >>> > What I had been attempting to do is the following.
>> >>> > 1) Start with a moderately large data set (currently about 100GB, but
>> >>> > growing).
>> >>> > 2) Create about 1,000 files (yes, files) each representing a subset of
>> >>> > the data.
>> >>> >
>> >>> > The current attempt I am working on is something like this.
>> >>> > 1) Do a "map" whose output key indicates which of the 1,000 files it
>> >>> > will go into and whose value is what I will want to stick into the file.
>> >>> > 2) Partition the data and use the body of mapPartition to open a file
>> >>> > and save the data.
>> >>> >
>> >>> > My apologies, this is actually embedded in a bigger mess, so I won't
>> >>> > post it.
>> >>> >
>> >>> > However, I get errors telling me that there is an
>> >>> > "IllegalArgumentException: Size exceeds Integer.MAX_VALUE", with
>> >>> > sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me
>> >>> > to think that I have hit the limit on partition and/or block size.
>> >>> >
>> >>> > Perhaps this is not a good way to do it?
>> >>> >
>> >>> > I suppose I could run 1,000 passes over the data, each time collecting
>> >>> > the output for one of my 1,000 final files, but that seems likely to be
>> >>> > painfully slow to run.
>> >>> >
>> >>> > Am I missing something?
>> >>> >
>> >>> > Admittedly, this is an odd use case....
>> >>> >
>> >>> > Thanks!
>> >>> >
>> >>> > Sincerely,
>> >>> > Mike Albert
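
For what it's worth, a rough, self-contained sketch of the approach Mike
describes (key each record by its target file, partition on that key, then
write one file per partition from within the task). The toy data, key
function, and output path are made up for illustration; in practice the
writes would go to a shared filesystem rather than each executor's local disk:

import java.io.{BufferedWriter, FileWriter}
import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

val numFiles = 1000
// Toy stand-in for the real dataset: the key says which of the 1,000 files
// a record belongs to.
val keyed = sc.parallelize(1 to 1000000).map(i => (i % numFiles, s"record-$i"))

keyed
  .partitionBy(new HashPartitioner(numFiles))  // all records for one file land in one partition
  .mapPartitionsWithIndex { (idx, iter) =>
    val out = new BufferedWriter(new FileWriter(s"/tmp/output-part-$idx"))
    try iter.foreach { case (_, line) => out.write(line); out.newLine() }
    finally out.close()
    Iterator.single(idx)
  }
  .count()  // action to force the writes to run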