Paul Rogers created DRILL-7686:
----------------------------------

             Summary: Excessive memory use in partition sender
                 Key: DRILL-7686
                 URL: https://issues.apache.org/jira/browse/DRILL-7686
             Project: Apache Drill
          Issue Type: Bug
    Affects Versions: 1.14.0
            Reporter: Paul Rogers


The Partition Sender in Drill is responsible for taking a batch from fragment x 
and sending its rows to all other fragments f1, f2, ... fn. For example, when 
joining, fragment x might read a portion of a file, hash the join key, and 
partition rows by hash key to the receiving fragments that join rows with that 
same key.

Since Drill is columnar, the sender needs to send a batch of columns to each 
receiver. To be efficient, that batch should contain a reasonable number of 
rows. The current default is 1024.

Drill creates buffers within each sender to gather the rows. Thus, each sender 
needs n buffers: one for each receiver.

Because Drill is symmetrical, there are n senders (scans). Since each maintains 
n send buffers, we have a total of n^2 buffers. That is, the amount of memory 
used by the partition sender grows with the square of the degree of parallelism 
for a query.
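As a back-of-envelope illustration of that quadratic growth (the parallelism 
figures below are invented for the example, not taken from any real query):

```python
def total_buffers(n: int) -> int:
    """Total in-flight partition-sender buffers for a symmetrical
    exchange: each of the n senders keeps one buffer per receiver."""
    return n * n

for n in (8, 32, 128):
    print(f"parallelism {n:>3}: {total_buffers(n):>6} buffers")
# Doubling the parallelism quadruples the buffer count.
```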

In addition, as seen in DRILL-7675, the size of the buffers is controlled not 
by Drill, but by the incoming data. The query in DRILL-7675 had a row with 260+ 
fields, some of which were map arrays.

The result is that the query, which processes only 2 MB of data, runs out of 
memory when many GB are available. Drill is simply doing the math: n^2 buffers, 
each with 1024 rows, each row with 250 fields, many with a cardinality 5x (or 
25x or 125x, depending on array depth) that of the row count. The result is a 
very large memory footprint.
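Putting the figures from this ticket together (the per-value byte size and the 
parallelism are assumptions chosen purely for illustration):

```python
def sender_memory_bytes(n, rows=1024, fields=250,
                        cardinality=5, bytes_per_value=4):
    """Crude estimate of total partition-sender buffer memory.
    rows/fields/cardinality come from this ticket; n and
    bytes_per_value are hypothetical."""
    buffers = n * n                            # one per (sender, receiver) pair
    values_per_buffer = rows * fields * cardinality
    return buffers * values_per_buffer * bytes_per_value

# Even at a modest parallelism of 32, the estimate is ~5 GB:
print(sender_memory_bytes(32) / 2**30, "GiB")
```

The point of the sketch is that none of the factors is individually 
unreasonable; it is their product, driven by n^2, that explodes.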

There is no simple bug-fix solution: the design is inherently unbounded. This 
ticket asks to develop a new design. Some crude ideas:
 * Use a row-based format for sending to avoid columnar overhead.
 * Send rows as soon as they are available on the sender side; allow the 
receiver to do buffering.
 * If doing buffering, flush rows after x ms to avoid slowing the system. (The 
current approach waits for buffers to fill.)
 * Consolidate buffers on each sending node. (This is the Mux/DeMux approach, 
which is in the code but was never well understood, and which has its own 
concurrency and memory-ownership problems.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
