[
https://issues.apache.org/jira/browse/DRILL-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17468279#comment-17468279
]
ASF GitHub Bot commented on DRILL-8088:
---------------------------------------
paul-rogers commented on pull request #2412:
URL: https://github.com/apache/drill/pull/2412#issuecomment-1004442320
The last topic is so complex that no myth has grown up around it, and the
issue is not at all well understood. Vectors (and batches) are hell for
distributed system performance. This gets pretty techie, so hang on.
**Vectors are Hell for Exchanges**: This comes from a real-world case in
which a large cluster worked no faster than a single thread of execution. We've
discussed how Drill wants to create large batches (per the myth) to benefit
from vectorization (which we don't have) and to optimize L1 cache usage (which,
as we've seen, we don't actually do.) Let's assume "small" batches of 1K rows.
Drill also wants the single format for in-memory and over-the-wire usage.
This means we want to transfer 1K record batches so that the receiver gets
batches of the optimal in-memory size.
Now, what happens in a distributed system? Assume you have 100 fragments
running. (Maybe 10 machines with 10 cores each.) Let's think about one
fragment, call it "f0.0". Let's assume f0.0 runs a scan and a network sender.
The scan builds up its 1K batches, because those are "efficient" (according to
the myths we've discussed.)
What does f0.0's network sender do? Let's assume the target is a hash join.
So, the sender hashes the keys into 100 buckets. Now, the sender follows
Drill's rule: send 1K record batches. Since there are 100 targets, the sender
has to create 100 buffered batches, fill them each to 1K records, then send
them. To visualize:
`f0.0 (reader --> sender) - - > f1.x (receiver --> hash-join --> ...) ...`
There are 100 f0 fragments: f0.0, ... f0.99, we're looking just at one of
them: f0.0. The f0 "slice" sends to the "f1" slice that consists of 100
additional fragments: f1.0, ... f1.99.
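To make the buffering concrete, here is a minimal sketch in plain Java with invented names (this is not Drill's actual partition-sender code), showing a hash-partitioning sender that keeps one outgoing buffer per receiver and ships a buffer only once it reaches the 1K-row target:
```java
// Hypothetical sketch of a hash-partitioning sender. Class and method names
// are illustrative only, not Drill's actual operators.
import java.util.ArrayList;
import java.util.List;

class PartitionSenderSketch {
  static final int RECEIVER_COUNT = 100;    // f1.0 ... f1.99
  static final int TARGET_BATCH_ROWS = 1_000; // the "1K rows" rule

  // One outgoing buffer per downstream receiver.
  private final List<List<Object>> outgoing = new ArrayList<>();

  PartitionSenderSketch() {
    for (int i = 0; i < RECEIVER_COUNT; i++) {
      outgoing.add(new ArrayList<>(TARGET_BATCH_ROWS));
    }
  }

  // Called once per incoming row from the scan.
  void route(Object row, int hashOfKey) {
    int bucket = Math.floorMod(hashOfKey, RECEIVER_COUNT);
    List<Object> batch = outgoing.get(bucket);
    batch.add(row);
    // Nothing is sent until *this* bucket reaches the target size, so with an
    // even hash distribution the sender buffers roughly
    // RECEIVER_COUNT * TARGET_BATCH_ROWS rows before the first send.
    if (batch.size() >= TARGET_BATCH_ROWS) {
      send(bucket, batch);
      outgoing.set(bucket, new ArrayList<>(TARGET_BATCH_ROWS));
    }
  }

  private void send(int receiver, List<Object> batch) {
    // Placeholder for the network send to fragment f1.<receiver>.
  }
}
```
The key point is that a send fires only per bucket, and with an even hash distribution every bucket fills at roughly the same rate.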
So, what happens in our sender? Assuming even hash distribution, we have to
fill all our 100 outgoing batches before we can send them. This means we have
to read 100 * 1K = 100K input records before we send the first outgoing batch.
The result is a huge memory usage (those 100 batches), plus all the vector
resizes and copies we discussed (as we grow those batches.)
If that were not bad enough, this occurs in all our other 99 f0 fragments:
we've got 100 * 100 = 10K buffered batches waiting to send. Yikes!
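A quick back-of-the-envelope makes the cost concrete. The 200-byte average row width below is purely an assumed figure for illustration; the fragment and batch counts come from the scenario above:
```java
// Rough arithmetic for the buffering described above.
// The 200-byte row width is an assumption, not a measured Drill number.
public class ExchangeBufferingMath {
  public static void main(String[] args) {
    int senders = 100;          // f0.0 ... f0.99
    int receivers = 100;        // f1.0 ... f1.99
    int rowsPerBatch = 1_000;   // the 1K-row rule
    int assumedRowBytes = 200;  // hypothetical average row width

    long rowsBufferedPerSender = (long) receivers * rowsPerBatch;  // 100K rows
    long batchesBufferedClusterWide = (long) senders * receivers;  // 10K batches
    long bytesPerSender = rowsBufferedPerSender * assumedRowBytes; // ~20 MB

    System.out.printf("Rows buffered per sender before first send: %,d%n",
        rowsBufferedPerSender);
    System.out.printf("Batches buffered cluster-wide: %,d%n",
        batchesBufferedClusterWide);
    System.out.printf("Approx. bytes buffered per sender: %,d%n", bytesPerSender);
  }
}
```
Under that assumed row width, each sender holds on the order of 20 MB of un-sent rows, before counting the resize copies, and there are 100 such senders across the cluster.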
Now, what happens in f1? It is sitting around waiting for data. No f0 will
send until it fills its first outgoing batch for that receiver. If we assume an
even distribution of data, then the outgoing batches fill at about the same
rate. None can be sent until one of them reaches the target size, at which
point most of them are near-full. Once the first hits the 1K mark, off it goes
to f1, which can finally start processing. This is bad because Drill claims to
be highly distributed, but what we just described is a serial way of working.
But, it gets worse! Now, assume we're deeper in the DAG, at a sort:
`f4: (receiver --> sort --> sender) - - > f5: (receiver --> merge --> ...)`
The sort sorts its slice of records, and sends it to the merge fragment
which merges all the partial sorts. Classic distributed systems stuff. Again,
the f4 (sort) sender waits to fill its outgoing batches, then it sends. The
merge can't start until it sees batches from all 100 inputs. So, it proceeds at
the rate of the slowest sort.
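A minimal sketch of that startup barrier, again with invented names rather than Drill's real merging receiver, looks like this: the merge blocks on a first batch from every input before it can emit anything at all.
```java
// Illustrative-only sketch of why the merge proceeds at the rate of the
// slowest sort: it cannot emit a single row until it holds a batch from
// every one of its inputs.
import java.util.List;
import java.util.concurrent.BlockingQueue;

class MergeReceiverSketch {
  private final List<BlockingQueue<int[]>> inputs; // one queue per upstream sort

  MergeReceiverSketch(List<BlockingQueue<int[]>> inputs) {
    this.inputs = inputs;
  }

  void run() throws InterruptedException {
    int n = inputs.size();
    int[][] current = new int[n][];
    // Startup barrier: block until *every* upstream sort has delivered its
    // first batch. The merge therefore starts at the pace of the slowest sort.
    for (int i = 0; i < n; i++) {
      current[i] = inputs.get(i).take();
    }
    // ... k-way merge over current[0..n-1], refilling each exhausted input
    // with another blocking take() before any further progress is possible.
  }
}
```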
Now what happens? The merge uses up one of the 100 input batches, and needs
another before it can proceed. But, here things get really nasty.
On the f4 side, f4.0, say, sent the first batch to get full. It then sent
the others as they filled. Meanwhile, the first batch started refilling and
eventually will need to be sent again. Since the merge can't read a new batch
until it has used up the previous one, it blocks the f4 sender. As a result, f4
can't send to *any* other merge.
The downstream fragment throttles the upstream, and vice versa. Not quite
deadlock, but the entire system becomes serialized: the sort can't ship batches
until the slowest merge can receive them. The merge can't make progress until
the slowest sort provides the next batch. Every fragment depends on every
other. Disaster!
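Here is a sketch of that mutual throttling, using bounded per-receiver queues as a stand-in for the real flow-control machinery; the names and structure are hypothetical, not Drill's actual sender:
```java
// Sketch of the back-pressure just described: a single sending loop with
// bounded per-receiver queues means one slow merge stalls delivery to every
// merge. Illustrative names only.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class SortSenderSketch {
  private final List<BlockingQueue<int[]>> perReceiverQueues = new ArrayList<>();

  SortSenderSketch(int receivers, int queueDepth) {
    for (int i = 0; i < receivers; i++) {
      // A small, bounded queue models the receiver's limited buffering.
      perReceiverQueues.add(new ArrayBlockingQueue<>(queueDepth));
    }
  }

  // One loop ships the sorted output, batch by batch, to its target merges.
  // If any target's queue is full because that merge has not yet consumed its
  // previous batch, put() blocks here, and batches destined for every other
  // merge wait behind it. The slowest merge paces the whole sender, and the
  // slowest sender paces every merge.
  void drainSortedOutput(List<int[]> batches, List<Integer> targets)
      throws InterruptedException {
    for (int i = 0; i < batches.size(); i++) {
      perReceiverQueues.get(targets.get(i)).put(batches.get(i));
    }
  }
}
```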
Again, we spent hours trying to figure this out on a customer cluster. We
could see the effect, but we could not get in to work out the details. It
would be great for someone to do the experiments.
**Summary**: The above has debunked the major myths around columnar storage
within a query engine. Note that **none** of the above changes if we use Arrow.
We'd do a huge amount of work to switch, and be stuck with the same fundamental
problems.
Hence, we have to think deeply about this issue, not just go by the snake-oil
claim that "vectors are good for an execution engine." Good old solid
engineering and experimentation will tell us what's what.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Improve expression evaluation performance
> -----------------------------------------
>
> Key: DRILL-8088
> URL: https://issues.apache.org/jira/browse/DRILL-8088
> Project: Apache Drill
> Issue Type: Improvement
> Components: Execution - Codegen
> Reporter: wtf
> Assignee: wtf
> Priority: Minor
>
> Found an unnecessary map copy when doing expression evaluation; it slows
> down codegen when the query includes many "case when" expressions or
> avg/stddev (whose reduced expressions include "case when"). In our case, the
> query includes 314 avg calls, and it takes 3+ seconds to generate the
> projector expressions (Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz, 32 cores).
--
This message was sent by Atlassian Jira
(v8.20.1#820001)