Paul Rogers created DRILL-7686:
----------------------------------

             Summary: Excessive memory use in partition sender
                 Key: DRILL-7686
                 URL: https://issues.apache.org/jira/browse/DRILL-7686
             Project: Apache Drill
          Issue Type: Bug
    Affects Versions: 1.14.0
            Reporter: Paul Rogers
The Partition Sender in Drill is responsible for taking a batch from fragment x and sending its rows to the receiving fragments f1, f2, ..., fn. For example, when joining, fragment x might read a portion of a file, hash the join key, and partition rows by hash key to the receiving fragments that join rows with that same key.

Since Drill is columnar, the sender sends a batch of columns to each receiver. To be efficient, that batch should contain a reasonable number of rows; the current default is 1024. Drill creates buffers, one per receiver, to gather the rows. Thus, each sender needs n buffers: one for each receiver. Because Drill is symmetrical, there are n senders (scans), and since each maintains n send buffers, we have a total of n^2 buffers. That is, the amount of memory used by the partition sender grows with the square of the degree of parallelism of a query.

In addition, as seen in DRILL-7675, the size of the buffers is controlled not by Drill but by the incoming data. The query in DRILL-7675 had rows with 260+ fields, some of which were map arrays. The result is that the query, which processes 2 MB of data, runs out of memory when many GB are available. Drill is simply doing the math: n^2 buffers, each with 1024 rows, each row with 250+ fields, many with a cardinality of 5x (or 25x or 125x, depending on array depth) the row count. The result is a very large memory footprint. (The first sketch below works through the arithmetic.)

There is no simple bug-fix solution: the design is inherently unbounded. This ticket asks to develop a new design. Some crude ideas:

* Use a row-based format for sending to avoid columnar overhead.
* Send rows as soon as they are available on the sender side; let the receiver do the buffering.
* If doing buffering, flush rows after x ms to avoid slowing the system. (The current approach waits for buffers to fill.) The second sketch below outlines such a policy.
* Consolidate buffers on each sending node. (This is the Mux/DeMux approach, which is in the code but was never well understood, and which has its own concurrency and memory-ownership problems.)
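To make the quadratic growth concrete, here is a back-of-envelope sketch of the buffer footprint. The parallelism, value width, and cardinality figures are assumptions chosen to resemble the DRILL-7675 case, not measured values:

{code:java}
/**
 * Back-of-envelope estimate of partition-sender buffer memory.
 * All figures are illustrative assumptions, loosely based on the
 * DRILL-7675 query (260+ fields, one level of ~5x array cardinality).
 */
public class PartitionSenderFootprint {

  public static void main(String[] args) {
    int parallelism = 20;     // n: minor fragments per side (assumed)
    int rowsPerBatch = 1024;  // current default batch size
    int fieldsPerRow = 260;   // from DRILL-7675
    int avgCardinality = 5;   // values per field, one array level (assumed)
    int bytesPerValue = 4;    // assumed average value width

    // n senders, each holding n buffers: n^2 buffers cluster-wide.
    long buffers = (long) parallelism * parallelism;
    long bytesPerBuffer = (long) rowsPerBatch * fieldsPerRow
        * avgCardinality * bytesPerValue;

    // With these figures: 400 buffers of ~5.3 MB each, roughly 2 GB
    // in total, for a query that reads only a few MB of data.
    System.out.printf("buffers: %,d%n", buffers);
    System.out.printf("per buffer: %,d bytes%n", bytesPerBuffer);
    System.out.printf("total: %,d bytes (~%.1f GB)%n",
        buffers * bytesPerBuffer,
        buffers * bytesPerBuffer / (1024.0 * 1024 * 1024));
  }
}
{code}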
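And a minimal sketch of the "flush after x ms" idea from the list above. The names here (PartitionBuffer, addRow, shouldFlush) are hypothetical, not existing Drill APIs; the point is only that a batch is sent either when it fills or when its oldest row has lingered too long, whichever comes first:

{code:java}
/**
 * Hypothetical sketch (not a Drill API) of a time-bounded send buffer.
 * A batch flushes when full OR when its oldest row has waited longer
 * than maxLingerMs, so slow partitions cannot hold rows indefinitely.
 */
class PartitionBuffer {
  private final int capacity;
  private final long maxLingerMs;
  private int rowCount;
  private long firstRowArrivalMs = -1;

  PartitionBuffer(int capacity, long maxLingerMs) {
    this.capacity = capacity;
    this.maxLingerMs = maxLingerMs;
  }

  /** Records one appended row; returns true when the caller should flush. */
  boolean addRow(long nowMs) {
    if (firstRowArrivalMs < 0) {
      firstRowArrivalMs = nowMs;
    }
    rowCount++;
    return rowCount >= capacity
        || nowMs - firstRowArrivalMs >= maxLingerMs;
  }

  /** Called from a periodic sweep so idle partitions also drain. */
  boolean shouldFlush(long nowMs) {
    return rowCount > 0 && nowMs - firstRowArrivalMs >= maxLingerMs;
  }

  /** Resets state after the gathered rows have been sent downstream. */
  void reset() {
    rowCount = 0;
    firstRowArrivalMs = -1;
  }
}
{code}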