GitHub user Ben-Zvi opened a pull request:
https://github.com/apache/drill/pull/822
DRILL-5457: Spill implementation for Hash Aggregate
This pull request covers the work on enabling memory spill for the Hash
Aggregate operator.
To assist in reviewing this extensive code change, listed below are various
topics/issues that describe the implementation decisions and give some code
pointers. The reviewer can read these items and peek into their relevant code,
or read all the items first (and comment on the design decisions as well).
The last topic is not "least": It describes many issues and solutions
related to the need to estimate the memory size of batches (and hash tables,
etc.) This work took a significant amount of time, and will need some more to
get better.
(Most of the code changes are in HashAggTemplate.java, hence this file is
not mentioned specifically below)
### Aggregation phase:
The code was changed to pass the aggregation phase information (whether
this is a single phase, the 1st of two phases, or the 2nd of two phases) from
the planner to the HAG operator code.
(See HashAggregate.java, AggPrelBase.java, HashAggPrel.java )
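As a rough illustration only (the enum and method below are hypothetical, not the actual planner/operator classes), the phase can be thought of as a three-valued setting that the operator consults at setup time:

```java
// Hypothetical sketch; the real change passes the phase through
// HashAggregate.java, AggPrelBase.java and HashAggPrel.java.
public class AggPhaseSketch {
  enum AggPhase { SINGLE, FIRST_OF_TWO, SECOND_OF_TWO }

  // Per the description: single phase never spills, the 1st phase returns
  // partitions early instead of writing to disk, and only the 2nd phase
  // actually spills to disk.
  static boolean spillsToDisk(AggPhase phase) {
    return phase == AggPhase.SECOND_OF_TWO;
  }

  public static void main(String[] args) {
    System.out.println(spillsToDisk(AggPhase.SECOND_OF_TWO)); // true
  }
}
```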
### Partitioning:
The data (rows/groups) coming into the HAG is partitioned into a (power
of 2) number of partitions, based on the N least significant bits of the hash
value (computed from the row's key columns).
Each partition can be handled independently of the others. Ideally each
partition should fit into the available memory. The number of partitions is
initialized from the option "drill.exec.hashagg.num_partitions", and scaled
down if the available memory seems too small (each partition needs to hold at
least one batch in memory).
The scaling down uses the formula: AVAIL_MEMORY > NUM_PARTITIONS * ( K * EST_BATCH_SIZE + 8M )
(see delayedSetup()), where K is the option drill.exec.hashagg.min_batches_per_partition (see below).
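A minimal sketch of this scale-down logic, under the assumption that the partition count is simply halved until the inequality holds; names and constants here are illustrative, not the actual delayedSetup() code:

```java
// Illustrative sketch of the partition scale-down formula described above.
public class PartitionScalingSketch {
  static int scaleDownPartitions(int requestedPartitions, long availMemory,
                                 long estBatchSize, int minBatchesPerPartition /* K */) {
    final long perPartitionOverhead = 8L * 1024 * 1024; // the "8M" term in the formula
    int numPartitions = requestedPartitions;            // assumed to be a power of 2
    // Halve the partition count until each partition can hold at least K batches
    // plus its fixed overhead, or until a single partition remains (no spilling).
    while (numPartitions > 1 &&
           availMemory <= numPartitions * (minBatchesPerPartition * estBatchSize + perPartitionOverhead)) {
      numPartitions /= 2;
    }
    return numPartitions;
  }

  public static void main(String[] args) {
    // e.g. 32 requested partitions, 128 MB available, ~4 MB estimated batch size, K = 3
    System.out.println(scaleDownPartitions(32, 128L << 20, 4L << 20, 3)); // prints 4
  }
}
```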
Computing the number of partitions is delayed until actual data arrives
on the incoming (in order to get accurate sizing for varchars). See
delayedSetup(). There is also special code for cases where data never arrives
(empty batches), hence no partitions are created (see the beginning of
outputCurrentBatch(), cleanUp(), delayedSetup() ).
Many of the code changes made in order to implement multiple partitions
follow the original code, only changing scalar members (of HashAggTemplate)
into arrays, e.g. "htable" becomes "htables[]".
Each partition has its own hash table. Each time a partition is spilled, its
hash table is freed and reallocated.
### Hash Code:
The hash code computation result was extracted from the HashTable (needed
for the partitions), and added as a parameter to the put() method. Thus for
each new incoming row, first the hash code is computed, and the low N bits are
used to select the partition, then the hash code is right shifted by N, and the
result is passed back to the put() method.
After spilling, the hash codes are not kept. When reading the rows
(groups) back from the spill, the hash codes are computed again (and
right-shifted before use, once per spilling cycle, thus repartitioning).
(See more about "spilling cycle" below).
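A small sketch of this arithmetic; the per-cycle shift shown here is my reading of the description above, and the variable names are illustrative:

```java
// Illustrative sketch of the partition-selection arithmetic, not the actual
// HashAggTemplate code.
public class HashPartitionSketch {
  public static void main(String[] args) {
    final int numPartitions = 8;                                   // a power of 2
    final int bits = Integer.numberOfTrailingZeros(numPartitions); // N = 3 low bits select the partition
    final int cycleNum = 0;  // 0 = original incoming, 1 = reading regular spill, 2 = SECONDARY, ...
    int hashCode = 0xA5F3C2D1;                                     // hash of the row's key columns

    // When re-reading spilled data, the recomputed hash is right-shifted once per
    // completed spilling cycle, so previously unused bits drive the repartitioning.
    hashCode >>>= bits * cycleNum;

    int partition = hashCode & (numPartitions - 1);  // low N bits pick the partition
    int hashForTable = hashCode >>> bits;            // remaining bits are passed to the hash table's put()

    System.out.printf("partition=%d, hash passed to put()=0x%08X%n", partition, hashForTable);
  }
}
```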
### Hash Table put():
The put() method for the hash table was rewritten and simplified. In
addition to the hash-code parameter change, it was changed to return the
PutStatus, with two new states: NEW_BATCH_ADDED notifies the caller that a new
batch was created internally, hence a matching new batch (needed only for the
Hash Agg) must be allocated (the prior code derived this by comparing the
returned index against the prior number of batches).
A second new state is KEY_ADDED_LAST, which notifies that a batch was
just filled, hence it is time to check memory availability (because a new
batch will be allocated soon).
Similar rewriting was done for the hash table containsKey() method (and
addBatchifNeeded() ).
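To illustrate how a caller might react to the two new states, here is a hedged sketch; only NEW_BATCH_ADDED and KEY_ADDED_LAST come from the description above, and the other enum constants and method names are hypothetical:

```java
// Illustrative caller-side sketch; the surrounding structure is hypothetical.
public class PutStatusSketch {
  enum PutStatus { KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST }

  void onPutResult(PutStatus status) {
    switch (status) {
      case NEW_BATCH_ADDED:
        // The hash table created a new internal batch; the Hash Agg must
        // allocate a matching batch of aggregation (values) vectors.
        allocateValuesBatch();
        break;
      case KEY_ADDED_LAST:
        // The current batch just filled up; a new batch will be needed soon,
        // so this is the point to check memory availability and possibly spill.
        checkMemoryAndSpillIfNeeded();
        break;
      default:
        break;
    }
  }

  void allocateValuesBatch() { /* ... */ }
  void checkMemoryAndSpillIfNeeded() { /* ... */ }
}
```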
### Memory pressure check:
Logically, the place to check for memory pressure is when new memory
is needed (i.e., when a new group needs to be created). However the code
structure does not allow this easily (e.g., a new batch is allocated inside the
hash table object when a new group is detected, or the hash table structure is
doubled in size), thus instead the check is done AFTER a new group was added,
in case it was the last group added to that batch (see
checkGroupAndAggrValues(), checking for the new status KEY_ADDED_LAST).
This memory availability check verifies that enough memory is left
between what has been allocated so far and the limit.
Spill is initiated when: MEMORY_USED + MAX_MEMORY_NEEDED > MEMORY_LIMIT
(see checkGroupAndAggrValues() )
where the memory needed is: (EST_BATCH_SIZE + 64K * (4+4)) * K *
PLANNED_BATCHES + MAX_HASH_TABLES_RESIZE
(See K above, under Partitioning, and the rest further below, under memory
estimations).
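A minimal sketch of this trigger, assuming the "64K * (4+4)" term is per-batch hash-table overhead (4-byte hash values plus 4-byte links for 64K entries); variable names are illustrative rather than the actual checkGroupAndAggrValues() code:

```java
// Illustrative sketch of the spill-trigger inequality quoted above.
public class SpillTriggerSketch {
  static boolean shouldSpill(long memoryUsed, long memoryLimit,
                             long estBatchSize, int minBatchesPerPartition /* K */,
                             int plannedBatches, long maxHashTablesResize) {
    // Assumed meaning of "64K * (4+4)": per-entry hash value and link overhead
    // for a 64K-row batch.
    long perBatchOverhead = 64L * 1024 * (4 + 4);
    long maxMemoryNeeded =
        (estBatchSize + perBatchOverhead) * minBatchesPerPartition * plannedBatches
        + maxHashTablesResize;
    // Spill when what is used plus what may soon be needed exceeds the limit.
    return memoryUsed + maxMemoryNeeded > memoryLimit;
  }
}
```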
### When spilling is not possible:
A single-phase HAG cannot spill. Also, under memory duress the 2nd phase may
end up with only a single partition, which does not allow spilling (no progress
would be made). In these two cases the memory check is skipped, and the operator
behaves like the old code -- if it runs out of memory, it OOMs. A
try-catch was added to the code to provide more detail on the OOM (see
getOOMErrorMsg() ).
Also in case of a single partition the allocator's memory limit is set
to 10GB, to be compatible with the prior code.
Another "can't spill" situation is when choosing a partition to spill,
but no partition has more than 1 batch (hence memory can not be gained, as
after spilling 1 batch need to reinitialize that partition with a new batch).
See chooseAPartitionToFlush(). In such a case the code "crosses its fingers"
and continues without spilling.
### 1st phase - Early return:
The 1st phase HAG does not spill to disk. When the 1st phase detects memory
pressure, it picks the current partition (the one whose last batch just got
full) and returns that partition downstream (just like a regular return, only
early). Afterwards that partition is deallocated and re-initialized. Note the
boolean "earlyOutput" in the code, which controls the special processing in this
case: it is turned on when the code switches to output (e.g., innerNext() in
HashAggBatch.java), and turned off when done (see outputCurrentBatch() ).
### Spilling:
The SpillSet (also used by the External Sort) is used for the actual IO. Each
partition spills into a single file. Changes to SpillSet: it was generalized
for any kind of "buffered memory" operator (the operator's type is passed in).
There are also small changes to the spill file name.
### 2nd phase - Flushing/spilling:
When memory pressure is detected (while reading and processing the
incoming), one of the partitions is selected ( see chooseAPartitionToFlush() )
and flushed ( spillAPartition() ), and then its memory is freed and that
partition is re-initialized ( reinitPartition() ). The choice of a partition
gives a small priority to the current partition (since its last batch is full,
unlike the others), and priority by a factor of 4 to partitions that have
already spilled (i.e., a spilled partition with 10 batches would be chosen over
a pristine/non-spilled one with 39 batches).
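A sketch of this heuristic; the exact order in which the current-partition bonus and the factor of 4 are applied is an assumption here, not taken from the patch:

```java
// Illustrative sketch of the partition-choice heuristic described above,
// not the actual chooseAPartitionToFlush() code.
public class ChoosePartitionSketch {
  static int choosePartitionToFlush(int[] batchesInPartition, boolean[] isSpilled,
                                    int currentPartition) {
    int best = -1;
    int bestScore = -1;
    for (int p = 0; p < batchesInPartition.length; p++) {
      int score = batchesInPartition[p];
      if (p == currentPartition) { score++; }   // small priority: its last batch is already full
      if (isSpilled[p]) { score *= 4; }         // factor-of-4 priority for already-spilled partitions
      if (score > bestScore) { bestScore = score; best = p; }
    }
    return best;
  }

  public static void main(String[] args) {
    // Prints 1: a spilled partition with 10 batches beats a pristine one with 39
    // (no current-partition bonus applied in this example).
    System.out.println(choosePartitionToFlush(new int[]{39, 10}, new boolean[]{false, true}, -1));
  }
}
```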
Partition spilling (spillAPartition() ): For each batch in the partition -
Allocate an outgoing container, link the values and the keys into this
container, and write it to the file/stream.
2nd phase - End of incoming: After the last batch has been read from the
incoming, the original code ( next() ) returned a status of NONE. The new code
cannot return NONE after spilling, so instead it returns a special status of
RESTART (see outputCurrentBatch() ). This RESTART is captured by the caller of
next() ( innerNext() in HashAggBatch.java ), which continues to drive the
aggregation (instead of returning).
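A hedged sketch of that caller-side loop; the RESTART name comes from the description above, while the surrounding types and method names here are hypothetical:

```java
// Illustrative driver-loop sketch, not the actual HashAggBatch.innerNext() code.
public class RestartLoopSketch {
  enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, RESTART }

  interface Aggregator { AggOutcome doWork(); }

  static void innerNext(Aggregator aggregator) {
    AggOutcome out = aggregator.doWork();
    while (out == AggOutcome.RESTART) {
      // Spilled partitions remain: keep driving the aggregator so it can read a
      // spilled partition back ("just like incoming") instead of returning NONE.
      out = aggregator.doWork();
    }
    // Otherwise hand control (and the current outcome) back downstream.
  }
}
```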
After the end of the incoming, all the (partially) spilled partitions
finish spilling all their remaining in-memory batches to disk (see
outputCurrentBatch() ). This is done to simplify the later processing of each
spilled partition, as well as freeing memory which may be needed as partitions
are processed. The spilled partitions are added into a list
(spilledPartitionsList) to allow for later processing.
### 2nd phase reading of the spill:
Reading of each spilled partition/file is performed like reading the
incoming. For this purpose, a new class was added: SpilledRecordbatch. The main
method there is next() which reads a batch from the stream -- first time it
uses the original readFromStream() method, which creates a new container;
subsequent calls use the new readFromStreamWithContainer() method, which is
similar - only reuses the prior container. (This was done because many places
in the code have references into the container).
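A hedged sketch of that reading pattern; readFromStream() and readFromStreamWithContainer() are named above, but the surrounding class structure here is hypothetical:

```java
// Illustrative sketch of the spilled-batch reading pattern described above.
public class SpilledReaderSketch {
  private boolean firstRead = true;
  private Object container;   // stands in for the operator's VectorContainer

  /** Reads the next spilled batch, mimicking an incoming next() call. */
  public boolean next(java.io.InputStream spillStream) throws java.io.IOException {
    if (firstRead) {
      container = readFromStream(spillStream);            // builds a brand-new container
      firstRead = false;
    } else {
      readFromStreamWithContainer(spillStream, container); // reuses the existing container,
                                                           // since other code holds references into it
    }
    return container != null;
  }

  private Object readFromStream(java.io.InputStream in) { /* deserialize a batch */ return new Object(); }
  private void readFromStreamWithContainer(java.io.InputStream in, Object c) { /* refill the same container */ }
}
```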
### Spilling cycles:
Reading a spilled partition "just like incoming" allows for that input to
spill again (and again ...);
this was termed SECONDARY spilling (and TERTIARY ...). As the spilled
partitions are kept in a FIFO list, processing of SECONDARY partitions would
start only after all the regular spilled ones, etc. Hence a member "cycleNum"
was added; it is incremented every time processing of the spilled list advances
to another "cycle" (see outputCurrentBatch() ).
The "cycleNum" is used for the hash-code computation; the same hash-code
is computed at every cycle, but the cycle tells how much to right-shift that
code so that different bits would be used (for partitioning and hash-table
bucket).
### Configuration options:
- drill.exec.hashagg.num_partitions: Initial number of partitions in each
HAG operator (the number may be adjusted down in case too little memory is
available). Default value: 32, allowed range 1 - 128, where a value of 1
means "no spilling" (and thus sets the 10GB limit).
- drill.exec.hashagg.min_batches_per_partition: Range 2 - 5. Default 3. Used
for the internal initial estimate of the number of partitions, and later when
predicting the memory needed (to avoid a spill).
(A value of 2 may be better, but it triggers a bug that will be addressed
separately.)
Also using options common to all the "buffered" operators (these can be
overridden per operator):
- drill.exec.spill.fs: File system to spill into.
- drill.exec.spill.directories: (Array of) directories to spill into.
(To override, per-operator: for the (managed) External Sort:
"drill.exec.sort.external.spill.fs" and
"drill.exec.sort.external.spill.directories", and for the Hash Aggregate:
"drill.exec.hashagg.spill.fs" and "drill.exec.hashagg.spill.directories")
For testing:
- drill.exec.hashagg.mem_limit: Limit the memory for each HAG operator
(also sets this number in the allocator, hence this is a "hard limit").
Also for testing (or for a customer workaround ??):
- planner.force_2phase_aggr: Forces the aggregation to be two phase.
### Stats and metrics:
The hash-table stats were modified to sum the stats across all the
partitions' hash tables. (This only applies to the first spilling cycle; does
not count for SECONDARY, TERTIARY spilling etc.).
New metrics added:
- "num partitions" (actual number; may have been scaled down due to memory
pressure)
- "spilled partitions" (number that has spilled)
- "MB spilled" (in case of 1st phase - that's the total data returned
early).
All the above three refer to the end of input into the HAG (does not
include handling of spills, secondary spills, etc.)
- "cycle": 0 - no spill (or 1st phase), 1 - regular spill, 2 - Secondary,
3 - Tertiary ...)
### Memory Allocation Utilities:
Extended for all "buffered operators", not only for Sort. (Hash Join
will be added later as well, etc.)
### Changes to Unit tests:
- New TestHashAggSpill: Runs two hash agg queries - one spills some of its
partitions (1 out of 2), and the other forces a smaller memory limit, hence gets
into Secondary and Tertiary spills.
- TestBugFixes.testDRILL4884: This test implicitly relied on rows returned
in order (the old Hash agg, plus the Parquet file).
With the new division into partitions, that order was broken. Fix:
added an "order by".
- TestTpchDistributedConcurrent.testConcurrentQueries: Needed longer
timeout (probably spilled).
### MISC
- After a spill, check again if enough memory was freed, else spill
(another partition) again. (Not sure if needed.)
- Suggestion not implemented: Scaling down the initial hash-table sizes by
the number of partitions (e.g. when 4 partitions, each hash-table starts with
1/4 of the initial size). Reason for not changing: starting with a small size
immediately causes doubling and another doubling etc. Better allocate a little
more and save that work.
- The RecordBatchSizer had a recent change to handle MAPs (recursively).
Merged this change with the modified measureColumn(), which returns an int (the
estimated size).
### MEMORY SIZE ESTIMATIONS
As described above, we need a good estimate of the memory needs in
order to decide initially on the number of partitions, and later to decide each
time (a batch gets filled) whether to spill or not.
These estimates are complicated due to:
(1) Possible changes in the incoming data batches (e.g., varchar(12) in the
first batch becomes varchar(200) in the second incoming batch). This may
invalidate prior estimates.
(2) Arbitrary setting of length 50 for varchar type (when sizing the
allocation of DrillBufs)
(3) Allocation sizes rounded up to the nearest power of 2 (DrillBufs for
varchars).
(4) When an internal batch gets filled, and the estimation shows ample memory,
a second batch may get filled before the first one's partition has allocated a
new batch (hence possibly causing "double booking").
(5) Inserting a single value may cause the "start indices" (the real actual
"hash table") to double in size. This structure can get pretty large (few MB).
(6) Is the size of the incoming batch charged against the HAG's
memory allocator limit? (Not sure; usually not a problem, as the prior batch is
deallocated before the next one comes in, unless the next one is "much bigger".)
(7) For varchars: The memory is allocated as a power of 2 (e.g. doubled via
setSafe()). This can cause a big memory waste; e.g., if the total memory needed
for 64K varchars is ~5MB, then 8MB is allocated, wasting 3MB.
(8) The varchar value vector uses an internal "offset vector" that
allocates "size+1" entries, hence for 64K values it allocates 512KB, of which
256KB are wasted (see DRILL-5446).
### Solutions for the memory estimation issues:
(1)+(6) above: Monitor the size of each incoming batch, and resize the batch
size estimate if the incoming batch is bigger (see doWork() ).
(5) When estimating memory needs, take into account hash table size
doubling in all partitions (using the new hash table method
extraMemoryNeededForResize() ).
(4) Track "plannedBatches"; when "promising" few partitions a new batch
each, take this into account when checking for available memory. (Though "more
than 1" situation seems very rare).
(2)+(3) Ideally, tracking the size of EACH varchar column would work better,
but it is not simple to implement. Instead, just find the maximum size of any of
the incoming columns (for simplicity, not only varchars), and use this value
(capped at 50, with a minimum value of 8; rounded up to the next power of 2 if
needed). This addresses the common situation of multiple short varchar key
columns, but not the (very rare) situation of a huge varchar KEY column plus a
few short ones.
(7) Update RecordBatchSizer.java: added a method netRowWidthCap50()
which takes into account the rounding up (per column in a row), plus null
arrays as needed, for each row (this is multiplied by 64K in
updateEstMaxBatchSize() ).
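As a rough, hedged sketch that blends the rules above (the [8, 50] clamp, the per-column power-of-2 rounding, and the multiply-by-64K-rows step), with illustrative names rather than the actual RecordBatchSizer code:

```java
// Illustrative estimation sketch; null-bitmap arrays and other per-vector
// overheads are omitted for brevity.
public class BatchSizeEstimateSketch {
  /** Clamp a measured column width to [8, 50] and round up to the next power of 2. */
  static int estimatedColumnWidth(int measuredMaxWidth) {
    int w = Math.max(8, Math.min(50, measuredMaxWidth));
    return Integer.highestOneBit(w - 1) << 1;   // smallest power of 2 >= w
  }

  /** Rough per-batch estimate: (sum of per-column estimates) * 64K rows. */
  static long estimatedBatchSize(int[] measuredColumnWidths) {
    long rowWidth = 0;
    for (int w : measuredColumnWidths) {
      rowWidth += estimatedColumnWidth(w);
    }
    return rowWidth * 64 * 1024;
  }

  public static void main(String[] args) {
    // e.g. two short varchar keys (measured 12 and 27 bytes) and an 8-byte bigint value:
    // (16 + 32 + 8) * 64K = 3670016 bytes
    System.out.println(estimatedBatchSize(new int[]{12, 27, 8}));
  }
}
```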
==== END ====
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/Ben-Zvi/drill hashagg-spill
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/drill/pull/822.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #822
----
commit 1e436cef2dd8c4f519e584fbe18d233eab26468f
Author: Boaz Ben-Zvi <[email protected]>
Date: 2017-05-02T02:59:49Z
Spill implementation for Hash Aggregate
----