[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read

2017-04-11 Thread jin xing (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964587#comment-15964587
 ] 

jin xing edited comment on SPARK-19659 at 4/11/17 4:13 PM:
---

[~cloud_fan]
Thanks a lot for taking a look into this, and sorry for the late reply.

My proposal is as below:
1. Track the average block size and also the outliers (blocks larger than 2*avgSize) in MapStatus (a rough sketch follows this list);
2. Add *bytesShuffleToMemory*, which tracks the size of remote blocks shuffled to memory;
3. Add *spark.reducer.maxBytesShuffleToMemory*; when *bytesShuffleToMemory* is above this configuration, blocks will be shuffled to disk.
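
To make item 1 concrete, here is a rough Scala sketch of the idea (the names, e.g. CompactBlockSizes, are hypothetical; this is not the actual MapStatus code): keep the average size, plus exact sizes only for blocks larger than 2*avgSize.

{code:scala}
// Hypothetical sketch of proposal item 1: store the average block size and
// exact sizes only for the outliers (blocks larger than 2 * avgSize).
case class CompactBlockSizes(avgSize: Long, outliers: Map[Int, Long]) {
  // Estimated size of the block for the given reduce partition:
  // exact for outliers, the average for everything else.
  def sizeFor(reduceId: Int): Long = outliers.getOrElse(reduceId, avgSize)
}

object CompactBlockSizes {
  def fromSizes(blockSizes: Array[Long]): CompactBlockSizes = {
    val nonEmpty = blockSizes.filter(_ > 0)
    val avg = if (nonEmpty.isEmpty) 0L else nonEmpty.sum / nonEmpty.length
    val outliers = blockSizes.zipWithIndex
      .collect { case (size, reduceId) if size > 2 * avg => reduceId -> size }
      .toMap
    CompactBlockSizes(avg, outliers)
  }
}
{code}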

*bytesShuffleToMemory* is increased when a fetch request is sent (note that at this point the remote blocks may not have been fetched into memory yet, but we add the maximum memory the fetch request may use) and is decreased when the ByteBuf is released.

*spark.reducer.maxBytesShuffleToMemory* is the maximum memory to be used for fetching remote blocks across all *ShuffleBlockFetcherIterator*s (there may be multiple shuffle-reads happening at the same time). When the memory usage (indicated by *bytesShuffleToMemory*) is above *spark.reducer.maxBytesShuffleToMemory*, remote blocks are shuffled to disk instead of memory.
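
For items 2 and 3, a minimal sketch of the accounting (again hypothetical; the real change would live inside ShuffleBlockFetcherIterator and the transfer service):

{code:scala}
import java.util.concurrent.atomic.AtomicLong

// Sketch only: one counter shared by all ShuffleBlockFetcherIterators in the
// executor, standing in for bytesShuffleToMemory.
object ShuffleReadAccounting {
  private val bytesShuffleToMemory = new AtomicLong(0L)

  // Called when a fetch request is about to be sent. Returns true if the
  // request's blocks may be fetched into memory, false if they should be
  // shuffled to disk because the limit has already been reached.
  def tryReserve(requestSize: Long, maxBytesShuffleToMemory: Long): Boolean = {
    if (bytesShuffleToMemory.get() >= maxBytesShuffleToMemory) {
      false
    } else {
      // Reserve the maximum memory the request may use, even though the
      // remote blocks may not have been fetched yet.
      bytesShuffleToMemory.addAndGet(requestSize)
      true
    }
  }

  // Called when the ByteBuf holding the fetched blocks is released.
  def release(requestSize: Long): Unit =
    bytesShuffleToMemory.addAndGet(-requestSize)
}
{code}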





> Fetch big blocks to disk when shuffle-read
> --
>
> Key: SPARK-19659
> URL: https://issues.apache.org/jira/browse/SPARK-19659
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.1.0
>Reporter: jin xing
> Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf
>
>
> Currently the whole block is fetched into memory (off-heap by default) during 
> shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can 
> be large in skew situations. If OOM happens during shuffle-read, the job will 
> be killed and users will be notified to "Consider boosting 
> spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating 
> more memory can resolve the OOM. However, this approach is not well suited 
> to a production environment, especially a data warehouse.
> Using Spark SQL as the data engine in a warehouse, users hope to have a 
> unified parameter (e.g. memory) with less resource wasted (resource that is 
> allocated but not used).
> It's not always easy to predict skew situations; when they happen, it makes 
> sense to fetch remote blocks to disk for shuffle-read rather than kill the 
> job because of OOM. This approach was mentioned during the discussion in 
> SPARK-3019 by [~sandyr] and [~mridulm80].






[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read

2017-03-19 Thread jin xing (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891999#comment-15891999
 ] 

jin xing edited comment on SPARK-19659 at 3/19/17 3:16 PM:
---

[~rxin] [~davies] [~andrewor14] [~joshrosen] [~kayousterhout]

I've uploaded a design doc. It would be great if you could help comment on it 
when you have time :) Actually, my data warehouse suffers a lot from this issue. 
Please take a look if possible. Sorry if this is bothering you.









[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read

2017-03-07 Thread jin xing (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900741#comment-15900741
 ] 

jin xing edited comment on SPARK-19659 at 3/8/17 5:47 AM:
--

[~irashid] [~rxin]
I uploaded SPARK-19659-design-v2.pdf, please take a look.

Yes, only the outliers should be tracked and MapStatus should stay compact. It 
is a good idea to track all the sizes that are more than 2x the average. “2x” 
is a default value; it will be configurable via a parameter.

We need to collect metrics for stability and performance:

- For stability:
 1) Blocks whose size is between the average and 2x the average are 
underestimated, so there is a risk that those blocks cannot fit in memory. In 
this approach, the driver should calculate the sum of the underestimated sizes.
 2) Show the distribution of block sizes in MapStatus; the buckets will be 
(0~100K, 100K~1M, 1M~10M, 10M~100M, 100M~1G, 1G~10G), which will help a lot 
with debugging (e.g. finding skew situations). A rough sketch of this metric 
follows the comment.
- For performance:
 1) The peak memory (off-heap or on-heap) used for fetching blocks should be 
tracked.
 2) Fetching blocks to disk will cause performance degradation. The executor 
should calculate the size of the blocks shuffled to disk and the time cost.

Yes, Imran, I will definitely break the proposal into smaller pieces (JIRAs and 
PRs). The metrics part should be done first, before the other parts proposed 
here.

What's more, why not make 2000 a configurable parameter, so users can choose to 
track the accurate sizes of blocks to some degree?
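
A rough sketch of the block-size distribution metric mentioned above, with 
bucket boundaries following the list (illustrative code only, not the actual 
metrics implementation; names are hypothetical):

{code:scala}
// Count block sizes into the buckets 0~100K, 100K~1M, 1M~10M, 10M~100M,
// 100M~1G, 1G~10G; sizes above 10G are not counted in this sketch.
object BlockSizeDistribution {
  private val K = 1024L
  private val upperBounds =
    Array(100 * K, K * K, 10 * K * K, 100 * K * K, K * K * K, 10 * K * K * K)
  private val labels =
    Array("0~100K", "100K~1M", "1M~10M", "10M~100M", "100M~1G", "1G~10G")

  def histogram(blockSizes: Array[Long]): Map[String, Int] = {
    val counts = new Array[Int](labels.length)
    blockSizes.foreach { size =>
      // First bucket whose upper bound covers this size.
      val bucket = upperBounds.indexWhere(size <= _)
      if (bucket >= 0) counts(bucket) += 1
    }
    labels.zip(counts).toMap
  }
}
{code}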













[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read

2017-03-03 Thread jin xing (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895411#comment-15895411
 ] 

jin xing edited comment on SPARK-19659 at 3/4/17 2:11 AM:
--

[~rxin]
Thanks a lot for the comment.
Tracking the average size and also the outliers is a good idea.
But there can be multiple huge blocks creating too much pressure (e.g. 10% of 
the blocks are much bigger than the other 90%), and it is a little bit hard to 
decide how many outliers we should track.
If we track too many outliers, *MapStatus* can cost too much memory.
I think the benefit of tracking the max of each group of N/2000 consecutive 
blocks is that we can avoid having *MapStatus* cost too much memory (at most 
around 2000 bytes after compressing) and still keep all outliers under control. 
Do you think it's worth trying?
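
A small sketch of the idea (illustrative only, not the real MapStatus change): 
split the N block sizes into at most 2000 consecutive groups and keep only the 
maximum of each group, so every block is covered by its group's max and the 
number of tracked values stays bounded regardless of N.

{code:scala}
// Hypothetical sketch: keep only the max size of each group of
// ceil(N / 2000) consecutive blocks, so at most 2000 values are tracked.
def groupMaxSizes(blockSizes: Array[Long], maxGroups: Int = 2000): Array[Long] = {
  if (blockSizes.length <= maxGroups) {
    blockSizes
  } else {
    val groupSize = math.ceil(blockSizes.length.toDouble / maxGroups).toInt
    blockSizes.grouped(groupSize).map(_.max).toArray
  }
}
{code}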









[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read

2017-03-02 Thread jin xing (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15884789#comment-15884789
 ] 

jin xing edited comment on SPARK-19659 at 3/2/17 10:21 AM:
---

[~irashid]

I've uploaded a design doc; please take a look and give your comments when you 
have time :)

Thanks a lot for your previous comments. They were very helpful. It would be 
great if you could comment more so I can continue working on this :)





