[ 
https://issues.apache.org/jira/browse/PHOENIX-758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabriel Reid resolved PHOENIX-758.
----------------------------------

    Resolution: Fixed

Bulk resolve of closed issues imported from GitHub. This status was reached by 
first re-opening all closed imported issues and then resolving them in bulk.

> Spillable GroupBy implementation, initial commit
> ------------------------------------------------
>
>                 Key: PHOENIX-758
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-758
>             Project: Phoenix
>          Issue Type: Task
>            Reporter: kutschm
>
> Spillable GroupBy:
> The main entry point is in GroupedAggregateRegionObserver. It instantiates a 
> GroupByCache and invokes a get() method on it. There is no "if key does not 
> exist -> put into map" case, since the cache is a LoadingCache and therefore 
> handles the put under the covers. I tried to make the final cache element 
> accesses (RegionScanner below) streaming, i.e. there is just an iterator over 
> it, and I removed the existing result materialization.
> GroupByCache implements a Guava LoadingCache, which has a configurable upper 
> and lower size limit. Optimally it is sized as estMapSize / valueSize, since 
> the upper limit is based on the number of entries, not on a memory budget. As 
> long as no eviction happens, no spillable data structures are allocated; they 
> are allocated only once the first element is evicted from the cache. We 
> cannot really make any assumptions about which keys arrive at the map, but I 
> expect the LRU policy to at least cover the cases where some keys are 
> slightly skewed, so those keys stay memory resident.
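> Roughly, the cache setup looks like the sketch below. The class names and the 
> spill hook here are only illustrative (the real code lives in 
> GroupedAggregateRegionObserver / GroupByCache); the point is that get() 
> covers the miss case and that eviction is where spilling hooks in:
>
>   import com.google.common.cache.CacheBuilder;
>   import com.google.common.cache.CacheLoader;
>   import com.google.common.cache.LoadingCache;
>   import com.google.common.cache.RemovalListener;
>   import com.google.common.cache.RemovalNotification;
>
>   public class GroupByCacheSketch {
>       // byte[] stands in for the serialized Aggregator[] of the real code
>       private final LoadingCache<String, byte[]> cache;
>
>       public GroupByCacheSketch(long estMapSize, long estValueSize) {
>           long maxCacheEntries = estMapSize / estValueSize; // entry-count limit, not a byte budget
>           cache = CacheBuilder.newBuilder()
>               .maximumSize(maxCacheEntries)
>               .removalListener(new RemovalListener<String, byte[]>() {
>                   @Override
>                   public void onRemoval(RemovalNotification<String, byte[]> n) {
>                       // the first eviction is where the spill structures get allocated
>                       spillToDisk(n.getKey(), n.getValue());        // hypothetical helper
>                   }
>               })
>               .build(new CacheLoader<String, byte[]>() {
>                   @Override
>                   public byte[] load(String key) {
>                       // a miss lands here, so callers never do "check then put"
>                       return loadFromSpillOrCreateNew(key);         // hypothetical helper
>                   }
>               });
>       }
>
>       public byte[] getAggregators(String groupByKey) {
>           return cache.getUnchecked(groupByKey);  // get() handles the put under the covers
>       }
>
>       private void spillToDisk(String key, byte[] value) { /* spill path, see below */ }
>       private byte[] loadFromSpillOrCreateNew(String key) { return new byte[0]; }
>   }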
> Once a key gets evicted, the SpillManager is instantiated. It takes care of 
> spilling an element to disk and does all the SERDE work. It creates a bunch 
> of SpillFiles (spill partitions), which are memory-mapped files. Each 
> memory-mapped file can only hold up to 2GB of spilled data, so the 
> SpillManager keeps a list of them and hash-distributes the keys across this 
> list. Once an element gets spilled, it is serialized and only gets 
> deserialized again when it is requested from the client, i.e. loaded back 
> into the LRU cache.
> The SpillManager holds a SpillMap for every spill partition (SpillFile). Each 
> SpillMap has access to all the pages of its SpillFile, and it also contains a 
> list of bloom filters, one for every page of the SpillFile. Only a single 
> page, the currently active one, stays in memory. The in-memory data 
> structure of the page is a HashMap, for easy key access.
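> The hashing over the spill partitions and the 2GB limit of a single 
> memory-mapped buffer look roughly like this (a sketch with made-up names and 
> a fixed page size, not the actual SpillManager code):
>
>   import java.io.IOException;
>   import java.io.RandomAccessFile;
>   import java.nio.MappedByteBuffer;
>   import java.nio.channels.FileChannel;
>   import java.util.Arrays;
>
>   public class SpillPartitionSketch {
>       private static final int PAGE_SIZE = 4096;   // assumed page size
>       private final int numSpillFiles;
>
>       public SpillPartitionSketch(int numSpillFiles) {
>           this.numSpillFiles = numSpillFiles;
>       }
>
>       // every key belongs to exactly one spill partition (SpillFile/SpillMap)
>       public int partitionFor(byte[] key) {
>           return (Arrays.hashCode(key) & Integer.MAX_VALUE) % numSpillFiles;
>       }
>
>       // a single MappedByteBuffer can address at most 2GB, which is why a list
>       // of SpillFiles is kept instead of one big file; here one page of a file
>       // is mapped on demand
>       public MappedByteBuffer mapPage(RandomAccessFile spillFile, long pageIndex)
>               throws IOException {
>           return spillFile.getChannel()
>                   .map(FileChannel.MapMode.READ_WRITE, pageIndex * PAGE_SIZE, PAGE_SIZE);
>       }
>   }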
> An element evicted from the LRU cache is hashed into the correct SpillMap. 
> The SpillMap then determines, via the list of bloom filters, on which page of 
> the SpillFile the key resides and loads that page into a HashMap in memory. 
> If the key was never spilled before, the SpillMap tries to fit the new key 
> into the current in-memory page. In case it doesn't fit, the current memory 
> page is flushed to disk and a new page is requested. The key is then written 
> to the new in-memory page. Lastly, the page's bloom filter is updated so that 
> this key can be discovered again.
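> As a sketch of that eviction path, with an in-memory list standing in for the 
> pages of the real memory-mapped SpillFile and made-up names (the case where 
> the key was already spilled before is reduced to a comment):
>
>   import com.google.common.hash.BloomFilter;
>   import com.google.common.hash.Funnels;
>   import java.nio.ByteBuffer;
>   import java.util.ArrayList;
>   import java.util.HashMap;
>   import java.util.List;
>   import java.util.Map;
>
>   public class SpillMapSketch {
>       private static final int PAGE_SIZE_BYTES = 4096;   // assumed page size
>
>       // one bloom filter per flushed page; flushedPages stands in for the SpillFile
>       final List<BloomFilter<byte[]>> pageFilters = new ArrayList<>();
>       final List<Map<ByteBuffer, byte[]>> flushedPages = new ArrayList<>();
>       private Map<ByteBuffer, byte[]> currentPage = new HashMap<>();
>       private BloomFilter<byte[]> currentFilter = newPageFilter();
>       private int currentPageBytes = 0;
>
>       private static BloomFilter<byte[]> newPageFilter() {
>           return BloomFilter.create(Funnels.byteArrayFunnel(), 1000, 0.01);
>       }
>
>       // called for an element evicted from the LRU cache and hashed to this
>       // SpillMap; the real code first checks the bloom filters and rewrites the
>       // key's old page if the key was spilled before (that branch is omitted here)
>       public void spill(byte[] key, byte[] serializedAggs) {
>           if (currentPageBytes + key.length + serializedAggs.length > PAGE_SIZE_BYTES) {
>               // current in-memory page is full: flush it and request a new one
>               flushedPages.add(currentPage);
>               pageFilters.add(currentFilter);
>               currentPage = new HashMap<>();
>               currentFilter = newPageFilter();
>               currentPageBytes = 0;
>           }
>           currentPage.put(ByteBuffer.wrap(key), serializedAggs);
>           currentFilter.put(key);               // so the key can be discovered again
>           currentPageBytes += key.length + serializedAggs.length;
>       }
>   }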
> Loading a key works similarly: if it is not present in the LRU cache, the 
> CacheLoader kicks in. The key gets hashed into the correct SpillMap, the list 
> of bloom filters is walked to determine the SpillFile page the key resides 
> on, and that page is loaded into memory first and the key eventually into 
> the LRU cache. Deserialization is triggered only for this last step. The 
> aggregators are returned from the LRU cache and the next value is computed. 
> In case the key is not found on any page, the loader creates new aggregators 
> for it.
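> Continuing the SpillMapSketch above, the load side (what the CacheLoader 
> would call on a miss) walks the per-page bloom filters and only then touches 
> a page; again just a sketch, the real code maps and deserializes the page 
> from the SpillFile at that point:
>
>       // returns the spilled value for the key, or null if the key was never
>       // spilled (the CacheLoader then creates fresh aggregators)
>       public byte[] load(byte[] key) {
>           ByteBuffer wrapped = ByteBuffer.wrap(key);
>           byte[] inCurrent = currentPage.get(wrapped);
>           if (inCurrent != null) {
>               return inCurrent;                 // still on the active in-memory page
>           }
>           for (int page = 0; page < pageFilters.size(); page++) {
>               if (!pageFilters.get(page).mightContain(key)) {
>                   continue;                     // definitely not on this page
>               }
>               // only at this point is the page loaded into memory and the
>               // entry deserialized
>               Map<ByteBuffer, byte[]> loaded = flushedPages.get(page);
>               byte[] value = loaded.get(wrapped);
>               if (value != null) {
>                   return value;
>               }
>               // bloom filter false positive: keep walking the remaining pages
>           }
>           return null;
>       }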
> TODOs: 
> --> Init a newly deserialized Aggregator with its previous value. I added an 
> init function to the interface; it receives the client aggregators and the 
> aggregator initializes itself from them (see the sketch after this list). 
> This could go into a constructor. Also, I only did this for two aggregators 
> so far, not sure if this is the way you want it to be.
> --> Error handling: right now I mostly re-throw local errors as unchecked 
> RuntimeExceptions. From my experience even these are not always surfaced, so 
> this needs some rework I think.
> --> Page fragmentation might happen as spilling occurs. The code tries to 
> fill up the current in-memory page; if space is exhausted it simply asks for 
> a new page. A model that searches for an existing page that could store the 
> additional element, instead of requesting a completely new one, might be 
> better. This would just increase the memory footprint a bit.
> -> Tuning knobs that impact performance and memory footprint:
>    - Number of spillFiles
>    - Size of LRU cache
>    - Page size.
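> For the first TODO, the shape of the init addition could look like the sketch 
> below. The interface and the count encoding are made up; only the idea of 
> seeding a freshly deserialized aggregator from the client-side aggregators is 
> from the description above:
>
>   import java.nio.ByteBuffer;
>
>   // made-up stand-in for the real Aggregator interface
>   interface AggregatorSketch {
>       void aggregate(byte[] value);
>       byte[] evaluate();
>
>       // proposed addition: let a freshly deserialized aggregator seed itself
>       // from the corresponding client-side aggregator's current state
>       void init(AggregatorSketch clientAggregator);
>   }
>
>   class CountAggregatorSketch implements AggregatorSketch {
>       private long count;
>
>       @Override public void aggregate(byte[] value) { count++; }
>
>       @Override public byte[] evaluate() {
>           return ByteBuffer.allocate(8).putLong(count).array();
>       }
>
>       @Override public void init(AggregatorSketch clientAggregator) {
>           // restore the running count that was computed before the spill
>           count = ByteBuffer.wrap(clientAggregator.evaluate()).getLong();
>       }
>   }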
> marcel



--
This message was sent by Atlassian JIRA
(v6.2#6252)
