paul-rogers commented on issue #2421: URL: https://github.com/apache/drill/issues/2421#issuecomment-1019033520
@Leon-WTF, good question, as always. We don't actually *want* any scan to be blocked: this is the problem we need to fix. Every scan fragment has a set of outgoing buffers. The problem is that, in normal Drill operation, we don't send them until they are full. But, to absorb delays, we want to keep them empty most of the time, and let them absorb rows when the receiver is blocked. Our example had 10 sort fragments, so there are 10 outgoing buffers per scan fragment (ignoring the Mux exchange).

Let's look at this from the scan's perspective. It can fill each outgoing buffer to, say, 1024 rows. The scan chugs along, producing rows. The random exchange assigns each row to one of the outgoing buffers and copies it across. (Note the copy: we get no benefit from vectors during this operation.) Our data is evenly distributed, so we start with 0 rows per outgoing buffer. After a while, most buffers have, say, 3 rows. Some have more, some less, since things are random. We chug along some more. Now we're up to an average of 1000 rows. Pretty soon, one of the buffers fills to 1024. At this point, the scan fragment can't add another row to that buffer, so off the buffer goes across the wire to its target sort fragment. All good. This continues: the next buffer gets to 1024 and is sent off. Again, because of the even distribution, pretty soon all are full and sent. Now the scan can chug along and refill them.

Now, let's say that Sort 1 can only buffer 9 incoming batches, so it wants to spill when, say, Scan 8 sends it the 10th batch. Back to Scan 1: it has filled its buffers again, and it happens that the one for Sort 1 (the one which is spilling) is filled first. So, it has to be sent off before the scan can produce another row. Why? There is no other place to put that row, and we can't get to the row behind it until we dispose of the current row. But the destination is blocked, doing a spill (ignoring the three-buffer rule), so the full buffer can't be sent, and so we can't add to it, since it is full.

So, here's the heart of the problem. Scan 1 has just produced a row (or a batch with a row) that must go to Sort 1. The outgoing buffer is full, but it can't be sent. Scan 1 can't go any further until it can get rid of that current row. So...it...waits... In effect, our buffer to absorb delays can sometimes be as small as 0 or 1 rows (though it will average 512 rows).

Even if we have separate channels (which we do), we can't use them. We could send off all the other partly-full buffers and it still wouldn't help us with the one we most care about: the one that has to take the current row but can't (because it is full and we're blocked sending that buffer). Sending the other buffers _might_ get the other sorts to spill sooner, so we get concurrent spilling. (Something for us to consider as a partial fix.)

This is what I meant by a 10-way distributed query that begins to act as a single thread. Each scan can only send as fast as the slowest sort. Any spilling sort blocks all scans, which blocks all sorts. Definitely A Bad Thing.

Now, if we have a row-based exchange, things might work better. Each scan might now have a rule:

* Keep a buffer of up to 1024 rows (just picking a number).
* Send the buffer if it is full, or if it is non-empty and has not been sent in the last 100 ms (pick any time interval).

With that rule (properly tuned), the scans will typically have mostly empty send buffers.
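To make the rule concrete, here is a minimal sketch of what such a per-receiver row buffer might look like. All of the names (`OutgoingRowBuffer`, `Sender`, `Row`), the 1024-row cap, and the 100 ms timeout are illustrative assumptions, not actual Drill classes or settings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch only; none of these types are real Drill classes.
interface Row { }

interface Sender {
  void send(List<Row> rows);   // hands a batch of rows to the network layer
}

class OutgoingRowBuffer {
  private static final int MAX_ROWS = 1024;  // illustrative cap from the example
  private static final long MAX_AGE_NANOS = TimeUnit.MILLISECONDS.toNanos(100);

  private final Sender sender;
  private final List<Row> pending = new ArrayList<>();
  private long lastSendNanos = System.nanoTime();

  OutgoingRowBuffer(Sender sender) {
    this.sender = sender;
  }

  /** Called by the scan for every row routed to this receiver. */
  void add(Row row) {
    pending.add(row);
    maybeFlush();
  }

  /** Also called periodically so a slow trickle of rows is not held forever. */
  void maybeFlush() {
    boolean full = pending.size() >= MAX_ROWS;
    boolean stale = !pending.isEmpty()
        && System.nanoTime() - lastSendNanos >= MAX_AGE_NANOS;
    if (full || stale) {
      sender.send(new ArrayList<>(pending));  // ship whatever we have, even a partial buffer
      pending.clear();
      lastSendNanos = System.nanoTime();
    }
  }
}
```

The key design point is that the send decision is either full *or* stale, so a partly-full buffer still drains regularly and the buffer is usually near empty, ready to absorb rows if its receiver blocks.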
When Sort 1 starts spilling, Scan 1 can buffer up to 1024 rows for that sort, while also sending partial buffers to the other sorts, to get them closer to spilling. Once Sort 1 is unblocked, its rows are sent, and maybe some other sort's outgoing buffer starts to fill. We get closer to doing spills in parallel rather than sequentially. In this way, we normally have empty buffers, and have lots of margin to absorb blocking. Contrast this to the current scheme, in which we have no leeway when a buffer fills and the receiver blocks.

I don't have hard numbers, but I suspect (based on my experience with Impala) that the row-based scheme would not fall prey to the single-threading that the vector-based approach does. (And, of course, we could implement the above idea with vectors, but it would normally produce small batches downstream, which might cause other unexpected problems in the current design.) To verify this, we need more detailed instrumentation (a time series of events, not just a summary), and a beefy cluster with a large data set to run tests.
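For contrast, here is a rough illustration of the "send only when full" behavior described earlier, which is where the single-threading comes from. Again, every class and method name here is a hypothetical stand-in, not actual Drill code:

```java
import java.util.List;

// Hypothetical sketch only, illustrating the blocking described above;
// these are not real Drill classes.
class BlockingPartitionSender {
  private final List<OutgoingBatch> outgoing;  // one buffer per receiving sort

  BlockingPartitionSender(List<OutgoingBatch> outgoing) {
    this.outgoing = outgoing;
  }

  /** Copy each scanned row into the buffer chosen by the exchange. */
  void route(Object row, int targetIndex) throws InterruptedException {
    OutgoingBatch target = outgoing.get(targetIndex);
    if (target.isFull()) {
      // The scan cannot produce another row until this one buffer is sent.
      // If its receiver is busy spilling, the whole scan fragment stalls here,
      // even though the other nine buffers may be nearly empty.
      target.sendAndWaitForReceiver();
    }
    target.copyRowIn(row);
  }
}

// Minimal stand-in for an outgoing vector batch.
class OutgoingBatch {
  private static final int LIMIT = 1024;  // illustrative batch size
  private int rows;

  boolean isFull() { return rows >= LIMIT; }

  void copyRowIn(Object row) { rows++; }  // real code copies vector data across

  void sendAndWaitForReceiver() throws InterruptedException {
    // Real code hands the batch to the RPC layer; if the receiver cannot
    // accept it, the sending fragment blocks right here until it can.
    rows = 0;
  }
}
```

The difference from the earlier sketch is that the only send trigger is "buffer full," so the one buffer that most needs to absorb rows is exactly the one with no room left, and the scan stalls on it.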
