Re: 2GB limit for partitions?

2015-02-04 Thread Mridul Muralidharan
  *Subject:* Re: 2GB limit for partitions?

 
  The limit is on blocks, not partitions. Partitions have many blocks.
 
  It sounds like you are creating very large values in memory, but I'm
  not sure given your description. You will run into problems if a
  single object is more than 2GB, of course. More of the stack trace
  might show what is mapping that much memory.
 
  If you simply want data into 1000 files it's a lot simpler. Just
  repartition into 1000 partitions and save the data. If you need more
  control over what goes into which partition, use a Partitioner, yes.
 
 
 
  On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
  m_albert...@yahoo.com.invalid wrote:
   Greetings!
  
   SPARK-1476 says that there is a 2G limit for blocks.
   Is this the same as a 2G limit for partitions (or approximately
 so?)?
  
  
   What I had been attempting to do is the following.
   1) Start with a moderately large data set (currently about 100GB,
 but
   growing).
   2) Create about 1,000 files (yes, files) each representing a
 subset of
  the
   data.
  
   The current attempt I am working on is something like this.
   1) Do a map whose output key indicates which of the 1,000 files
 it
  will go
   into and whose value is what I will want to stick into the file.
   2) Partition the data and use the body of mapPartition to open a
 file
  and
   save the data.
  
   My apologies, this is actually embedded in a bigger mess, so I
 won't
  post
   it.
  
   However, I get errors telling me that there is an
  IllegalArgumentException:
   Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map
 at
  the
   top of the stack.  This leads me to think that I have hit the limit on
   partition and/or block size.
  
   Perhaps this is not a good way to do it?
  
   I suppose I could run 1,000 passes over the data, each time
 collecting
  the
   output for one of my 1,000 final files, but that seems likely to be
   painfully slow to run.
  
   Am I missing something?
  
   Admittedly, this is an odd use case
  
   Thanks!
  
   Sincerely,
Mike Albert
 
 

Re: 2GB limit for partitions?

2015-02-04 Thread Imran Rashid
Hi Mridul,


do you think you'll keep working on this, or should this get picked up by
others?  Looks like there was a lot of work put into LargeByteBuffer, seems
promising.

thanks,
Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com
wrote:

 That is fairly out of date (we used to run some of our jobs on it ... But
 that is forked off 1.1 actually).

 Regards
 Mridul


 On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

 Thanks for the explanations, makes sense.  For the record looks like this
 was worked on a while back (and maybe the work is even close to a
 solution?)

 https://issues.apache.org/jira/browse/SPARK-1476

 and perhaps an independent solution was worked on here?

 https://issues.apache.org/jira/browse/SPARK-1391


 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

  cc dev list
 
 
  How are you saving the data? There are two relevant 2GB limits:
 
  1. Caching
 
  2. Shuffle
 
 
  For caching, a partition is turned into a single block.
 
  For shuffle, each map partition is partitioned into R blocks, where R =
  number of reduce tasks. It is unlikely a shuffle block > 2G, although it
  can still happen.
 
  I think the 2nd problem is easier to fix than the 1st, because we can
  handle that in the network transport layer. It'd require us to divide
 the
  transfer of a very large block into multiple smaller blocks.
 
 
 
  On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com
 wrote:
 
  Michael,
 
  you are right, there is definitely some limit at 2GB.  Here is a
 trivial
  example to demonstrate it:
 
  import org.apache.spark.storage.StorageLevel
  val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
  Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
  d.count()
 
  It gives the same error you are observing.  I was under the same
  impression as Sean about the limits only being on blocks, not
 partitions --
  but clearly that isn't the case here.
 
  I don't know the whole story yet, but I just wanted to at least let you
  know you aren't crazy :)
  At the very least this suggests that you might need to make smaller
  partitions for now.
 
  Imran
 
 
  On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
  m_albert...@yahoo.com.invalid wrote:
 
  Greetings!
 
  Thanks for the response.
 
  Below is an example of the exception I saw.
  I'd rather not post code at the moment, so I realize it is completely
  unreasonable to ask for a diagnosis.
  However, I will say that adding a partitionBy() was the last change
  before this error was created.
 
 
  Thanks for your time and any thoughts you might have.
 
  Sincerely,
   Mike
 
 
 
  Exception in thread main org.apache.spark.SparkException: Job
 aborted
  due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
  failure: Lost task 4.3 in stage 5.0 (TID 6012,
  ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
  scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 
 
--
   *From:* Sean Owen so...@cloudera.com
  *To:* Michael Albert m_albert...@yahoo.com
  *Cc:* user@spark.apache.org user@spark.apache.org
  *Sent:* Monday, February 2, 2015 10:13 PM
  *Subject:* Re: 2GB limit for partitions?

 
  The limit is on blocks, not partitions. Partitions have many blocks.
 
  It sounds like you are creating very large values in memory, but I'm
  not sure given your description. You will run into problems if a
  single object is more than 2GB, of course. More of the stack trace
  might show what is mapping that much memory.
 
  If you simply want data into 1000 files it's a lot simpler. Just
  repartition into 1000 partitions and save the data. If you need more
  control over what goes into which partition

Re: 2GB limit for partitions?

2015-02-03 Thread Aaron Davidson
To be clear, there is no distinction between partitions and blocks for RDD
caching (each RDD partition corresponds to 1 cache block). The distinction
is important for shuffling, where by definition N partitions are shuffled
into M partitions, creating N*M intermediate blocks. Each of these blocks
must also be smaller than 2GB, but due to their number, this is an atypical
scenario.

If you do

sc.parallelize(1 to 1e6.toInt, 1).map{i => new
Array[Byte](5e3.toInt)}.repartition(1000).count()

you should not see this error, as the 5GB initial partition was split into
1000 partitions of 5MB each, during a shuffle.

On the other hand,

sc.parallelize(1 to 1e6.toInt, 1).map{i => new
Array[Byte](5e3.toInt)}.repartition(1).count()

may have the same error as Imran showed for caching, and for the same
reason.
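
For completeness, a minimal sketch (not from this thread) that combines the two points
above: pick the partition count up front so that neither the cached blocks nor the
shuffle blocks come anywhere near 2GB. The 128MB target and the helper name are
illustrative assumptions.

import org.apache.spark.storage.StorageLevel

// Assumed helper: choose a partition count from an estimated input size.
def partitionCountFor(totalBytes: Long, targetBytesPerPartition: Long = 128L << 20): Int =
  math.max(1, math.ceil(totalBytes.toDouble / targetBytesPerPartition).toInt)

val numPartitions = partitionCountFor(5L << 30)   // ~5GB of arrays => 40 partitions of ~128MB

val d = sc.parallelize(1 to 1e6.toInt, 1)
  .map(i => new Array[Byte](5e3.toInt))
  .repartition(numPartitions)              // shuffle: many small blocks instead of one 5GB block
  .persist(StorageLevel.DISK_ONLY)         // cache: each partition/block stays well under 2GB
d.count()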

On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use

Re: 2GB limit for partitions?

2015-02-03 Thread Michael Albert
Thank you!
This is very helpful.
-Mike

  From: Aaron Davidson ilike...@gmail.com
 To: Imran Rashid iras...@cloudera.com 
Cc: Michael Albert m_albert...@yahoo.com; Sean Owen so...@cloudera.com; 
user@spark.apache.org user@spark.apache.org 
 Sent: Tuesday, February 3, 2015 6:13 PM
 Subject: Re: 2GB limit for partitions?
To be clear, there is no distinction between partitions and blocks for RDD 
caching (each RDD partition corresponds to 1 cache block). The distinction is 
important for shuffling, where by definition N partitions are shuffled into M 
partitions, creating N*M intermediate blocks. Each of these blocks must also be 
smaller than 2GB, but due to their number, this is an atypical scenario.
If you do
sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1000).count()
you should not see this error, as the 5GB initial partition was split into 1000 
partitions of 5MB each, during a shuffle.
On the other hand,
sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.repartition(1).count()

may have the same error as Imran showed for caching, and for the same reason.


On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

Michael,
you are right, there is definitely some limit at 2GB.  Here is a trivial 
example to demonstrate it:
import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
d.count()
It gives the same error you are observing.  I was under the same impression as 
Sean about the limits only being on blocks, not partitions -- but clearly that 
isn't the case here.
I don't know the whole story yet, but I just wanted to at least let you know
you aren't crazy :)
At the very least this suggests that you might need to make smaller
partitions for now.
Imran

On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert m_albert...@yahoo.com.invalid 
wrote:

Greetings!
Thanks for the response.
Below is an example of the exception I saw.
I'd rather not post code at the moment, so I realize it is completely
unreasonable to ask for a diagnosis.
However, I will say that adding a partitionBy() was the last change
before this error was created.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread main org.apache.spark.SparkException: Job aborted due to
stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost
task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal):
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Thanks for the explanations, makes sense.  For the record looks like this
was worked on a while back (and maybe the work is even close to a solution?)

https://issues.apache.org/jira/browse/SPARK-1476

and perhaps an independent solution was worked on here?

https://issues.apache.org/jira/browse/SPARK-1391


On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

 cc dev list


 How are you saving the data? There are two relevant 2GB limits:

 1. Caching

 2. Shuffle


 For caching, a partition is turned into a single block.

 For shuffle, each map partition is partitioned into R blocks, where R =
 number of reduce tasks. It is unlikely a shuffle block > 2G, although it
 can still happen.

 I think the 2nd problem is easier to fix than the 1st, because we can
 handle that in the network transport layer. It'd require us to divide the
 transfer of a very large block into multiple smaller blocks.



 On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000

Re: 2GB limit for partitions?

2015-02-03 Thread Reynold Xin
cc dev list


How are you saving the data? There are two relevant 2GB limits:

1. Caching

2. Shuffle


For caching, a partition is turned into a single block.

For shuffle, each map partition is partitioned into R blocks, where R =
number of reduce tasks. It is unlikely a shuffle block > 2G, although it
can still happen.

I think the 2nd problem is easier to fix than the 1st, because we can
handle that in the network transport layer. It'd require us to divide the
transfer of a very large block into multiple smaller blocks.
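
As a rough illustration (these numbers are assumptions, not Reynold's), the arithmetic
behind the two limits looks like this for evenly distributed data:

// Back-of-the-envelope sizing for ~100GB of evenly distributed data.
val totalBytes = 100L * 1024 * 1024 * 1024

// Caching: one block per partition, so each partition must serialize to < 2GB.
val cachePartitions = 1000
val bytesPerCachedBlock = totalBytes / cachePartitions                         // ~100MB -- fine

// Shuffle: each of M map partitions writes R blocks (R = number of reduce tasks),
// so an evenly distributed shuffle block is roughly total / (M * R).
val mapPartitions = 1000
val reduceTasks   = 200
val bytesPerShuffleBlock = totalBytes / (mapPartitions.toLong * reduceTasks)   // ~500KB

// A shuffle block only approaches 2GB with very few partitions or heavy skew,
// e.g. when most records hash to a single reduce partition.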



On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
 Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use the body of mapPartition to open a file
 and
  save the data.
 
  My apologies, this is actually embedded in a bigger mess, so I won't
 post
  it.
 
  However, I get errors telling me that there is an
 IllegalArgumentException:
  Size exceeds Integer.MAX_VALUE

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Michael,

you are right, there is definitely some limit at 2GB.  Here is a trivial
example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing.  I was under the same impression
as Sean about the limits only being on blocks, not partitions -- but
clearly that isn't the case here.

I don't know the whole story yet, but I just wanted to at least let you
know you aren't crazy :)
At the very least this suggests that you might need to make smaller
partitions for now.

Imran


On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use the body of mapPartition to open a file and
  save the data.
 
  My apologies, this is actually embedded in a bigger mess, so I won't post
  it.
 
  However, I get errors telling me that there is an
 IllegalArgumentException:
  Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the
  top of the stack.  This leads me to think that I have hit the limit on
  partition and/or block size.
 
  Perhaps this is not a good way to do it?
 
  I suppose I could run 1,000 passes over the data, each time collecting
 the
  output for one of my 1,000 final files, but that seems likely to be
  painfully slow to run.
 
  Am I missing something?
 
  Admittedly, this is an odd use case
 
  Thanks!
 
  Sincerely,
   Mike Albert



Re: 2GB limit for partitions?

2015-02-03 Thread Michael Albert
Greetings!
Thanks for the response.
Below is an example of the exception I saw.
I'd rather not post code at the moment, so I realize it is completely
unreasonable to ask for a diagnosis.
However, I will say that adding a partitionBy() was the last change
before this error was created.

Thanks for your time and any thoughts you might have.
Sincerely, Mike


Exception in thread main org.apache.spark.SparkException: Job aborted due to
stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost
task 4.3 in stage 5.0 (TID 6012, ip-10-171-0-31.ec2.internal):
java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)

  From: Sean Owen so...@cloudera.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Monday, February 2, 2015 10:13 PM
 Subject: Re: 2GB limit for partitions?
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.



On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt I am working on is something like this.
 1) Do a map whose output key indicates which of the 1,000 files it will go
 into and whose value is what I will want to stick into the file.
 2) Partition the data and use the body of mapPartition to open a file and
 save the data.

 My apologies, this is actually embedded in a bigger mess, so I won't post
 it.

 However, I get errors telling me that there is an IllegalArgumentException:
 Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the
 top of the stack.  This leads me to think that I have hit the limit on
 partition and/or block size.

 Perhaps this is not a good way to do it?

 I suppose I could run 1,000 passes over the data, each time collecting the
 output for one of my 1,000 final files, but that seems likely to be
 painfully slow to run.

 Am I missing something?

 Admittedly, this is an odd use case

 Thanks!

 Sincerely,
  Mike Albert


Re: 2GB limit for partitions?

2015-02-03 Thread Mridul Muralidharan
That is fairly out of date (we used to run some of our jobs on it ... But
that is forked off 1.1 actually).

Regards
Mridul

On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

 Thanks for the explanations, makes sense.  For the record looks like this
 was worked on a while back (and maybe the work is even close to a
 solution?)

 https://issues.apache.org/jira/browse/SPARK-1476

 and perhaps an independent solution was worked on here?

 https://issues.apache.org/jira/browse/SPARK-1391


 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

  cc dev list
 
 
  How are you saving the data? There are two relevant 2GB limits:
 
  1. Caching
 
  2. Shuffle
 
 
  For caching, a partition is turned into a single block.
 
  For shuffle, each map partition is partitioned into R blocks, where R =
  number of reduce tasks. It is unlikely a shuffle block > 2G, although it
  can still happen.
 
  I think the 2nd problem is easier to fix than the 1st, because we can
  handle that in the network transport layer. It'd require us to divide the
  transfer of a very large block into multiple smaller blocks.
 
 
 
  On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:
 
  Michael,
 
  you are right, there is definitely some limit at 2GB.  Here is a trivial
  example to demonstrate it:
 
  import org.apache.spark.storage.StorageLevel
  val d = sc.parallelize(1 to 1e6.toInt, 1).map{i => new
  Array[Byte](5e3.toInt)}.persist(StorageLevel.DISK_ONLY)
  d.count()
 
  It gives the same error you are observing.  I was under the same
  impression as Sean about the limits only being on blocks, not
 partitions --
  but clearly that isn't the case here.
 
  I don't know the whole story yet, but I just wanted to at least let you
  know you aren't crazy :)
  At the very least this suggests that you might need to make smaller
  partitions for now.
 
  Imran
 
 
  On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
  m_albert...@yahoo.com.invalid wrote:
 
  Greetings!
 
  Thanks for the response.
 
  Below is an example of the exception I saw.
  I'd rather not post code at the moment, so I realize it is completely
  unreasonable to ask for a diagnosis.
  However, I will say that adding a partitionBy() was the last change
  before this error was created.
 
 
  Thanks for your time and any thoughts you might have.
 
  Sincerely,
   Mike
 
 
 
  Exception in thread main org.apache.spark.SparkException: Job aborted
  due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
  failure: Lost task 4.3 in stage 5.0 (TID 6012,
  ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
 
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
 
 org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at
 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at
  scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at
 
 org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 
 
--
   *From:* Sean Owen so...@cloudera.com
  *To:* Michael Albert m_albert...@yahoo.com
  *Cc:* user@spark.apache.org user@spark.apache.org
  *Sent:* Monday, February 2, 2015 10:13 PM
  *Subject:* Re: 2GB limit for partitions?
 
  The limit is on blocks, not partitions. Partitions have many blocks.
 
  It sounds like you are creating very large values in memory, but I'm
  not sure given your description. You will run into problems if a
  single object is more than 2GB, of course. More of the stack trace
  might show what is mapping that much memory.
 
  If you simply want data into 1000 files it's a lot simpler. Just
  repartition into 1000 partitions and save the data. If you need more
  control over what goes into which partition, use a Partitioner, yes.
 
 
 
  On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
  m_albert...@yahoo.com.invalid wrote:
   Greetings!
  
   SPARK-1476 says that there is a 2G limit for blocks

2GB limit for partitions?

2015-02-02 Thread Michael Albert
Greetings!
SPARK-1476 says that there is a 2G limit for blocks.
Is this the same as a 2G limit for partitions (or approximately so?)?

What I had been attempting to do is the following.
1) Start with a moderately large data set (currently about 100GB, but growing).
2) Create about 1,000 files (yes, files) each representing a subset of the data.

The current attempt I am working on is something like this.
1) Do a map whose output key indicates which of the 1,000 files it will go into
and whose value is what I will want to stick into the file.
2) Partition the data and use the body of mapPartition to open a file and save
the data.
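
A rough sketch of that approach (not the poster's actual code, which isn't shown): key
each record by its target file, partition on that key, then write each partition out by
hand. The toy data, fileIndexFor and serialize below are stand-in assumptions.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{HashPartitioner, TaskContext}

// Toy stand-ins for the real data, key function and serializer (assumptions):
val inputRdd = sc.parallelize(1 to 1000000).map(i => s"record-$i")
def fileIndexFor(rec: String): Int = math.abs(rec.hashCode) % 1000
def serialize(rec: String): Array[Byte] = (rec + "\n").getBytes("UTF-8")

inputRdd
  .map(rec => (fileIndexFor(rec), serialize(rec)))
  .partitionBy(new HashPartitioner(1000))
  .foreachPartition { iter =>
    val fs  = FileSystem.get(new Configuration())
    val out = fs.create(new Path(s"/output/part-${TaskContext.get.partitionId}"))
    try iter.foreach { case (_, bytes) => out.write(bytes) }
    finally out.close()
  }

Each target file's data still moves through the shuffle as blocks, which is where the
2GB limit discussed in the replies above comes in.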
My apologies, this is actually embedded in a bigger mess, so I won't post it.
However, I get errors telling me that there is an IllegalArgumentException:
Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the top
of the stack.  This leads me to think that I have hit the limit on partition
and/or block size.
Perhaps this is not a good way to do it?
I suppose I could run 1,000 passes over the data, each time collecting the 
output for one of my 1,000 final files, but that seems likely to be painfully 
slow to run.
Am I missing something?
Admittedly, this is an odd use case
Thanks!
Sincerely, Mike Albert

Re: 2GB limit for partitions?

2015-02-02 Thread Sean Owen
The limit is on blocks, not partitions. Partitions have many blocks.

It sounds like you are creating very large values in memory, but I'm
not sure given your description. You will run into problems if a
single object is more than 2GB, of course. More of the stack trace
might show what is mapping that much memory.

If you simply want data into 1000 files it's a lot simpler. Just
repartition into 1000 partitions and save the data. If you need more
control over what goes into which partition, use a Partitioner, yes.
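
A minimal sketch of that suggestion (not Sean's code), assuming the record's key is
already the index (0-999) of the file it should land in; keyedRdd is an assumed name
for those (fileIndex, record) pairs:

import org.apache.spark.Partitioner

// Route each record to the partition named by its integer key.
class FileIndexPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val i = key.asInstanceOf[Int] % parts
    if (i < 0) i + parts else i
  }
}

keyedRdd
  .partitionBy(new FileIndexPartitioner(1000))
  .values
  .saveAsTextFile("hdfs:///output/one-file-per-partition")   // one part-* file per partition

If the grouping doesn't actually matter, rdd.repartition(1000).saveAsTextFile(...) is
the simpler route mentioned first.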

On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
m_albert...@yahoo.com.invalid wrote:
 Greetings!

 SPARK-1476 says that there is a 2G limit for blocks.
 Is this the same as a 2G limit for partitions (or approximately so?)?


 What I had been attempting to do is the following.
 1) Start with a moderately large data set (currently about 100GB, but
 growing).
 2) Create about 1,000 files (yes, files) each representing a subset of the
 data.

 The current attempt I am working on is something like this.
 1) Do a map whose output key indicates which of the 1,000 files it will go
 into and whose value is what I will want to stick into the file.
 2) Partition the data and use the body of mapPartition to open a file and
 save the data.

 My apologies, this is actually embedded in a bigger mess, so I won't post
 it.

 However, I get errors telling me that there is an IllegalArgumentException:
 Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the
 top of the stack.  This leads me to think that I have hit the limit on
 partition and/or block size.

 Perhaps this is not a good way to do it?

 I suppose I could run 1,000 passes over the data, each time collecting the
 output for one of my 1,000 final files, but that seems likely to be
 painfully slow to run.

 Am I missing something?

 Admittedly, this is an odd use case

 Thanks!

 Sincerely,
  Mike Albert
