One question I have is about the execution model when a query has multiple fragments. My understanding is that operators within the same fragment are executed in a "pull" model, using an iterator approach in which each operator repeatedly calls next() on its input operators. The pull model ensures that no operator produces rows that are not required by its downstream operator. Across multiple major fragments, however, Drill more or less uses a "push" model, where all the leaf major fragments start immediately.
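As a minimal sketch of the pull model (the RowSource/LimitOp/RangeScan names here are hypothetical, not Drill's actual RecordBatch/IterOutcome API): a downstream limit simply stops calling next(), so upstream operators never do the extra work.

// Hypothetical Volcano-style pull interface; not Drill's actual
// RecordBatch/IterOutcome API.
interface RowSource {
    Object[] next();   // returns the next row, or null when exhausted
}

class LimitOp implements RowSource {
    private final RowSource child;
    private final int limit;
    private int returned;

    LimitOp(RowSource child, int limit) {
        this.child = child;
        this.limit = limit;
    }

    @Override
    public Object[] next() {
        if (returned >= limit) {
            return null;          // stop pulling; upstream does no more work
        }
        Object[] row = child.next();
        if (row != null) {
            returned++;
        }
        return row;
    }
}

// Toy scan that could produce a million rows if fully pulled.
class RangeScan implements RowSource {
    private int i;

    @Override
    public Object[] next() {
        return i < 1_000_000 ? new Object[] { i++ } : null;
    }
}

public class PullModelDemo {
    public static void main(String[] args) {
        RowSource plan = new LimitOp(new RangeScan(), 100);
        int rows = 0;
        while (plan.next() != null) {
            rows++;
        }
        System.out.println(rows);  // 100: the scan is asked for exactly 100 rows
    }
}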
The "push" model also causes performance problem for a limit operator (say limit 100), when the query is broken into multiple fragments. If the query's operators are in a single fragment, the "pull" model will ensure operators are not producing unnecessary rows. However, once the query is distributed and executed on multiple major fragments, then the leaf major fragments would process much more than necessary rows, before the root major fragment tells the input fragment that it only requires 100 rows. That would mean wasted memory / CPU resources. One fix for the "limit" operator is that in query planner, we push the "limit n" across "exchange" operator, so that "limit" would be pushed down as much as possible. However, that implies quite complexity in query planner ( need check whether the intermediate operator requires order-ness, filtering, before we can safely push "limit" operator). (See https://issues.apache.org/jira/browse/DRILL-1457) I feel we probably need ask what's the main reason to adopt the current execution model across multiple fragments, its pros vs cons. On Thu, Oct 9, 2014 at 12:37 AM, Timothy Chen <[email protected]> wrote: > Hi Parth, > > Thanks for providing an update, this is really great to see more > design discussions on the list! > > The pipeline chains definitely makes lot of sense, and I still > remember discussions offline around this in the past. > > The global memory efficency seems like a scheduling problem, as the > delay of a chain only benefits if there are other chains in-flight, > and be at a chain level or at a query level. > > I don't have much to add yet, but love to see how we can start simple on > this. > > One paper that is relevant that I just started to read is from the > recent OSDI 14, will chime in more once I grasp it. > > https://www.usenix.org/conference/osdi14/technical-sessions/presentation/boutin > > Tim > > > On Wed, Oct 8, 2014 at 11:03 PM, Parth Chandra <[email protected]> > wrote: > > Hi everyone, > > > > Aman, Jinfeng and I had an initial offline discussion about memory > planning > > in Drill. I’m summarizing the discussion below and hoping to initiate a > > discussion around some of these ideas. > > > > > > Order of Execution > > ------------------------- > > Assertion: For memory management Order of Execution is a fundamental > issue. > > > > One of the problems with memory usage in the execution engine is that all > > operators start up simultaneously and start allocating memory even though > > the downstream operators may not be ready to consume their output. > > > > For example, in the plan below : > > > > Agg > > | > > HJ2 > > / \ > > HJ1 Scan3 > > / \ > > Scan1 Scan2 > > > > > > the scan operators all begin reading data simultaneously. In this case, > the > > Hash Joins are blocking operations and the output of Scan3 cannot be > > consumed until HJ1 is ready to emit its results. If, say, Scan2 is on the > > build side of the hash table for HJ1, then HJ1 will not emit any records > > until Scan2 completes its operation. If Scan3 starts immediately, it is > > consuming memory that could be utilized by Scan2. Instead, if we delay > the > > start of Scan3 until after HJ1 is ready to emit records, we can utilize > > memory more efficiently. > > > > To address this, we can think of the query plan in terms of pipeline > > chains, where a pipeline chain is a chain of operators terminated by a > > blocking operator. 
> >
> > In the example, there would be three pipeline chains:
> > PC1: Scan1-HJ1-HJ2-Agg
> > PC2: Scan2
> > PC3: Scan3
> >
> > Now we can see that we can start PC1 and PC2, but PC3 can be delayed
> > until PC2 is completed and PC1 has reached HJ2.
> >
> > One thing we need to consider is that multiple major fragments can
> > be part of a single pipeline chain. All these major fragments can
> > begin execution if the pipeline chain is ready to begin execution.
> >
> > We need to think this one through. There are probably many details
> > to be hashed out, though one thing is certain: the planner has to
> > provide some more information to the execution engine in terms of
> > the ordering of the pipeline chains. In other words, implementing
> > this needs work on both the planner and the execution engine. We
> > also need to work out the details of how the idea of a pipeline
> > chain will be reconciled with the idea of major/minor fragments,
> > which are currently the units of execution.
> >
> > Fragment memory limit
> > ----------------------------
> > We have implemented a simple method to limit the use of memory by a
> > single fragment in 0.6 (it is disabled by default). This prevents a
> > single fragment from hogging too much memory while other fragments
> > may be starved. However, the current implementation has some
> > drawbacks:
> > i) The fragment memory allocators have no idea of how much memory is
> > really needed by the fragment. The fragment limit is therefore
> > determined by dividing the available memory *equally* among the
> > fragments. This is not a fair method; a better choice would be to
> > allocate fragment limits based on the relative needs of the
> > executing fragments.
> > ii) The idea of limiting memory use by a fragment is a little too
> > narrow, since the purpose is to allow many queries, not fragments,
> > to be able to run together. The current mechanism favours queries
> > that have many fragments over queries with fewer fragments that may
> > have equivalent memory needs.
> >
> > To address this, we need to assign memory limits per query instead
> > of per fragment. We have some estimates at the query level for the
> > amount of memory that the query may need. We should probably change
> > the memory limit implementation to use this estimate and assign
> > limits proportionately, with the limit calculated per query (and
> > assigned to all fragments of the same query). It might be even
> > better if we could estimate the memory requirement per fragment and
> > use that as the limit.
> >
> > Again, some work needs to be done to figure out what data is
> > available that the allocators can use, and what data can be
> > calculated/estimated at the planning stage, to allow the allocators
> > to distribute memory fairly.
> >
> >
> > All critiques and criticisms are welcome; we’re hoping to get a good
> > discussion going around this.
> >
> >
> > Parth
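Coming back to the limit-pushdown point above: a minimal sketch of what such a planner rewrite could look like, using hypothetical PlanNode/Limit/Exchange classes rather than Drill's actual Prel tree, and eliding the safety checks (ordering, filtering) mentioned earlier.

// Hypothetical plan nodes; a real rule would operate on Drill's Prel tree.
abstract class PlanNode {
    PlanNode input;
}

class Limit extends PlanNode {
    int n;
}

class Exchange extends PlanNode {
}

class LimitPushdown {
    // Rewrite Limit(Exchange(x)) => Limit(Exchange(Limit(x))), so that
    // every sending fragment stops after n rows while the limit above
    // the exchange still trims the merged stream to exactly n rows.
    // The safety check (no intermediate operator that depends on
    // ordering or changes row counts) is elided here.
    static PlanNode rewrite(PlanNode node) {
        if (node instanceof Limit && node.input instanceof Exchange) {
            Limit top = (Limit) node;
            Exchange ex = (Exchange) top.input;
            Limit pushed = new Limit();
            pushed.n = top.n;
            pushed.input = ex.input;
            ex.input = pushed;
        }
        return node;
    }
}

Note that the limit is duplicated, not moved: each sender stops early, and the retained top-level limit keeps the result correct after the streams are merged.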

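To make the pipeline-chain idea in Parth's mail concrete, here is a rough sketch assuming a hypothetical operator model in which each operator declares which input it streams from (e.g. the probe side of a hash join) while its other inputs are blocking (e.g. the build side); none of these classes reflect Drill's actual fragment model.

import java.util.ArrayList;
import java.util.List;

// Hypothetical operator model: streamFrom is the index of the child an
// operator pipelines with (-1 for a leaf); every other child feeds a
// blocking side, e.g. the build side of a hash join.
class Op {
    final String name;
    final int streamFrom;
    final List<Op> children = new ArrayList<>();

    Op(String name, int streamFrom) {
        this.name = name;
        this.streamFrom = streamFrom;
    }
}

public class PipelineChains {
    // Walk the plan bottom-up: a chain runs from a leaf up through
    // streaming edges; each blocking input closes its own chain.
    static void collect(Op op, List<String> chain, List<List<String>> out) {
        for (int i = 0; i < op.children.size(); i++) {
            if (i == op.streamFrom) {
                continue;
            }
            List<String> side = new ArrayList<>();
            collect(op.children.get(i), side, out);
            out.add(side);                 // blocking input ends a chain
        }
        if (op.streamFrom >= 0) {
            collect(op.children.get(op.streamFrom), chain, out);
        }
        chain.add(op.name);
    }

    public static void main(String[] args) {
        Op scan1 = new Op("Scan1", -1);
        Op scan2 = new Op("Scan2", -1);
        Op scan3 = new Op("Scan3", -1);
        Op hj1 = new Op("HJ1", 0);         // probes Scan1, builds from Scan2
        hj1.children.add(scan1);
        hj1.children.add(scan2);
        Op hj2 = new Op("HJ2", 0);         // probes HJ1, builds from Scan3
        hj2.children.add(hj1);
        hj2.children.add(scan3);
        Op agg = new Op("Agg", 0);
        agg.children.add(hj2);

        List<List<String>> chains = new ArrayList<>();
        List<String> root = new ArrayList<>();
        collect(agg, root, chains);
        chains.add(root);
        System.out.println(chains);
        // [[Scan3], [Scan2], [Scan1, HJ1, HJ2, Agg]] -- i.e. PC3, PC2, PC1
    }
}

A scheduler could then start PC1 and PC2 immediately and delay PC3 until PC2 completes and PC1 reaches HJ2, exactly as described in the mail above.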