Paul Rogers created DRILL-7686:
----------------------------------
Summary: Excessive memory use in partition sender
Key: DRILL-7686
URL: https://issues.apache.org/jira/browse/DRILL-7686
Project: Apache Drill
Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Paul Rogers
The Partition Sender in Drill is responsible for taking a batch from fragment x
and sending its rows to the other fragments f1, f2, ... fn. For example, when
joining, fragment x might read a portion of a file, hash the join key, and
partition rows by hash key to the receiving fragments that join rows with that
same key.
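The routing step described above can be sketched roughly as follows. This is an
illustrative toy, not Drill's actual classes: the join key is hashed and the
hash modulo the receiver count picks the destination fragment's buffer.

```java
import java.util.ArrayList;
import java.util.List;

public class HashPartitionSketch {
    // Route a row to one of n receivers by hashing its join key.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    static int receiverFor(Object joinKey, int receiverCount) {
        return Math.floorMod(joinKey.hashCode(), receiverCount);
    }

    public static void main(String[] args) {
        int receivers = 4;
        List<List<String>> buffers = new ArrayList<>();
        for (int i = 0; i < receivers; i++) {
            buffers.add(new ArrayList<>());
        }
        // Rows with equal keys always land in the same receiver's buffer,
        // which is what lets the receiving fragment join them locally.
        for (String key : new String[]{"a", "b", "c", "a"}) {
            buffers.get(receiverFor(key, receivers)).add(key);
        }
        System.out.println(buffers);
    }
}
```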
Since Drill is columnar, the sender needs to send a batch of columns to each
receiver. To be efficient, that batch should contain a reasonable number of
rows. The current default is 1024.
Drill creates buffers, one per receiver, to gather the rows. Thus, each sender
needs n buffers: one for each receiver.
Because Drill is symmetrical, there are n senders (scans). Since each maintains
n send buffers, we have a total of n^2 buffers. That is, the amount of memory
used by the partition sender grows with the square of the degree of parallelism
for a query.
In addition, as seen in DRILL-7675, the size of the buffers is controlled not
by Drill, but by the incoming data. The query in DRILL-7675 had a row with 260+
fields, some of which were map arrays.
The result is that a query which processes 2 MB of data runs out of memory
while many GB are available. Drill is simply doing the math: n^2 buffers, each
with 1024 rows, each row with 250+ fields, many with a cardinality of 5x (or
25x or 125x, depending on array depth) the row count. The result is a very
large memory footprint.
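A back-of-the-envelope calculation shows how quickly these factors multiply.
The parallelism of 20 below is an assumed example value, and the result counts
value-vector cells, not bytes; the real footprint also depends on per-value
sizes and vector overhead.

```java
public class PartitionSenderFootprint {
    // n senders, each holding one buffer per receiver: n^2 buffers total.
    static long bufferCount(int parallelism) {
        return (long) parallelism * parallelism;
    }

    // Total cells = buffers x rows per batch x fields per row x array cardinality.
    static long cellCount(int parallelism, int rowsPerBatch,
                          int fields, int arrayCardinality) {
        return bufferCount(parallelism) * rowsPerBatch * fields * arrayCardinality;
    }

    public static void main(String[] args) {
        // Assumed: 20-way parallelism, 1024-row batches, 250 fields,
        // arrays averaging 5 values per row.
        System.out.println(bufferCount(20));                 // 400 buffers
        System.out.println(cellCount(20, 1024, 250, 5));     // 512000000 cells
    }
}
```

Half a billion cells in flight for a 2 MB input illustrates why the design is
unbounded: every factor is outside the sender's control except the batch size.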
There is no simple bug-fix solution: the design is inherently unbounded. This
ticket asks to develop a new design. Some crude ideas:
* Use a row-based format for sending to avoid columnar overhead.
* Send rows as soon as they are available on the sender side; allow the
receiver to do buffering.
* If doing buffering, flush rows after x ms to avoid slowing the system. (The
current approach waits for buffers to fill.)
* Consolidate buffers on each sending node. (This is the Mux/DeMux approach,
which is already in the code, but was never well understood and has its own
concurrency and memory-ownership problems.)
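The "flush after x ms" idea above could look roughly like the sketch below: a
buffer signals a flush either when it fills or when its oldest row has waited
too long. All names and thresholds here are hypothetical, not an existing
Drill API.

```java
public class TimedFlushBuffer {
    private final int maxRows;
    private final long maxAgeMs;
    private int rows = 0;
    private long firstRowAtMs = -1;

    TimedFlushBuffer(int maxRows, long maxAgeMs) {
        this.maxRows = maxRows;
        this.maxAgeMs = maxAgeMs;
    }

    // Record one row; return true when the caller should flush downstream,
    // either because the buffer is full or the oldest row is too old.
    boolean addRow(long nowMs) {
        if (rows == 0) {
            firstRowAtMs = nowMs;
        }
        rows++;
        return rows >= maxRows || nowMs - firstRowAtMs >= maxAgeMs;
    }

    void reset() {
        rows = 0;
        firstRowAtMs = -1;
    }

    public static void main(String[] args) {
        // A slow trickle of rows still flushes once 100 ms have elapsed,
        // so a nearly empty buffer cannot stall the receivers indefinitely.
        TimedFlushBuffer buf = new TimedFlushBuffer(1024, 100);
        boolean flush = false;
        for (long t = 0; t <= 100 && !flush; t += 10) {
            flush = buf.addRow(t);
        }
        System.out.println(flush); // true: the time bound triggered, not row count
    }
}
```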
--
This message was sent by Atlassian Jira
(v8.3.4#803005)