[ https://issues.apache.org/jira/browse/SPARK-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13967854#comment-13967854 ]

Mridul Muralidharan edited comment on SPARK-1476 at 4/13/14 2:45 PM:
---------------------------------------------------------------------

There are multiple issues at play here:

a) If a block goes beyond 2 GB, everything fails - this is the case for shuffle, 
cached and 'normal' blocks, irrespective of storage level and/or other config.
In a lot of cases, particularly when the data generated 'increases' after a 
map/flatMap/etc., the user has no control over the size increase in the output 
block for a given input block (think iterations over datasets). A sketch of the 
underlying JVM limit is shown after this list.

b) Increasing the number of partitions is not always an option (and only for 
the subset of use cases where it can be done):
1) It has an impact on the number of intermediate files created while doing a 
shuffle (ulimit constraints, IO performance issues, etc.).
2) It does not help when there is skew anyway.

c) The compression codec, the serializer used, etc. also have an impact.

d) 2 GB is an extremely low limit to have on modern hardware: it is a 
particularly severe limitation when we have nodes running with 32 to 64 GB of 
RAM and terabytes of disk space available for Spark.
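
To make the constraint concrete, here is a minimal, hypothetical Scala sketch 
(not Spark code; the file path is illustrative) of the JVM-level cap that all 
of the cases above ultimately run into:

{code:scala}
import java.io.RandomAccessFile
import java.nio.channels.FileChannel

// Illustrative only: ByteBuffer capacities are Ints, and FileChannel.map
// rejects any region larger than Integer.MAX_VALUE even though its size
// parameter is a Long.
val file = new RandomAccessFile("/path/to/large-block-file", "r")
val channel = file.getChannel
val size = channel.size()
val buffer =
  if (size <= Int.MaxValue) {
    channel.map(FileChannel.MapMode.READ_ONLY, 0, size)
  } else {
    // This is effectively what happens today: a block over ~2 GB cannot be
    // represented as a single ByteBuffer, so the operation fails.
    sys.error(s"Block of $size bytes exceeds the single ByteBuffer limit")
  }
{code}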


To address the specific points raised above:

A) [~pwendell] MapReduce jobs don't fail when the block size of files 
increases - it might become inefficient, but it still runs (not that I know of 
any case where it actually does, but theoretically I guess it can become 
inefficient). So the analogy does not apply.
Also, 2 GB is not really an unlimited increase in block size - and in MR, the 
output of a map can easily go a couple of orders of magnitude above 2 GB, 
whether or not it is followed by a reduce.

B) [~matei] In the specific cases where it was failing, the users were not 
caching the data but going directly to shuffle.
There was no skew from what I saw: the data size per key is simply high, and 
there are a lot of keys too (as iterations increase and nnz increases).
Note that it was an implementation detail that the data was not being cached - 
it could have been.
Additionally, compression and/or serialization also apply implicitly in this 
case, since it was impacting shuffle - the 2 GB limit was observed on both the 
map and reduce sides (in two different jobs).


In general, our effort is to make Spark a drop-in replacement for most use 
cases which are currently handled via MR/Pig/etc.
Limitations of this sort make it difficult to position Spark as a credible 
alternative.


The approach we are currently exploring is to remove all direct references to 
ByteBuffer from Spark (except in ConnectionManager and similar parts) and rely 
on a BlockData or similar data structure which encapsulates the data 
corresponding to a block. By default a single ByteBuffer should suffice, but 
when it does not, the class will automatically take care of splitting the data 
across multiple buffers.
Similarly, all references to byte-array-backed streams will need to be replaced 
with a wrapper stream which multiplexes over multiple byte array streams; a 
rough sketch of this idea follows below.
The performance impact for all 'normal' use cases should be minimal, while 
allowing Spark to be used in cases where the 2 GB limit is currently being hit.
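
As an illustration of that direction, here is a hedged sketch - the class and 
method names are mine, not from an actual patch - of a wrapper output stream 
that multiplexes over several byte-array-backed streams so that no single 
underlying array approaches the 2 GB cap:

{code:scala}
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical sketch: writes are spread across fixed-size chunks so the
// logical block size can exceed Int.MaxValue even though each chunk stays
// well under it.
class ChunkedByteArrayOutputStream(chunkSize: Int = 32 * 1024 * 1024)
  extends OutputStream {

  private val chunks =
    scala.collection.mutable.ArrayBuffer(new ByteArrayOutputStream(chunkSize))

  // Return a chunk with room left, opening a new one when the last is full.
  private def current: ByteArrayOutputStream = {
    if (chunks.last.size() >= chunkSize) {
      chunks += new ByteArrayOutputStream(chunkSize)
    }
    chunks.last
  }

  override def write(b: Int): Unit = current.write(b)

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    var written = 0
    while (written < len) {
      val target = current
      val toWrite = math.min(chunkSize - target.size(), len - written)
      target.write(b, off + written, toWrite)
      written += toWrite
    }
  }

  // The logical size is a Long spanning multiple arrays.
  def totalSize: Long = chunks.map(_.size().toLong).sum

  def toChunks: Seq[Array[Byte]] = chunks.map(_.toByteArray).toSeq
}
{code}

The read side would be handled symmetrically by an input-side wrapper over the 
resulting chunks, which is the same kind of splitting a BlockData abstraction 
would do for ByteBuffers.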

The only unknown here is Tachyon integration, where the interface is a 
ByteBuffer - and I am not knowledgeable enough to comment on what the issues 
there would be.


> 2GB limit in spark for blocks
> -----------------------------
>
>                 Key: SPARK-1476
>                 URL: https://issues.apache.org/jira/browse/SPARK-1476
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>         Environment: all
>            Reporter: Mridul Muralidharan
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> The underlying abstraction for blocks in Spark is a ByteBuffer, which limits 
> the size of a block to 2 GB.
> This has implications not just for managed blocks in use, but also for 
> shuffle blocks (memory-mapped blocks are limited to 2 GB, even though the API 
> allows for a long), ser-deser via byte-array-backed output streams 
> (SPARK-1391), etc.
> This is a severe limitation for the use of Spark on non-trivial datasets.
