paul-rogers commented on issue #2421:
URL: https://github.com/apache/drill/issues/2421#issuecomment-1019033520


   @Leon-WTF, good question, as always. We don't actually *want* any scan to be 
blocked: this is the problem we need to fix. Every scan fragment has a set of 
outgoing buffers. The problem is, in normal Drill operation, we don't send them 
until they are full. But, to absorb delays, we want to keep them empty most of 
the time, and allow them to absorb rows when the receiver is blocked.
   
   Our example had 10 sort fragments, so there are 10 outgoing buffers per scan 
fragment (ignoring the Mux exchange.) Let's look at this from the scan's 
perspective. It can fill each outgoing buffer to, say, 1024 rows.
   
   The scan chugs along, producing rows. The random exchange assigns each to 
one of the outgoing buffers and copies it across. (Note the copy: we get no 
benefit from vectors during this operation.)
   
   Our data is evenly distributed, so we start with 0 rows per outgoing buffer. 
After a while, most buffers have, say, 3 rows. Some have more, some less, since 
things are random. We chug along some more. Now we're up to an average of 1000 
rows. Pretty soon, one of the buffers fills to 1024.
   
   At this point, the scan fragment can't add another row to that buffer, so 
off the buffer goes across the wire to its target sort fragment. All good. This
continues: the next buffer gets to 1024 and is sent off. Again, because of even
distribution, pretty soon all are full and sent. Now the scan can chug along 
and refill them.
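   
   To make the mechanics concrete, here is a rough Java sketch of that
"fill, then send when full" routing. Every name in it (SendWhenFullExchange,
OutgoingBuffer, route) is invented for illustration; this is the shape of the
behavior, not Drill's actual exchange code.
   
   ```java
import java.util.Random;

/** Hypothetical outgoing buffer; stands in for Drill's real batch buffer. */
class OutgoingBuffer {
  private final Object[] rows;
  private int count;

  OutgoingBuffer(int capacity) { rows = new Object[capacity]; }

  boolean isFull() { return count == rows.length; }

  /** Per-row copy: no zero-copy vector handoff happens here. */
  void copyIn(Object row) { rows[count++] = row; }

  /** Stands in for the network send; this blocks while the receiver spills. */
  void send() { count = 0; }
}

/** Sketch of the current scheme: route randomly, send a buffer only when full. */
class SendWhenFullExchange {
  private static final int CAPACITY = 1024;  // rows per outgoing buffer
  private final OutgoingBuffer[] buffers;    // one buffer per receiving sort
  private final Random random = new Random();

  SendWhenFullExchange(int receiverCount) {
    buffers = new OutgoingBuffer[receiverCount];
    for (int i = 0; i < receiverCount; i++) {
      buffers[i] = new OutgoingBuffer(CAPACITY);
    }
  }

  void route(Object row) {
    OutgoingBuffer target = buffers[random.nextInt(buffers.length)];
    if (target.isFull()) {
      // The blocking point: if this receiver is busy spilling, send()
      // stalls, and the entire scan stalls with it.
      target.send();
    }
    target.copyIn(row);
  }
}
   ```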
   
   Now, let's say that Sort 1 can only buffer 9 incoming batches, so it now 
wants to spill when, say, Scan 8 sends it the 10th batch.
   
   Now, back to scan 1, it has filled its buffers again, and it happens that 
the one for Sort 1 (the one which is spilling) is filled first. So, it has to 
be sent off before the Scan can produce another row. Why? There is no other 
place to put that row, and we can't get to the row behind that until we dispose 
of the current row. But the destination is blocked, doing a spill (ignoring
the three-buffer rule), so the full buffer can't be sent, and so we can't add 
to it, since it is full.
   
   So, here's the heart of the problem. Scan 1 has just produced a row (or a 
batch with a row) that must go to Sort 1. The outgoing buffer is full, but it
can't be sent. Scan 1 can't go any further until it can get rid of that current 
row. So...it...waits...
   
   In effect, our buffer to absorb delays can sometimes be as small as 0 or 1 
rows (though it will average to 512 rows.)
   
   Even if we have separate channels (which we do), we can't use them. We could 
send off all the other partly-full buffers and it still wouldn't help us with 
that one we most care about: the one that has to take the current row but can't 
(because it is full and we're blocked sending that buffer). Sending the other 
buffers _might_ get the other sorts to spill sooner, so we get concurrent 
spilling. (Something for us to consider as a partial fix.)
   
   This is what I meant by a 10-way distributed query that begins to act as a 
single thread. Each scan can only send as fast as the slowest sort. Any
spilling sort blocks all scans, which blocks all sorts. Definitely A Bad Thing.
   
   Now, if we have a row-based exchange, things might work better. Each scan 
might now have a rule:
   
   * Keep a buffer of up to 1024 rows (just picking a number).
   * Send the buffer if it is full, or if it is non-empty and has not been sent
in the last 100 ms (pick any time interval).
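   
   A minimal sketch of that rule, with invented names (TimedFlushBuffer,
maybeFlush) and the example thresholds above; a real implementation would live
in the exchange sender and would also need a periodic timer:
   
   ```java
/** Sketch of the proposed flush rule: send when full OR when stale. */
class TimedFlushBuffer {
  private static final int MAX_ROWS = 1024;   // "up to 1024 rows"
  private static final long MAX_AGE_MS = 100; // "not sent in the last 100 ms"

  private int rowCount;
  private long lastSendMs = System.currentTimeMillis();

  void add(Object row) {
    rowCount++;  // the actual row copy is elided for brevity
    maybeFlush();
  }

  /** Called on every row, and also from a timer so an idle buffer drains. */
  void maybeFlush() {
    long now = System.currentTimeMillis();
    boolean full = rowCount >= MAX_ROWS;
    boolean stale = rowCount > 0 && (now - lastSendMs) >= MAX_AGE_MS;
    if (full || stale) {
      send();  // unlike today, this usually sends a *partial* buffer
      rowCount = 0;
      lastSendMs = now;
    }
  }

  private void send() { /* hand the buffered rows to the network layer */ }
}
   ```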
   
   With that rule (properly tuned), the scans will typically have mostly empty 
send buffers. When Sort 1 starts spilling, Scan 1 can buffer up to 1024 rows for
that sort, while also sending partial buffers to the other sorts, to get them
closer to spilling. Once Sort 1 is unblocked, its rows are sent and maybe some
other Sort's outgoing buffer starts to fill. We get closer to doing spills in 
parallel rather than sequentially.
   
   In this way, we normally have empty buffers, and have lots of margin to
absorb blocking. Contrast this to the current scheme in which we have no leeway 
when a buffer fills and the receiver blocks.
   
   I don't have hard numbers, but I suspect (based on my experience with 
Impala) that the row-based scheme would not fall prey to the single-threading
that the vector-based approach does. (And, of course, we could implement the 
above idea with vectors, but it would normally lead to small batches downstream 
which might cause unexpected other problems in the current design.)
   
   To see this, we need more detailed instrumentation (a time series of events, 
not just a summary), and a beefy cluster with a large data set to run tests.
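   
   As a strawman for that instrumentation, something as small as a per-fragment
event log (hypothetical sketch below; nothing like this is claimed to exist in
Drill today) would let us merge timelines across fragments and see the blocking
cascade directly:
   
   ```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-fragment event log for reconstructing blocking timelines. */
class ExchangeEventLog {
  enum Kind { BUFFER_FULL, SEND_START, SEND_BLOCKED, SEND_DONE, SPILL_START, SPILL_DONE }

  static final class Event {
    final long nanos;      // System.nanoTime() at the event
    final Kind kind;
    final int fragmentId;  // which scan or sort fragment
    final int bufferId;    // which outgoing buffer, where applicable

    Event(long nanos, Kind kind, int fragmentId, int bufferId) {
      this.nanos = nanos;
      this.kind = kind;
      this.fragmentId = fragmentId;
      this.bufferId = bufferId;
    }
  }

  private final List<Event> events = new ArrayList<>();

  synchronized void record(Kind kind, int fragmentId, int bufferId) {
    events.add(new Event(System.nanoTime(), kind, fragmentId, bufferId));
  }

  /** Dump one line per event, so logs can be merged and sorted across nodes. */
  synchronized void dump() {
    for (Event e : events) {
      System.out.printf("%d %s frag=%d buf=%d%n",
          e.nanos, e.kind, e.fragmentId, e.bufferId);
    }
  }
}
   ```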

