[ https://issues.apache.org/jira/browse/TEZ-3209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15383824#comment-15383824 ]

Siddharth Seth commented on TEZ-3209:
-------------------------------------

[~mingma] - apologies for the delay in responding, again. :|

Most of the functionality that's pulled out seems to be around scheduling.
I believe the new VertexManager that is being written is primarily targeted 
towards Unordered Data? Consumers can potentially complete before all 
producers have generated data (or have even started), e.g. the case where a 
single partition from different sources goes to different destination tasks. 
The moment one source completes, the corresponding destination is also ready 
to start and complete. At some point, would we want to use a different 
slow-start / scheduling policy in this VertexManager? Should a different 
strategy be employed to determine when to trigger parallelism determination 
in this case?
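
For illustration, here is a minimal sketch of that scheduling idea in plain 
Java, outside the real VertexManagerPlugin API (the class name and the fixed 
group size of 10 sources per destination are assumptions, not the actual 
design):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch: with fair routing, each destination task reads from a fixed
// subset of source tasks, so it can be scheduled as soon as that subset
// completes - no vertex-wide slow-start fraction is needed.
public class FairSourceTracker {
  private final int sourcesPerDest;                 // assumed fixed grouping
  private final Map<Integer, Integer> doneCount = new HashMap<>();

  public FairSourceTracker(int sourcesPerDest) {
    this.sourcesPerDest = sourcesPerDest;
  }

  // Returns the destination task index that just became runnable, or -1.
  public int onSourceTaskCompleted(int srcTaskIndex) {
    int destIndex = srcTaskIndex / sourcesPerDest;  // group this source feeds
    int done = doneCount.merge(destIndex, 1, Integer::sum);
    return done == sourcesPerDest ? destIndex : -1; // all inputs ready
  }

  public static void main(String[] args) {
    FairSourceTracker tracker = new FairSourceTracker(10);
    for (int src = 0; src < 20; src++) {
      int dest = tracker.onSourceTaskCompleted(src);
      if (dest >= 0) {
        System.out.println("destination task " + dest + " is ready to run");
      }
    }
  }
}
{code}

With a group size of 1 this degenerates to the single-partition case above: 
each destination becomes runnable the moment its one source finishes.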

Should we use the current Shuffle config parameter names, or define new ones 
for the new VertexManager? This shouldn't really get in the way of the 
refactor if the current concepts are retained. My vote would be for separate 
config parameter names.
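
For example (the {{tez.fair-shuffle-vertex-manager.*}} names below are 
hypothetical placeholders, mirroring ShuffleVertexManager's existing 
{{tez.shuffle-vertex-manager.min-src-fraction}} / {{max-src-fraction}} knobs):

{code:java}
// Hypothetical config names for the new VertexManager, kept separate from
// ShuffleVertexManager's existing tez.shuffle-vertex-manager.* parameters.
public final class FairShuffleVertexManagerConfig {
  public static final String MIN_SRC_FRACTION =
      "tez.fair-shuffle-vertex-manager.min-src-fraction";  // assumed name
  public static final float MIN_SRC_FRACTION_DEFAULT = 0.25f;

  public static final String MAX_SRC_FRACTION =
      "tez.fair-shuffle-vertex-manager.max-src-fraction";  // assumed name
  public static final float MAX_SRC_FRACTION_DEFAULT = 0.75f;

  private FairShuffleVertexManagerConfig() {}
}
{code}

Separate names would keep the two managers independently tunable within the 
same DAG.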

If the plan is to eventually move to a different set of scheduling 
strategies, I suspect a lot of the code in ShuffleVMBase will go away.

> Support for fair custom data routing
> ------------------------------------
>
>                 Key: TEZ-3209
>                 URL: https://issues.apache.org/jira/browse/TEZ-3209
>             Project: Apache Tez
>          Issue Type: New Feature
>            Reporter: Ming Ma
>            Assignee: Ming Ma
>         Attachments: TEZ-3209.patch, Tez-based demuxer for highly skewed 
> category data.pdf
>
>
> This is based on offline discussion with [~gopalv], [~hitesh], 
> [~jrottinghuis] and [~lohit] w.r.t. the support for efficient processing of 
> highly skewed unordered partitioned mapper output. Our use case is to demux 
> highly skewed unordered category data partitioned by category name. Gopal 
> and Hitesh mentioned the dynamically shuffled join scenario.
> One option we discussed is to leverage the auto-parallelism feature with 
> upfront over-partitioning. That means possible overhead to support a large 
> number of partitions, as well as unnecessary data movement, as each reducer 
> needs to fetch data from all mappers. 
> Another alternative is to use a custom {{DataMovementType}} which doesn't 
> require each reducer to fetch data from all mappers. That way, a large 
> partition will be processed by several reducers, each of which will fetch 
> data from a portion of the mappers.
> For example, say there are 100 mappers, each of which has 10 partitions (P1, 
> ..., P10). Each mapper generates 100MB for its P10 and 1MB for each of its 
> (P1, ..., P9). The default SCATTER_GATHER routing means the reducer for P10 
> has to process 10GB of input and becomes the bottleneck of the job. With 
> fair custom data routing, the P10 output belonging to the first 10 mappers 
> will be processed by one reducer with 1GB of input data, the P10 output 
> belonging to the second 10 mappers will be processed by another reducer, and 
> so on.
> For further optimization, we can allocate the reducer on the same nodes as 
> the mappers that it fetches data from.
> To support this, we need TEZ-3206 as well as customized data routing based on 
> {{VertexManagerPlugin}} and {{EdgeManagerPluginOnDemand}}.
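
A back-of-the-envelope sketch of the arithmetic in the quoted example (plain 
Java; the grouping of 10 mappers per P10 reducer is taken from the example, 
not from the patch):

{code:java}
// 100 mappers, partitions P1..P10; each mapper emits 1MB for P1..P9 and
// 100MB for P10. P1..P9 each keep a single reducer; P10 is split across
// 10 reducers, each fetching from 10 consecutive mappers.
public class FairRoutingExample {
  static final int NUM_MAPPERS = 100;
  static final int MAPPERS_PER_P10_REDUCER = 10;  // grouping from the example

  public static void main(String[] args) {
    // P1..P9: one reducer per partition, fetching 1MB from all 100 mappers.
    for (int p = 1; p <= 9; p++) {
      System.out.println("reducer for P" + p + ": " + NUM_MAPPERS + "MB");
    }
    // P10: 10 reducers, 10 mappers each, 100MB per mapper = 1GB per reducer
    // (instead of one 10GB reducer under plain SCATTER_GATHER).
    for (int g = 0; g < NUM_MAPPERS / MAPPERS_PER_P10_REDUCER; g++) {
      int first = g * MAPPERS_PER_P10_REDUCER;
      int last = first + MAPPERS_PER_P10_REDUCER - 1;
      System.out.println("reducer for P10, mappers " + first + ".." + last
          + ": " + (MAPPERS_PER_P10_REDUCER * 100) + "MB");
    }
  }
}
{code}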


