[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16261772#comment-16261772
 ] 

weijie.tong commented on DRILL-5975:
------------------------------------

Well, I could not understand how to schedule MinorFragments as Tasks of YARN.
In any case, we can't let our scheduler depend on YARN; some companies like
Alibaba have their own scheduler systems. That is, the application has a
scheduler for its application-level tasks, while the tasks themselves still
need to be scheduled as Linux threads. (A small digression: we currently depend
on ZK too much to get some work done; once ZK dies, the system dies with it.)

Maybe I have not described the design clearly. Your first difference does not
exist: the task dependency problem is already solved by Drill today, and I will
not change that logic. Every Foreman will have its own FragmentScheduler. The
FragmentScheduler holds the PlanFragments (different from the current
implementation, a PlanFragment will not have explicit hosts initially; the
hosts will be assigned by the scheduler at runtime). The `FragmentScheduler`
does two things (a rough sketch follows this list):
* schedule the leaf fragments (i.e. the Scan nodes; in your example, A and B)
to run actively.
* accept the first-RecordBatch-ready event passively and schedule the next
MajorFragment's MinorFragments (in your example, C) to run. The running process
is the same as before, with one difference: the `receiver` MinorFragments need
to let the `sender` know their destination hosts (through step 4 in the graph).
Everything else behaves as before; the receiver of the `upstream` MinorFragment
(i.e. C) will decide whether to begin the probe side after all of the build
side's data has arrived.
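
To make the two duties concrete, here is a rough Java sketch of such a
FragmentScheduler. The nested types (PlanFragment, Endpoint, PlacementPolicy)
are simplified stand-ins I use only for illustration, not Drill's real classes,
and the placement policy is left pluggable:

{code:java}
import java.util.List;

public class FragmentSchedulerSketch {

  /** Stand-in for a planned fragment; note that hosts are NOT assigned at plan time. */
  public interface PlanFragment {
    boolean isLeaf();                   // Scan fragments (A, B in the example)
    List<PlanFragment> consumers();     // the next MajorFragment's MinorFragments (C)
  }

  /** Stand-in for a Drillbit address chosen at runtime. */
  public interface Endpoint { }

  /** Pluggable placement; a first version could simply pick a random Drillbit. */
  public interface PlacementPolicy {
    Endpoint chooseHost(PlanFragment fragment);
  }

  private final List<PlanFragment> fragments;
  private final PlacementPolicy policy;

  public FragmentSchedulerSketch(List<PlanFragment> fragments, PlacementPolicy policy) {
    this.fragments = fragments;
    this.policy = policy;
  }

  /** Duty 1: actively schedule the leaf (Scan) fragments to run. */
  public void start() {
    for (PlanFragment f : fragments) {
      if (f.isLeaf()) {
        launch(f, policy.chooseHost(f));
      }
    }
  }

  /** Duty 2: on a sender's first-RecordBatch-ready event, schedule the consuming
   *  MinorFragments and push their destination hosts back to the sender (step 4). */
  public void onFirstBatchReady(PlanFragment sender) {
    for (PlanFragment receiver : sender.consumers()) {
      Endpoint host = policy.chooseHost(receiver);
      launch(receiver, host);
      tellSenderDestination(sender, receiver, host);
    }
  }

  private void launch(PlanFragment fragment, Endpoint host) {
    // A real implementation would RPC the chosen Drillbit to start the MinorFragment.
  }

  private void tellSenderDestination(PlanFragment sender, PlanFragment receiver, Endpoint host) {
    // Step 4 in the graph: the sender learns where its receivers were placed.
  }
}
{code}

A later PlacementPolicy could take each Drillbit's actual load into account; the
interface above is only meant to show where that decision is made.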

Your second question, about the system property, I cannot answer well. The
other change besides the scheduler is the RecordBatchManager. It just acts as a
buffer stage between the sender MinorFragments and the receiver MinorFragments.
This kind of design exists in most similar systems: Flink you already know, and
Spark has the BlockManager, which is master-slave and not easy to apply to
Drill. Both systems support the streaming and batch models. Drill today does
not let the data touch disk at the exchange stage; it keeps the batches in
memory and lets them flow through the network under a throttling strategy. That
design works well for low response time and low query concurrency (i.e.
interactive queries). The downside is that it wastes resources: the whole
runtime tree (some call it a memory pipeline) is too fat to schedule (which may
be one reason we have an unbounded thread pool). The new design may lose some
response-time performance, but I think it is a reasonable tradeoff. If the
blocking nodes (sort, join, aggregate) are slow and the scan nodes are quick,
the `memory-pipeline` model's advantage does not show anyway.
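
To illustrate the buffer-stage idea, here is a minimal sketch of the buffering
behaviour I have in mind: keep batches in memory up to a threshold, spill the
overflow to local disk, and hand batches out one by one so the send side can
throttle. The class name and the threshold are illustrative assumptions, not
the proposed API:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

public class RecordBatchBufferSketch {

  /** Stand-in for a serialized RecordBatch. */
  public static final class BatchBytes {
    private final byte[] data;
    public BatchBytes(byte[] data) { this.data = data; }
    public int size() { return data.length; }
  }

  private final Queue<BatchBytes> inMemory = new ArrayDeque<>();
  private final long memoryLimitBytes;   // illustrative per-sender threshold
  private long memoryUsedBytes;

  public RecordBatchBufferSketch(long memoryLimitBytes) {
    this.memoryLimitBytes = memoryLimitBytes;
  }

  /** Called by a local sender MinorFragment: keep the batch in memory while room
   *  remains, otherwise spill it to local disk storage. */
  public synchronized void add(BatchBytes batch) {
    if (memoryUsedBytes + batch.size() <= memoryLimitBytes) {
      inMemory.add(batch);
      memoryUsedBytes += batch.size();
    } else {
      spillToDisk(batch);
    }
  }

  /** Called after the receiver has signalled it is ready: return one batch at a
   *  time so the send side can throttle, falling back to spilled batches once
   *  memory is drained. Returns null when nothing is buffered. */
  public synchronized BatchBytes poll() {
    BatchBytes next = inMemory.poll();
    if (next != null) {
      memoryUsedBytes -= next.size();
      return next;
    }
    return readFromDisk();
  }

  private void spillToDisk(BatchBytes batch) { /* write to a local spill file */ }
  private BatchBytes readFromDisk() { return null; /* read back a spilled batch, if any */ }
}
{code}

The point is only the ordering: batches are accepted before any receiver
exists, and are drained only after the receiver has reported that it is ready.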

Without a scheduler, Drill is not sufficient for long-running jobs, since it
lacks fault tolerance. We can make up for this, step by step, by leveraging
this design.

I think constrained resources and the scheduler complement each other.


> Resource utilization
> --------------------
>
>                 Key: DRILL-5975
>                 URL: https://issues.apache.org/jira/browse/DRILL-5975
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 2.0.0
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>
> h1. Motivation
> Right now the resource utilization ratio of a Drill cluster is not good; most
> of the cluster's resources are wasted, and we cannot afford many concurrent
> queries. Once the system accepts more queries, even at a moderate CPU load,
> queries that were originally very quick become slower and slower.
> The reason is that Drill does not supply a scheduler. It just assumes all the
> nodes have enough computation resources. When a query comes in, it schedules
> the related fragments to random nodes without regard to each node's load, so
> some nodes suffer extra CPU context switches to satisfy the incoming query.
> The deeper cause is that the runtime minor fragments form a runtime tree
> whose nodes are spread across different Drillbits. The runtime tree is a
> memory pipeline: all of its nodes stay alive for the whole lifecycle of a
> query, sending data to upper nodes successively, even though some nodes could
> finish quickly and exit immediately. What's more, the runtime tree is
> constructed before actual execution, so the scheduling target for Drill
> becomes the whole set of runtime tree nodes.
> h1. Design
> It would be hard to schedule the runtime tree nodes as a whole, so I try to
> solve this by breaking up the cascade of runtime nodes. The graph below
> describes the initial design.
> !https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png!   
>  [graph 
> link|https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png]
> Every Drillbit instance will have a RecordBatchManager which accepts all the
> RecordBatches written by the senders of the local MinorFragments. The
> RecordBatchManager holds the RecordBatches in memory first and then in disk
> storage. Once the first RecordBatch of a query's sender MinorFragment
> arrives, it notifies the FragmentScheduler. The FragmentScheduler is
> instantiated by the Foreman and holds the whole PlanFragment execution graph.
> It allocates a new corresponding FragmentExecutor to consume the generated
> RecordBatches. The allocated FragmentExecutor then notifies the corresponding
> FragmentManager that it is ready to receive data. The FragmentManager then
> sends the RecordBatches one by one to the corresponding FragmentExecutor's
> receiver, throttling the data stream as the current Sender does. (A rough
> sketch of this flow appears after the quoted description.)
> What we can gain from this design is:
> a. A leaf computation node does not have to wait on its consumer's speed
> before ending its life and releasing its resources.
> b. The data-sending logic is isolated from the computation nodes and shared
> by different FragmentManagers.
> c. We can schedule the MajorFragments according to each Drillbit's actual
> resource capacity at runtime.
> d. Drill's pipelined data-processing characteristic is also retained.
> h1. Plan
> This will be a large PR, so I plan to divide it into some small ones:
> a. implement the RecordBatchManager.
> b. implement a simple random FragmentScheduler and the whole event flow.
> c. implement a more capable FragmentScheduler, which may reference the
> Sparrow project.
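
As a companion to the quoted Design section, the sketch below walks through the
hand-offs in the order they are described there: a sender writes to the
RecordBatchManager, the first batch notifies the FragmentScheduler, a
FragmentExecutor is allocated and reports that it is ready, and the
FragmentManager streams the buffered batches with throttling. All class and
method names are illustrative placeholders rather than existing Drill code, and
the split of responsibilities between RecordBatchManager and FragmentManager is
simplified:

{code:java}
public class ExchangeFlowSketch {

  interface RecordBatch { }

  /** Receiving side of an allocated FragmentExecutor. */
  interface Receiver { void push(RecordBatch batch); }

  /** Per-Drillbit buffer that holds sender output in memory first, then on disk. */
  static class RecordBatchManager {
    private final FragmentScheduler scheduler;
    RecordBatchManager(FragmentScheduler scheduler) { this.scheduler = scheduler; }

    /** A local sender MinorFragment writes its batches here. */
    void accept(String senderId, RecordBatch batch, boolean firstBatchOfSender) {
      buffer(senderId, batch);
      if (firstBatchOfSender) {
        // The first batch of a sender notifies the Foreman's FragmentScheduler.
        scheduler.onFirstBatch(senderId);
      }
    }

    private void buffer(String senderId, RecordBatch batch) {
      // Keep in memory, spill to disk on overflow.
    }
  }

  /** Instantiated by the Foreman; holds the whole PlanFragment execution graph. */
  static class FragmentScheduler {
    private final FragmentManager fragmentManager;
    FragmentScheduler(FragmentManager fragmentManager) { this.fragmentManager = fragmentManager; }

    void onFirstBatch(String senderId) {
      // Allocate a FragmentExecutor for the consuming fragment at runtime...
      FragmentExecutor executor = new FragmentExecutor();
      // ...which then tells the FragmentManager it is ready to receive data.
      executor.notifyReady(fragmentManager, senderId);
    }
  }

  /** Sends the buffered batches one by one, throttled, like the current Sender. */
  static class FragmentManager {
    void onReceiverReady(String senderId, Receiver receiver) {
      // Drain the batches buffered for senderId into the ready receiver.
    }
  }

  /** Runs the receiving MinorFragment. */
  static class FragmentExecutor {
    void notifyReady(FragmentManager manager, String senderId) {
      manager.onReceiverReady(senderId, batch -> {
        // Feed each incoming batch into this fragment's operator tree.
      });
    }
  }
}
{code}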



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
