Hi Vlad,

Glad to see you are becoming an expert in the mechanics of data batch handling. 
This is a complex area that deserves the care and attention you are investing.

Drill's current behavior reflects the design decisions of Drill's original 
authors. Unfortunately, those authors are no longer available. (If you are out 
there, lurking, now would be a great time to help out Vlad by explaining the 
original design.) Failing that, we have to use our collective knowledge of the 
intended design. Plus, we should explore ways to improve the design, as you 
seem to be doing.

Drill has a complex memory model that works only if each operator ("record 
batch" in Drill's unfortunate terminology) takes ownership of each incoming 
record batch ("vector container" in Drill's terminology). Recall that each 
operator has an operator-specific memory allocator with its own budget (though, 
at present, the budget numbers are completely artificial and nonsensical). In 
addition, the minor fragment as a whole has a budget.

For the operator budget to work, the operator must take ownership of incoming 
batches, and give up ownership of outgoing batches. Why? Because doing so is 
the only way to track the memory that each operator uses in its 
operator-specific allocator. While this may not be the ideal design, it is how 
Drill works today.
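
To make the accounting concrete, here is a minimal sketch in Java. All class 
and method names (SketchAllocator, SketchBatch, OwnershipSketch) are 
hypothetical and are not Drill's actual allocator API; the sketch only 
assumes that each allocator tracks bytes against a limit and that a transfer 
moves the accounting, not the data.

```java
// Hypothetical sketch, not Drill code: per-operator accounting via ownership transfer.
final class SketchAllocator {
    private final String name;
    private final long limitBytes;
    private long allocatedBytes;

    SketchAllocator(String name, long limitBytes) {
        this.name = name;
        this.limitBytes = limitBytes;
    }

    // Charge bytes to this allocator, failing if the limit would be exceeded.
    void reserve(long bytes) {
        if (allocatedBytes + bytes > limitBytes) {
            throw new IllegalStateException(name + " would exceed its limit");
        }
        allocatedBytes += bytes;
    }

    void release(long bytes) {
        allocatedBytes -= bytes;
    }

    long allocated() {
        return allocatedBytes;
    }
}

final class SketchBatch {
    private final long sizeBytes;
    private SketchAllocator owner;

    SketchBatch(long sizeBytes, SketchAllocator owner) {
        owner.reserve(sizeBytes);
        this.sizeBytes = sizeBytes;
        this.owner = owner;
    }

    // Move the accounting to a new owner; the data itself is not copied.
    void transferTo(SketchAllocator newOwner) {
        newOwner.reserve(sizeBytes); // charge the receiver first, so a failure changes nothing
        owner.release(sizeBytes);
        owner = newOwner;
    }
}

final class OwnershipSketch {
    // Returns the (fragment, opr-1, opr-2) allocations after each step.
    static long[][] run() {
        long mb = 1024L * 1024L;
        SketchAllocator fragment = new SketchAllocator("fragment", 64 * mb);
        SketchAllocator opr1 = new SketchAllocator("opr-1", 16 * mb);
        SketchAllocator opr2 = new SketchAllocator("opr-2", 16 * mb);

        long[][] trace = new long[3][];
        SketchBatch batch = new SketchBatch(8 * mb, fragment); // batch arrives in the fragment
        trace[0] = new long[] {fragment.allocated(), opr1.allocated(), opr2.allocated()};
        batch.transferTo(opr1); // first operator takes ownership
        trace[1] = new long[] {fragment.allocated(), opr1.allocated(), opr2.allocated()};
        batch.transferTo(opr2); // downstream operator takes ownership
        trace[2] = new long[] {fragment.allocated(), opr1.allocated(), opr2.allocated()};
        return trace;
    }

    public static void main(String[] args) {
        for (long[] row : run()) {
            System.out.println(java.util.Arrays.toString(row));
        }
    }
}
```

Without the transfer, the 8 MB would stay charged to whichever allocator 
created the batch, and the per-operator numbers would say nothing about which 
operator currently holds the data.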

If we move fully to the budget-based design, then this level of operator 
control will no longer be necessary; it would only be a complication. 
Under the budget model, only the minor fragment as a whole needs an allocator; 
each operator plays its part within the overall fragment budget. A planning 
step works out the memory budget for the query, the minor fragments and each 
operator. This is all explained in [1].

Under the budget model, each operator attempts to stay within its budget, 
spilling to disk as needed. The budget model works only if "single batch" 
operators (such as Project, Filter, etc.) are given sufficient memory to hold 
two batches. This, in turn, requires that we control the size of each batch as 
Padma and others are doing.
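
The two-batch requirement is simple arithmetic. The sketch below is Java with 
hypothetical names (BatchBudgetSketch and its methods are not part of Drill); 
it assumes a single-batch operator holds at most one input batch and one 
output batch of the same maximum size at any moment.

```java
// Hypothetical sketch, not Drill code: the two-batch rule for single-batch operators.
final class BatchBudgetSketch {
    // True if the operator budget can hold one input batch plus one output batch.
    static boolean fitsSingleBatchOperator(long operatorBudgetBytes, long batchSizeBytes) {
        // Written as a division to avoid overflow when doubling very large sizes.
        return batchSizeBytes <= operatorBudgetBytes / 2;
    }

    // Largest batch size that still leaves room for both the input and the output batch.
    static long maxBatchSize(long operatorBudgetBytes) {
        return operatorBudgetBytes / 2;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(fitsSingleBatchOperator(16 * mb, 8 * mb));
        System.out.println(fitsSingleBatchOperator(16 * mb, 9 * mb));
        System.out.println(maxBatchSize(20 * mb) / mb);
    }
}
```

This is why batch size control matters: if an upstream operator can emit a 
batch of arbitrary size, no fixed operator budget can be guaranteed to hold 
two of them.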

That said, today exchanges *might* be special. My understanding is that some 
can receive a single batch from the network and feed that single batch to 
multiple slices ("minor fragments") of the same operator. This happens in, say, 
a broadcast exchange.

You mention SV2 mode. In fact, SV2 mode should operate the same as "plain" 
batches: an SV2 is a single indirection vector on a single batch of data. 
Perhaps you meant "SV4 mode." Indeed, SV4 is special since an SV4 sits atop a 
large collection of batches and simulates a batch by picking out a collection 
of rows across the many batches. SV4 is used in the output of an in-memory sort 
(and perhaps other places.) There is no transfer of ownership in SV4 mode 
because the same batches will be used over and over until all data is 
delivered. It is the responsibility of the Sort operator to release the 
collection of batches once it has delivered all results (or the query fails.)
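
To illustrate the difference, here is a small Java sketch that uses plain 
arrays in place of value vectors. The names are hypothetical, and the SV4 
encoding shown (batch index in the upper 16 bits, row index in the lower 16 
bits) is an assumption made for the sketch rather than a statement about 
Drill's exact layout.

```java
// Hypothetical sketch, not Drill code: SV2 indexes one batch, SV4 indexes many.
final class SelectionVectorSketch {
    // SV2: each 16-bit entry selects a row within a single batch.
    static int[] readSv2(int[] batch, char[] sv2) {
        int[] out = new int[sv2.length];
        for (int i = 0; i < sv2.length; i++) {
            out[i] = batch[sv2[i]];
        }
        return out;
    }

    // Assumed SV4 encoding: upper 16 bits hold the batch index, lower 16 bits the row index.
    static int encodeSv4(int batchIndex, int rowIndex) {
        return (batchIndex << 16) | (rowIndex & 0xFFFF);
    }

    // SV4: each entry selects a row from any batch in a retained collection of batches.
    static int[] readSv4(int[][] batches, int[] sv4) {
        int[] out = new int[sv4.length];
        for (int i = 0; i < sv4.length; i++) {
            int batchIndex = sv4[i] >>> 16;
            int rowIndex = sv4[i] & 0xFFFF;
            out[i] = batches[batchIndex][rowIndex];
        }
        return out;
    }

    public static void main(String[] args) {
        int[][] batches = {{5, 9}, {1, 7}, {3}};
        // A sorted view over all three batches; no rows are moved or copied up front.
        int[] sv4 = {
            encodeSv4(1, 0), encodeSv4(2, 0), encodeSv4(0, 0),
            encodeSv4(1, 1), encodeSv4(0, 1)
        };
        System.out.println(java.util.Arrays.toString(readSv4(batches, sv4)));
    }
}
```

In the SV4 case every underlying batch must stay alive until the last entry 
has been read, which is why the batches remain with the Sort rather than 
being handed downstream one by one.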


Enough for this response. I'll send additional responses for your other points.

The key concept to keep in mind is that the Drill memory system, as a whole, is 
quite complex. It can certainly be improved (as we are doing with the batch 
handling revisions.) But, we must consider the entire system when considering 
changes to any one part of the system. It is a complex topic; it is great that 
we have someone with your experience exploring our options.

Thanks,
- Paul

[1]  https://github.com/paul-rogers/drill/wiki/Batch-Handling-Upgrades


 

On Sunday, April 29, 2018, 9:26:24 PM PDT, Vlad Rozov <[email protected]> wrote:
 
I did not mean that a pass-through operator should not take ownership of 
a batch it processes. My question was whether such operators do so and, 
if they do, when and how. As far as I can see in the ProjectorTemplate 
code, the transfer is not done in all cases, and when Projector operates 
in SV2 mode, there is no transfer of ownership. Additionally, when there 
is a transfer, it is done when the processing of the batch is almost 
complete. IMO, such behavior is counterintuitive. I would expect that if 
there is a transfer of ownership, it is part of RecordBatch.next(), 
meaning that once an operator gets a reference to a record batch, it owns 
it. At this point, an operator may consume the content of the record 
batch and create a completely new record batch, or it can modify the 
record batch and pass it to the next downstream operator.

The behavior above applies to an operator that consumes record batches 
from another operator. An input operator (scan or edge operator) is an 
operator that produces record batches from an external source (Parquet 
file, HBase, Kafka, etc.). IMO, when such operators create record batches, 
they should allocate memory using the operator allocator rather than the 
fragment allocator. If the memory is allocated using the fragment 
allocator, there is no point in changing ownership when batch 
construction is complete and the batch is passed to the next operator.

The same approach applies to senders and receivers. Senders get batches 
from the upstream operators, taking ownership of those batches, and send 
data to receivers. Receivers get data from senders and reconstruct 
record batches. That is the business logic of senders and receivers, and 
they may rely on other libraries (RPC and Netty) or classes to handle 
serialization/deserialization, buffering, acknowledgment, back-pressure, 
or dealing with the network. From the point of view of other Drill 
operators, senders and receivers are operators responsible for passing 
record batches from one drillbit to another.

Following your approach, it is necessary to modify MergingReceiver as 
well. It also pulls batches from a queue (see 
MergingRecordBatch.getNext()), but instead of almost immediately passing 
them to the next operator as UnorderedReceiver does, MergingReceiver 
creates a new record batch from the batches that it pulls from the queue. 
To be consistent with the proposed changes to UnorderedReceiver, it is 
necessary to change the ownership of the batches that MergingReceiver 
pulls as well, especially since MergingReceiver may keep a reference to 
the original batch much longer than UnorderedReceiver does (while it 
waits for batches from other drillbits).

I don't see a reason to modify both UnorderedReceiver and 
MergingReceiver. Instead, I think we should modify the allocator used 
when batches are created in the first place, before they are added to a 
queue.

Thank you,

Vlad

On 4/27/18 18:10, salim achouche wrote:
> Correction for example II as Drill uses a single thread per pipeline (a
> batch is fully processed before the next one is; only receive of batches
> can happen concurrently):
> - Using batch identifiers for more clarity
> - t0: (fragment, opr-1, opr-2) = ([b1], [], [])
> - t1: (fragment, opr-1, opr-2) = ([b2], [b1], [])
> - t2: (fragment, opr-1, opr-2) = ([b3,b2], [], [b1])
>        (fragment, opr-1, opr-2) = ([b3], [b2], [])
>        (fragment, opr-1, opr-2) = ([b3], [], [b2])
>        (fragment, opr-1, opr-2) = ([], [b3], [])
>        (fragment, opr-1, opr-2) = ([], [], [b3])
>
> The point remains the same that change of ownership for pass-through
> remains valid as it doesn't inflate resource allocation for a given time
> snapshot.
>
>
> On Sat, Apr 28, 2018 at 12:42 AM, salim achouche <[email protected]>
> wrote:
>
>> Another point, I don't see a functional benefit from avoiding a change of
>> ownership for pass-through operators. Consider the following use-cases:
>>
>> Example I -
>> - Single batch of size 8MB is received at time t0 and then is passed
>> through a set of pass-through operators
>> - At time t1 owned by operator Opr1, time t2 owned by operator Opr2, and so
>> forth
>> - Assume we report memory usage at time t0 - t2; this is what will be seen
>> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
>> - t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
>> - t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)
>>
>> Example II -
>> - Multiple batches of size 8MB are received at time t0 - t2 and then are
>> passed through a set of pass-through operators
>> - At time t1 owned by operator Opr1, time t2 owned by operator Opr2, and so
>> forth
>> - Assume we report memory usage at time t0 - t2; this is what will be seen
>> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
>> - t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
>> - t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)
>>
>>
>> The key thing is that we clarify our reporting metrics so that users do
>> not make the wrong conclusions.
>>
>> Regards,
>> Salim
>>
>> On Fri, Apr 27, 2018 at 11:47 PM, salim achouche <[email protected]>
>> wrote:
>>
>>> Vlad,
>>>
>>> - My understanding is that operators need to take ownership of incoming
>>> buffers (using the vector method transferTo())
>>>
>>> - My view is not that receivers are pass-through; instead, I feel that
>>> sender & receiver operators should focus on their business logic
>>>
>>> - It just happens that the unordered-receiver does very little
>>> (deserializes the batch through the BatchLoader)
>>>
>>> - Contrast this with the merge-receiver which needs to consume data from
>>> multiple inputs to provide ordered batches
>>>
>>> - The operator implementation will dictate how many batches are consumed
>>> (this should have nothing to do with communication concerns)
>>>
>>> - Intricacies of buffering, acking, back-pressuring, etc. are ideally left
>>> to a communication module
>>>
>>>
>>> My intent is to consistently report on resource usage (I am fine if we
>>> exclude pass-through operators as long as we do it consistently). The next
>>> enhancement that I am planning to do is to report on the fragment
>>> buffered batches. This will enable us to account for such resources when
>>> analyzing memory usage.
>>>
>>> On Fri, Apr 27, 2018 at 9:50 PM, vrozov <[email protected]> wrote:
>>>
>>>> Github user vrozov commented on the issue:
>>>>
>>>>      https://github.com/apache/drill/pull/1237
>>>>
>>>>      IMO, it will be good to understand what other operators do as well.
>>>> For example what Project or Filter operators do. Do they take ownership of
>>>> incoming batches? And if they do, when is the ownership taken?
>>>>
>>>>      I do not suggest that we change how Sender and Receiver control
>>>> **all** aspects of communication, at least not as part of this JIRA/PR. The
>>>> difference in my and your approach is whether or not UnorderedReceiver and
>>>> other receivers are pass-through operators. My view is that receivers are
>>>> not pass-through operators and they are buffering operators as they receive
>>>> batches from the network and buffer them before downstream operators are
>>>> ready to consume those batches. In your view, receivers are pass-through
>>>> operators that get batches from fragment queue or some other queue and pass
>>>> them to downstream. As there is no wait and no processing between getting a
>>>> batch from fragment queue and passing it to the next operator, I don't see
>>>> why a receiver needs to take the ownership.
>>>>
>>>>
>>>> ---
>>>>
>>>

  
