Yingda Chen created TEZ-4018:
--------------------------------

             Summary: Allow conditional vertex in DAG execution
                 Key: TEZ-4018
                 URL: https://issues.apache.org/jira/browse/TEZ-4018
             Project: Apache Tez
          Issue Type: New Feature
            Reporter: Yingda Chen


A high-level description is provided here for now; a proper design doc will 
follow later.

We have encountered a few application scenarios for dynamic (logical) DAGs in 
our system. A typical one is a distributed query that needs to choose 
dynamically between two execution paths, say between hash join and sorted 
merge join.

This can be solved by allowing Tez to execute a "conditional DAG". By that we 
mean that a DAG may have some conditional vertices: several conditional 
vertices can form a conditional group, and inside each group, only one will 
be chosen for execution at runtime.
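
As a rough illustration of the "only one per group" rule described above, here 
is a minimal Java sketch. The class ConditionalGroup, its methods, and the 
vertex names are hypothetical and are not part of the existing Tez API; the 
sketch only models the bookkeeping, not scheduling.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a conditional group; not an existing Tez class. */
final class ConditionalGroup {
    private final String name;
    private final Set<String> members;   // names of the conditional vertices
    private String selected;             // null until a decision is made

    ConditionalGroup(String name, List<String> memberVertexNames) {
        if (memberVertexNames.size() < 2) {
            throw new IllegalArgumentException("a group needs at least two vertices");
        }
        this.name = name;
        this.members = new LinkedHashSet<>(memberVertexNames);
    }

    /** Picks the single vertex that will run; may be called only once. */
    void select(String vertexName) {
        if (selected != null) {
            throw new IllegalStateException("group " + name + " is already decided");
        }
        if (!members.contains(vertexName)) {
            throw new IllegalArgumentException(vertexName + " is not in group " + name);
        }
        selected = vertexName;
    }

    boolean isDecided() {
        return selected != null;
    }

    String selected() {
        return selected;
    }

    /** Vertices that will be skipped; empty while the group is undecided. */
    List<String> skipped() {
        List<String> result = new ArrayList<>();
        if (selected == null) {
            return result;
        }
        for (String member : members) {
            if (!member.equals(selected)) {
                result.add(member);
            }
        }
        return result;
    }
}
```

Rejecting a second call to select() reflects the idea that a group is decided 
exactly once at runtime; whether a real implementation would allow 
re-decision is left open here.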

To allow the decision to be made at runtime, each conditional group will be 
associated with a “control vertex”. A control vertex can be a pure virtual 
component that lives only on the AM with its VertexImpl and VertexManager, but 
has no associated tasks (DoP = 0). It can also be extended to have physical 
tasks associated with it, in the case where intensive computation may be 
required to make a control decision.

An upstream vertex (one that produces input data for vertices in a downstream 
conditional group) will be connected to the control vertex, as well as to all 
conditional vertices in the group at the same time. This allows its VMEs and 
DMEs to be sent to all of them. Upon receipt of (enough) VMEs, the control 
vertex would be able to gather enough runtime statistics and determine (by 
user-supplied logic) which downstream vertex should be scheduled (and which 
should be skipped). Such a decision will effectively “uncondition” the DAG and 
determine the sub-graph that is actually executed.
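
The decision flow above could look roughly like the following Java sketch. 
Everything in it is an assumption made for illustration: ControlVertexSketch 
and onUpstreamReport are hypothetical names, a plain byte count stands in for 
the statistics carried by a VME, and "enough" VMEs is simplified to a fixed 
number of reports. It does not use the actual Tez VertexManager API.

```java
import java.util.function.LongFunction;

/** Hypothetical stand-in for a control vertex's decision logic. */
final class ControlVertexSketch {
    private final int expectedReports;           // how many reports count as "enough"
    private final LongFunction<String> chooser;  // user-supplied decision logic
    private long totalOutputBytes;
    private int reportsReceived;
    private String decision;                     // null until decided

    ControlVertexSketch(int expectedReports, LongFunction<String> chooser) {
        if (expectedReports < 1) {
            throw new IllegalArgumentException("expectedReports must be >= 1");
        }
        this.expectedReports = expectedReports;
        this.chooser = chooser;
    }

    /**
     * Called once per upstream statistics report (standing in for a VME).
     * Returns the name of the vertex to schedule once decided, else null.
     */
    String onUpstreamReport(long outputBytes) {
        if (decision != null) {
            return decision;  // already decided; later reports are ignored
        }
        totalOutputBytes += outputBytes;
        reportsReceived++;
        if (reportsReceived >= expectedReports) {
            decision = chooser.apply(totalOutputBytes);
        }
        return decision;
    }
}
```

A caller might construct it with a size threshold, for example 
`new ControlVertexSketch(2, bytes -> bytes <= 100L ? "hashJoin" : "sortedMergeJoin")`, 
where the threshold of 100 bytes and the vertex names are arbitrary 
placeholders.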

Such conditional vertices can be useful to enable scenarios such as 
conditional join, where a query can choose between hash join and sorted merge 
join at runtime, based on the precise runtime statistics (e.g., output size) of 
the upstream mapper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
