Correction for Example II, since Drill uses a single thread per pipeline (a
batch is fully processed before the next one starts; only the receipt of
batches can happen concurrently):
- Using batch identifiers for more clarity
- t0: (fragment, opr-1, opr-2) = ([b1], [], [])
- t1: (fragment, opr-1, opr-2) = ([b2], [b1], [])
- t2: (fragment, opr-1, opr-2) = ([b3,b2], [], [b1])
- t3: (fragment, opr-1, opr-2) = ([b3], [b2], [])
- t4: (fragment, opr-1, opr-2) = ([b3], [], [b2])
- t5: (fragment, opr-1, opr-2) = ([], [b3], [])
- t6: (fragment, opr-1, opr-2) = ([], [], [b3])
The point stands: changing ownership for pass-through operators remains
valid, since it does not inflate the reported resource allocation in any
given time snapshot.
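
To make the timeline above easy to check, here is a minimal sketch that replays it. It is not Drill code: the `simulate` function, the tick model, and the fixed two-operator pipeline are assumptions made purely for illustration. It models one pipeline thread that moves a single in-flight batch from operator to operator, while batches may be received into the fragment queue on any tick.

```python
from collections import deque

def simulate(arrivals, num_operators=2, ticks=7):
    """Return per-tick ownership snapshots (fragment, opr-1, ..., opr-N).

    `arrivals` maps a tick to the batch id received at that tick.
    All names here are hypothetical; this only mimics the accounting.
    """
    queue = deque()   # batches buffered at the fragment level
    current = None    # the single batch the pipeline thread is processing
    stage = 0         # index of the operator that currently owns `current`
    snapshots = []
    for t in range(ticks):
        # Pipeline thread: hand the in-flight batch to the next operator.
        if current is not None:
            stage += 1
            if stage == num_operators:
                current = None  # the batch has left the last operator
        # The next batch is pulled only once the pipeline is idle.
        if current is None and queue:
            current, stage = queue.popleft(), 0
        # Receiving is concurrent with processing, so it may happen on any tick.
        if t in arrivals:
            queue.append(arrivals[t])
        owners = [[] for _ in range(num_operators)]
        if current is not None:
            owners[stage].append(current)
        # Newest batch first, matching the notation used in the timeline.
        snapshots.append((list(reversed(queue)), *owners))
    return snapshots

snapshots = simulate({0: "b1", 1: "b2", 2: "b3"})
for t, snap in enumerate(snapshots):
    print(f"t{t}: (fragment, opr-1, opr-2) = {snap}")

# Each batch has exactly one owner per snapshot, so the reported total
# equals the memory of the batches that are actually alive at that time.
BATCH_MB = 8
totals = [BATCH_MB * sum(len(owner) for owner in snap) for snap in snapshots]
print(totals)
```

Under these assumptions the per-snapshot total peaks at three batches (24MB with 8MB batches), which is exactly the number of batches alive at that moment; moving ownership between operators never counts a batch twice.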
On Sat, Apr 28, 2018 at 12:42 AM, salim achouche <[email protected]>
wrote:
> Another point: I don't see a functional benefit in avoiding a change of
> ownership for pass-through operators. Consider the following use-cases:
>
> Example I -
> - A single 8MB batch is received at time t0 and is then passed
> through a set of pass-through operators
> - At time t1 it is owned by operator Opr1, at time t2 by operator Opr2,
> and so forth
> - Assume we report memory usage at time t0 - t2; this is what will be seen
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)
>
> Example II -
> - Multiple 8MB batches are received at times t0 - t2 and are then
> passed through a set of pass-through operators
> - At time t1 a batch is owned by operator Opr1, at time t2 by operator
> Opr2, and so forth
> - Assume we report memory usage at time t0 - t2; this is what will be seen
> - t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
> - t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
> - t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)
>
>
> The key thing is that we clarify our reporting metrics so that users do
> not draw the wrong conclusions.
>
> Regards,
> Salim
>
> On Fri, Apr 27, 2018 at 11:47 PM, salim achouche <[email protected]>
> wrote:
>
>> Vlad,
>>
>> - My understanding is that operators need to take ownership of incoming
>> buffers (using the vector method transferTo())
>>
>> - My view is not that receivers are pass-through; instead, I feel that
>> sender & receiver operators should focus on their business logic
>>
>> - It just happens that the unordered-receiver does very little
>> (deserializes the batch through the BatchLoader)
>>
>> - Contrast this with the merge-receiver which needs to consume data from
>> multiple inputs to provide ordered batches
>>
>> - The operator implementation will dictate how many batches are consumed
>> (this should have nothing to do with communication concerns)
>>
>> - The intricacies of buffering, acking, back-pressuring, etc. are ideally
>> left to a communication module
>>
>>
>> My intent is to report resource usage consistently (I am fine if we
>> exclude pass-through operators as long as we do it consistently). The next
>> enhancement that I am planning is to report on the batches buffered by
>> the fragment. This will enable us to account for such resources when
>> analyzing memory usage.
>>
>> On Fri, Apr 27, 2018 at 9:50 PM, vrozov <[email protected]> wrote:
>>
>>> Github user vrozov commented on the issue:
>>>
>>> https://github.com/apache/drill/pull/1237
>>>
>>> IMO, it would be good to understand what other operators do as well.
>>> For example, what do the Project or Filter operators do? Do they take
>>> ownership of incoming batches? And if they do, when is the ownership taken?
>>>
>>> I do not suggest that we change how Sender and Receiver control
>>> **all** aspects of communication, at least not as part of this JIRA/PR. The
>>> difference between my approach and yours is whether or not
>>> UnorderedReceiver and other receivers are pass-through operators. My view
>>> is that receivers are not pass-through operators; they are buffering
>>> operators, as they receive batches from the network and buffer them until
>>> downstream operators are ready to consume those batches. In your view,
>>> receivers are pass-through operators that get batches from the fragment
>>> queue or some other queue and pass them downstream. As there is no wait
>>> and no processing between getting a batch from the fragment queue and
>>> passing it to the next operator, I don't see why a receiver needs to take
>>> ownership.
>>>
>>>
>>> ---
>>>
>>
>>
>