Re: 2GB limit for partitions?
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory.

If you simply want the data in 1000 files, it's a lot simpler: just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, then yes, use a Partitioner.

On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!

SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)?

What I had been attempting to do is the following:
1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files), each representing a subset of the data.

The current attempt I am working on is something like this:
1) Do a map whose output key indicates which of the 1,000 files the record will go into, and whose value is what I want to put into the file.
2) Partition the data and use the body of mapPartition to open a file and save the data.

My apologies: this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an "IllegalArgumentException: Size exceeds Integer.MAX_VALUE", with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size.

Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow to run.

Am I missing something? Admittedly, this is an odd use case.

Thanks!
Sincerely,
Mike Albert

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
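Sean's suggestion above (repartition into 1,000 partitions, with a custom Partitioner for control over placement) can be sketched as follows. This is only an illustrative sketch, not code from the thread: the Int file-index key scheme, the class name, and the usage shape are all assumptions.

```scala
import org.apache.spark.Partitioner

// Illustrative sketch: route each record to one of numFiles output
// partitions, one partition per target file.
class FileIndexPartitioner(numFiles: Int) extends Partitioner {
  override def numPartitions: Int = numFiles
  override def getPartition(key: Any): Int = key match {
    // Assumed key scheme: the key is the target file index (an Int).
    case fileIndex: Int => ((fileIndex % numFiles) + numFiles) % numFiles
    // Fallback for other key types: hash into the same range.
    case other          => ((other.hashCode % numFiles) + numFiles) % numFiles
  }
}

// Hypothetical usage, assuming keyedRdd is an RDD[(Int, V)] keyed by file index:
// keyedRdd
//   .partitionBy(new FileIndexPartitioner(1000))
//   .foreachPartition { iter =>
//     // open one output file for this partition and write iter's values
//   }
```

The double-modulo keeps the result non-negative even for negative keys or hash codes, which a bare `% numFiles` would not.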
Re: 2GB limit for partitions?
Hi Mridul, do you think you'll keep working on this, or should this get picked up by others? It looks like there was a lot of work put into LargeByteBuffer; it seems promising.

thanks,
Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com wrote:

That is fairly out of date (we used to run some of our jobs on it ... but that is forked off 1.1, actually).

Regards,
Mridul

On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?): https://issues.apache.org/jira/browse/SPARK-1476, and perhaps an independent solution was worked on here: https://issues.apache.org/jira/browse/SPARK-1391

On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

cc dev list

How are you saving the data? There are two relevant 2GB limits:
1. Caching
2. Shuffle

For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = the number of reduce tasks. It is unlikely that a shuffle block exceeds 2G, although it can still happen.

I think the 2nd problem is easier to fix than the 1st, because we can handle it in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here.
I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now.

Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings!

Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error appeared.

Thanks for your time and any thoughts you might have.

Sincerely,
Mike

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, February 2, 2015 10:13 PM
Subject: Re: 2GB limit for partitions?
Re: 2GB limit for partitions?
To be clear, there is no distinction between partitions and blocks for RDD caching (each RDD partition corresponds to one cache block). The distinction is important for shuffling, where by definition N partitions are shuffled into M partitions, creating N*M intermediate blocks. Each of these blocks must also be smaller than 2GB, but due to their number, this is an atypical scenario.

If you do

sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.repartition(1000).count()

you should not see this error, as the 5GB initial partition is split into 1,000 partitions of roughly 5MB each during the shuffle. On the other hand,

sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.repartition(1).count()

may produce the same error Imran showed for caching, and for the same reason.
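The arithmetic behind Aaron's example above is worth spelling out: 1e6 records of 5e3 bytes each is 5 * 10^9 bytes in a single partition, which overflows a signed 32-bit length, while repartitioning into 1,000 partitions leaves each block far below the limit. A quick check in plain Scala (no Spark needed):

```scala
// Why one partition of 1e6 x 5e3-byte records trips the 2GB limit,
// and why repartition(1000) does not.
val records = 1000000L          // 1e6 records
val bytesPerRecord = 5000L      // each record is a 5e3-byte array
val totalBytes = records * bytesPerRecord   // 5,000,000,000 bytes

println(totalBytes > Int.MaxValue)   // true: a single 5 GB block exceeds 2^31 - 1
println(totalBytes / 1000)           // 5000000: ~5 MB per block after repartition(1000)
```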
Re: 2GB limit for partitions?
Thank you! This is very helpful.

-Mike

From: Aaron Davidson ilike...@gmail.com
To: Imran Rashid iras...@cloudera.com
Cc: Michael Albert m_albert...@yahoo.com; Sean Owen so...@cloudera.com; user@spark.apache.org
Sent: Tuesday, February 3, 2015 6:13 PM
Subject: Re: 2GB limit for partitions?
Re: 2GB limit for partitions?
Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?): https://issues.apache.org/jira/browse/SPARK-1476, and perhaps an independent solution was worked on here: https://issues.apache.org/jira/browse/SPARK-1391
Re: 2GB limit for partitions?
cc dev list

How are you saving the data? There are two relevant 2GB limits:
1. Caching
2. Shuffle

For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = the number of reduce tasks. It is unlikely that a shuffle block exceeds 2G, although it can still happen.

I think the 2nd problem is easier to fix than the 1st, because we can handle it in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks.
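Reynold's two limits can be made concrete with a back-of-the-envelope sketch. The sizes below are illustrative assumptions, not measurements from the thread:

```scala
// A hypothetical 5 GB map partition.
val partitionBytes = 5000000000L

// Caching: one block per partition, so the whole partition must fit
// under the 2GB (Int.MaxValue) block limit.
val cachable = partitionBytes <= Int.MaxValue   // false for this partition

// Shuffle: each map partition is split into R blocks (R = reduce tasks),
// so individual shuffle blocks are usually far below the limit.
val reduceTasks = 1000
val shuffleBlockBytes = partitionBytes / reduceTasks   // 5000000 bytes (~5 MB)
```

This is why the shuffle limit is rarely hit in practice (it takes a block that is still over 2GB after being divided by R), whereas the caching limit is hit by any single oversized partition.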
Re: 2GB limit for partitions?
Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{ i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here.

I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now.

Imran
Re: 2GB limit for partitions?
Greetings!

Thanks for the response. Below is an example of the exception I saw. I'd rather not post code at the moment, so I realize it is completely unreasonable to ask for a diagnosis. However, I will say that adding a partitionBy() was the last change before this error appeared.

Thanks for your time and any thoughts you might have.

Sincerely,
Mike

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

From: Sean Owen so...@cloudera.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org
Sent: Monday, February 2, 2015 10:13 PM
Subject: Re: 2GB limit for partitions?
The limit is on blocks, not partitions. Partitions have many blocks. It sounds like you are creating very large values in memory, but I'm not sure given your description. You will run into problems if a single object is more than 2GB, of course. More of the stack trace might show what is mapping that much memory.

If you simply want data in 1000 files it's a lot simpler: just repartition into 1000 partitions and save the data. If you need more control over what goes into which partition, use a Partitioner, yes.

On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)? [...]
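To make Sean's Partitioner suggestion concrete, here is a minimal sketch of the key-to-partition mapping a custom Partitioner would implement. It is plain Scala (no Spark dependency); in Spark you would extend org.apache.spark.Partitioner and override numPartitions and getPartition. The `bucketFor` helper and the 3-partition/1000-partition figures are illustrative assumptions, not code from the thread.

```scala
// Spark's HashPartitioner assigns a key to a partition via a
// non-negative modulus of the key's hashCode; the same idea routes
// each record to one of N "file" buckets.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)  // Scala's % can be negative
}

// Hypothetical helper: if the map's output key is already the target
// file index, an identity-style mapping puts file k in partition k.
def bucketFor(fileKey: Int, numFiles: Int = 1000): Int =
  nonNegativeMod(fileKey, numFiles)
```

With keys chosen as the file index 0..999 and a Partitioner built on this mapping, each of the 1,000 partitions corresponds to exactly one output file, which is the control the original poster was after.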
Re: 2GB limit for partitions?
That is fairly out of date (we used to run some of our jobs on it ... but that is forked off 1.1 actually).

Regards, Mridul

On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

Thanks for the explanations, makes sense. For the record, it looks like this was worked on a while back (and maybe the work is even close to a solution?): https://issues.apache.org/jira/browse/SPARK-1476 and perhaps an independent solution was worked on here: https://issues.apache.org/jira/browse/SPARK-1391

On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

cc dev list

How are you saving the data? There are two relevant 2GB limits:
1. Caching
2. Shuffle

For caching, a partition is turned into a single block. For shuffle, each map partition is partitioned into R blocks, where R = number of reduce tasks. It is unlikely a shuffle block exceeds 2G, although it can still happen. I think the 2nd problem is easier to fix than the 1st, because we can handle that in the network transport layer. It'd require us to divide the transfer of a very large block into multiple smaller blocks.

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael, you are right, there is definitely some limit at 2GB. Here is a trivial example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map { i => new Array[Byte](5e3.toInt) }.persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing. I was under the same impression as Sean about the limits only being on blocks, not partitions -- but clearly that isn't the case here. I don't know the whole story yet, but I just wanted to at least let you know you aren't crazy :) At the very least this suggests that you might need to make smaller partitions for now.

Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid wrote:

Greetings! Thanks for the response. Below is an example of the exception I saw.
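A back-of-the-envelope check shows why Imran's one-partition repro trips the limit: the cached partition becomes a single block, and DiskStore memory-maps a block with FileChannel.map, whose size argument is capped at Integer.MAX_VALUE. The arithmetic below is a rough sketch (it ignores serialization overhead):

```scala
// 1e6 records of 5e3 bytes each, all in ONE partition.
val numRecords    = 1000000L
val bytesPerRec   = 5000L
val partitionSize = numRecords * bytesPerRec   // ~5 GB in a single block

// FileChannel.map can map at most Integer.MAX_VALUE bytes (~2 GB).
val blockLimit = Int.MaxValue.toLong           // 2147483647

// The single cached block is well over the mappable limit.
val overLimit = partitionSize > blockLimit

// Smallest partition count that keeps each block under the limit,
// assuming an even split of the data (ceiling division).
val minPartitions = ((partitionSize + blockLimit - 1) / blockLimit).toInt
```

By the same arithmetic, the original poster's ~100GB split into 1,000 partitions averages only ~100MB per partition, so the error suggests the partitioning is badly skewed, with some partitions far larger than the average.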
2GB limit for partitions?
Greetings!

SPARK-1476 says that there is a 2G limit for blocks. Is this the same as a 2G limit for partitions (or approximately so)?

What I had been attempting to do is the following:
1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files) each representing a subset of the data.

The current attempt I am working on is something like this:
1) Do a map whose output key indicates which of the 1,000 files it will go into and whose value is what I will want to stick into the file.
2) Partition the data and use the body of mapPartitions to open a file and save the data.

My apologies, this is actually embedded in a bigger mess, so I won't post it. However, I get errors telling me that there is an IllegalArgumentException: Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top of the stack. This leads me to think that I have hit the limit on partition and/or block size.

Perhaps this is not a good way to do it? I suppose I could run 1,000 passes over the data, each time collecting the output for one of my 1,000 final files, but that seems likely to be painfully slow.

Am I missing something? Admittedly, this is an odd use case.

Thanks!

Sincerely, Mike Albert
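The two-step plan above can be sketched on a single machine with plain Scala collections (a toy stand-in, not Spark, and with hypothetical data and a made-up 3-file key): key each record by its target file, group by that key, and write one file per group. In Spark, the groupBy step would be a partitionBy over 1,000 partitions and the writing step a mapPartitions that opens one file per partition.

```scala
import java.nio.file.Files
import java.nio.charset.StandardCharsets

val outDir = Files.createTempDirectory("buckets")

// Toy records; in the real job these are the map's output values.
val records = Seq("apple", "avocado", "banana", "cherry")

// Step 1: the key names the target file (here: first letter mod 3,
// standing in for the 1,000-way file index).
val keyed = records.map(r => (r.head - 'a') % 3 -> r)

// Step 2 + 3: group by key (partitionBy in Spark) and write each
// group to its own file (mapPartitions in Spark).
for ((k, vs) <- keyed.groupBy(_._1)) {
  val body = vs.map(_._2).mkString("\n")
  Files.write(outDir.resolve(s"part-$k.txt"), body.getBytes(StandardCharsets.UTF_8))
}

val filesWritten = Files.list(outDir).count()  // one file per distinct key
```

The single-pass shape is the point: one shuffle routes every record to its output file, instead of the 1,000 full scans the fallback plan would need.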