I did not mean that a pass-through operator should not take ownership of a batch it processes. My question was whether operators do so and, if they do, when and how. As far as I can see in the ProjectorTemplate code, the transfer is not done in all cases: when Projector operates in sv2 mode, there is no transfer of ownership. Additionally, when there is a transfer, it is done when the processing of the batch is almost complete. IMO, such behavior is counterintuitive. I would expect that if there is a transfer of ownership, it is part of RecordBatch.next(), meaning that once an operator gets a reference to a record batch, it owns it. At that point, an operator may consume the content of the record batch and create a completely new record batch, or it can modify the record batch and pass it to the next downstream operator.
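A minimal sketch of the semantics proposed above, in which a batch changes owner inside next() rather than near the end of processing. Allocator, Batch, Upstream, Source and OwningOperator are hypothetical stand-ins invented for illustration; they are not Drill's BufferAllocator or RecordBatch APIs, and the sketch only models which allocator each batch's bytes are charged to.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-operator memory accountant (not Drill's BufferAllocator).
class Allocator {
  final String name;
  long allocated;  // bytes currently charged to this allocator

  Allocator(String name) { this.name = name; }
}

// Hypothetical batch whose memory is charged to exactly one allocator at a time.
class Batch {
  final long bytes;
  Allocator owner;

  Batch(long bytes, Allocator owner) {
    this.bytes = bytes;
    this.owner = owner;
    owner.allocated += bytes;
  }

  // Moves the memory charge from the current owner to newOwner.
  void transferTo(Allocator newOwner) {
    owner.allocated -= bytes;
    newOwner.allocated += bytes;
    owner = newOwner;
  }
}

// Hypothetical pull interface; next() returns null when the input is exhausted.
interface Upstream {
  Batch next();
}

// Hypothetical input operator that allocates its batches with its own allocator.
class Source implements Upstream {
  final Allocator allocator = new Allocator("scan");
  private final Deque<Batch> pending = new ArrayDeque<>();

  Source(long... sizes) {
    for (long s : sizes) pending.add(new Batch(s, allocator));
  }

  @Override
  public Batch next() { return pending.poll(); }
}

// Hypothetical operator that takes ownership of a batch as part of next().
class OwningOperator implements Upstream {
  final Allocator allocator;
  private final Upstream input;

  OwningOperator(String name, Upstream input) {
    this.allocator = new Allocator(name);
    this.input = input;
  }

  @Override
  public Batch next() {
    Batch b = input.next();
    if (b == null) return null;
    b.transferTo(allocator);  // ownership changes here, before any processing
    // From here the operator may build a new batch from b, or modify b and return it.
    return b;
  }
}
```

With this model, a memory report taken at any point after next() returns charges the batch to the operator that is actually holding it.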

The behavior above applies to an operator that consumes record batches from another operator. An input operator (a scan or edge operator) is an operator that produces record batches from an external source (a Parquet file, HBase, Kafka, etc.). IMO, when such operators create record batches, they should allocate memory using the operator allocator rather than the fragment allocator. If the memory is allocated using the fragment allocator, there is no point in changing ownership when batch construction is complete and the batch is passed to the next operator.

The same approach applies to senders and receivers. Senders get batches from upstream operators, taking ownership of those batches, and send data to receivers. Receivers get data from senders and reconstruct record batches. That is the business logic of senders and receivers, and they may rely on other libraries (RPC and Netty) or classes to handle serialization/deserialization, buffering, acknowledgment, back-pressure, or networking. From the point of view of other Drill operators, senders and receivers are operators responsible for passing record batches from one drillbit to another.

Following your approach, it is necessary to modify MergingReceiver as well. It also pulls batches from a queue (see MergingRecordBatch.getNext()), but instead of almost immediately passing them to the next operator as UnorderedReceiver does, MergingReceiver creates a new record batch from the batches that it pulls from the queue. To be consistent with the proposed changes to UnorderedReceiver, it is necessary to change the ownership of the batches that MergingReceiver pulls as well, especially since MergingReceiver may keep a reference to the original batch much longer than UnorderedReceiver does (while it waits for batches from other drillbits).

I don't see a reason to modify both UnorderedReceiver and MergingReceiver. Instead, I think we should change the allocator used when batches are created in the first place, before they are added to a queue.
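A minimal sketch of the alternative suggested above, assuming a receiver that charges each batch to its own allocator at the moment the batch is built from network data, before it is queued. Account, ReceivedBatch and BufferingReceiver (including its onBatchArrived() callback) are hypothetical names, not Drill classes. The only point illustrated is that next() then needs no ownership transfer, and batches held while waiting on other drillbits are reported under the receiver rather than the fragment.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical memory accountant (not Drill's BufferAllocator).
class Account {
  final String name;
  long bytes;  // bytes currently charged to this account

  Account(String name) { this.name = name; }
}

// Hypothetical received batch, charged to the account given at creation.
class ReceivedBatch {
  final long size;
  final Account owner;

  ReceivedBatch(long size, Account owner) {
    this.size = size;
    this.owner = owner;
    owner.bytes += size;
  }
}

// Hypothetical receiver that owns every batch from the moment it is built.
class BufferingReceiver {
  final Account account;
  private final Queue<ReceivedBatch> queue = new ArrayDeque<>();

  BufferingReceiver(String name) { this.account = new Account(name); }

  // Called by a (hypothetical) network layer when a batch arrives:
  // memory is charged to the receiver before the batch is queued.
  void onBatchArrived(long size) {
    queue.add(new ReceivedBatch(size, account));
  }

  // Hands the next buffered batch downstream; no transfer is needed here.
  // Returns null if nothing is queued.
  ReceivedBatch next() {
    return queue.poll();
  }
}
```

Under this design the same change covers UnorderedReceiver and MergingReceiver, since neither has to transfer ownership when it dequeues.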

Thank you,

Vlad

On 4/27/18 18:10, salim achouche wrote:
Correction for example II, as Drill uses a single thread per pipeline (a
batch is fully processed before the next one is; only the receiving of
batches can happen concurrently):
- Using batch identifiers for more clarity
- t0: (fragment, opr-1, opr-2) = ([b1], [], [])
- t1: (fragment, opr-1, opr-2) = ([b2], [b1], [])
- t2: (fragment, opr-1, opr-2) = ([b3,b2], [], [b1])
        (fragment, opr-1, opr-2) = ([b3], [b2], [])
        (fragment, opr-1, opr-2) = ([b3], [], [b2])
        (fragment, opr-1, opr-2) = ([], [b3], [])
        (fragment, opr-1, opr-2) = ([], [], [b3])
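The corrected timeline can be reproduced with a small single-threaded model. PipelineTimeline and its step() rule (if opr-1 holds a batch it hands it to opr-2; otherwise opr-2 releases its batch and the thread pulls the next one from the fragment queue) are assumptions made for illustration, not Drill's scheduler. One new batch arrives per step while arrivals last, matching the receives at t0 through t2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of corrected Example II: one thread drives a batch through
// opr-1 and opr-2 before pulling the next one, while receives continue concurrently.
class PipelineTimeline {
  final Deque<String> fragment = new ArrayDeque<>(); // received, not yet pulled
  String opr1;  // batch owned by opr-1, or null
  String opr2;  // batch owned by opr-2, or null

  // Advances the pipeline by one stage (a modeling choice, not Drill's scheduler).
  void step() {
    if (opr1 != null) {        // opr-1 hands its batch to opr-2
      opr2 = opr1;
      opr1 = null;
    } else {
      opr2 = null;             // opr-2 finishes and releases its batch
      opr1 = fragment.poll();  // the thread pulls the next queued batch, if any
    }
  }

  int liveBatches() {
    return fragment.size() + (opr1 == null ? 0 : 1) + (opr2 == null ? 0 : 1);
  }

  String snapshot() {
    return "(" + fragment + ", " + asList(opr1) + ", " + asList(opr2) + ")";
  }

  private static List<String> asList(String batch) {
    List<String> l = new ArrayList<>();
    if (batch != null) l.add(batch);
    return l;
  }

  // Records a (fragment, opr-1, opr-2) snapshot after the first receive and after each step.
  static List<String> run(String... arrivals) {
    PipelineTimeline p = new PipelineTimeline();
    List<String> out = new ArrayList<>();
    if (arrivals.length == 0) return out;
    int next = 0;
    p.fragment.add(arrivals[next++]);  // t0: first batch received
    out.add(p.snapshot());
    while (next < arrivals.length || p.liveBatches() > 0) {
      p.step();
      if (next < arrivals.length) p.fragment.add(arrivals[next++]); // concurrent receive
      if (p.liveBatches() > 0) out.add(p.snapshot());
    }
    return out;
  }
}
```

PipelineTimeline.run("b1", "b2", "b3") yields the seven snapshots listed above, except that the fragment queue is printed oldest first, so the t2 entry reads ([b2, b3], [], [b1]). Each batch is held by exactly one party at every step, so no batch is counted twice.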

The point remains the same: a change of ownership for pass-through
operators remains valid, as it doesn't inflate resource allocation at any
given time snapshot.


On Sat, Apr 28, 2018 at 12:42 AM, salim achouche <sachouc...@gmail.com>
wrote:

Another point: I don't see a functional benefit in avoiding a change of
ownership for pass-through operators. Consider the following use cases:

Example I -
- A single 8MB batch is received at time t0 and then passed
through a set of pass-through operators
- It is owned by operator opr-1 at time t1, by operator opr-2 at time t2,
and so forth
- Assume we report memory usage at times t0 through t2; this is what we
would see:
- t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
- t1: (fragment, opr-1, opr-2) = (0, 8MB, 0)
- t2: (fragment, opr-1, opr-2) = (0, 0, 8MB)
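The Example I arithmetic can be checked in a few lines. ExampleOne is a hypothetical helper that moves the whole charge of one batch from the fragment (index 0) to opr-1, opr-2, and so on, with one ownership transfer per time step; it models reported bytes only, not Drill's allocators.

```java
import java.util.Arrays;

// Hypothetical accounting for Example I: one batch handed down a chain of owners.
class ExampleOne {
  static final long MB = 1L << 20;

  // Returns one snapshot per time step; owner 0 is the fragment, owner i is opr-i.
  static long[][] snapshots(int operators, long batchBytes) {
    long[][] out = new long[operators + 1][];
    long[] owners = new long[operators + 1];
    owners[0] = batchBytes;                 // t0: received, charged to the fragment
    out[0] = owners.clone();
    for (int t = 1; t <= operators; t++) {  // at time t, ownership moves to opr-t
      owners[t] += owners[t - 1];
      owners[t - 1] = 0;
      out[t] = owners.clone();
    }
    return out;
  }

  public static void main(String[] args) {
    for (long[] s : snapshots(2, 8 * MB)) {
      // Print each snapshot in MB.
      System.out.println(Arrays.toString(Arrays.stream(s).map(b -> b / MB).toArray()));
    }
  }
}
```

Running main prints [8, 0, 0], [0, 8, 0] and [0, 0, 8], one per line; every snapshot sums to 8MB.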

Example II -
- Multiple 8MB batches are received at times t0 through t2 and then
passed through a set of pass-through operators
- Each is owned by operator opr-1 at time t1, by operator opr-2 at time
t2, and so forth
- Assume we report memory usage at times t0 through t2; this is what we
would see:
- t0: (fragment, opr-1, opr-2) = (8MB, 0, 0)
- t1: (fragment, opr-1, opr-2) = (8MB, 8MB, 0)
- t2: (fragment, opr-1, opr-2) = (8MB, 8MB, 8MB)


The key thing is to clarify our reporting metrics so that users do not
draw the wrong conclusions.

Regards,
Salim

On Fri, Apr 27, 2018 at 11:47 PM, salim achouche <sachouc...@gmail.com>
wrote:

Vlad,

- My understanding is that operators need to take ownership of incoming
buffers (using the vector method transferTo())

- My view is not that receivers are pass-through; instead, I feel that
sender & receiver operators should focus on their business logic

- It just happens that the unordered-receiver does very little
(deserializes the batch through the BatchLoader)

- Contrast this with the merge-receiver which needs to consume data from
multiple inputs to provide ordered batches

- The operator implementation will dictate how many batches are consumed
(this should have nothing to do with communication concerns)

- Intricacies of buffering, acking, back-pressure, etc. are ideally left
to a communication module


My intent is to report on resource usage consistently (I am fine if we
exclude pass-through operators, as long as we do it consistently). The next
enhancement that I am planning is to report on the fragment's buffered
batches. This will enable us to account for such resources when analyzing
memory usage.

On Fri, Apr 27, 2018 at 9:50 PM, vrozov <g...@git.apache.org> wrote:

Github user vrozov commented on the issue:

     https://github.com/apache/drill/pull/1237

     IMO, it would be good to understand what other operators do as well,
for example the Project or Filter operators. Do they take ownership of
incoming batches? And if they do, when is ownership taken?

     I do not suggest that we change how Sender and Receiver control
**all** aspects of communication, at least not as part of this JIRA/PR. The
difference between my approach and yours is whether or not UnorderedReceiver
and other receivers are pass-through operators. My view is that receivers are
not pass-through operators; they are buffering operators, as they receive
batches from the network and buffer them until downstream operators are
ready to consume those batches. In your view, receivers are pass-through
operators that get batches from the fragment queue or some other queue and
pass them downstream. As there is no wait and no processing between getting a
batch from the fragment queue and passing it to the next operator, I don't see
why a receiver needs to take ownership.

