I would suggest that we drop MapState and instead support MultimapState
without ordering as a first pass and potentially add ordering later.

Inside an SDK we would be able to build a "sorted" multimap state that uses
a prefix of the key and implements radix-based sorting. Any time a prefix
has too many elements, you rebalance by splitting it in two and using a
slightly longer prefix. Similarly, removing elements lets prefixes merge and
become shorter.
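
To make the prefix idea concrete, here is a rough in-memory sketch, not a
worked-out design. Everything in it is an assumption for illustration: the
class name PrefixBucketedMultimap, the 8-bit integer keys, and the split
threshold of 4. Each bucket stands in for one key of an unordered multimap
state. Merging buckets on removal is left out to keep it short.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: a sorted view built on unordered buckets keyed by a bit prefix. */
class PrefixBucketedMultimap {
  static final int KEY_BITS = 8;   // assumed fixed key width, keys are 0..255
  static final int MAX_BUCKET = 4; // assumed split threshold

  // Bucket prefix ("", "0", "01", ...) -> unordered entries stored under it.
  // The prefixes always form a prefix-free cover of the key space, so their
  // lexicographic order matches the numeric order of the keys they hold.
  private final TreeMap<String, List<Map.Entry<Integer, String>>> buckets = new TreeMap<>();

  PrefixBucketedMultimap() {
    buckets.put("", new ArrayList<>());
  }

  /** Fixed-width binary rendering of the key, e.g. 5 -> "00000101". */
  static String bits(int key) {
    if (key < 0 || key >= (1 << KEY_BITS)) {
      throw new IllegalArgumentException("key out of range: " + key);
    }
    return String.format("%" + KEY_BITS + "s", Integer.toBinaryString(key)).replace(' ', '0');
  }

  /** Finds the single bucket whose prefix matches the start of the key. */
  private String bucketFor(String keyBits) {
    for (int len = 0; len <= KEY_BITS; len++) {
      String prefix = keyBits.substring(0, len);
      if (buckets.containsKey(prefix)) {
        return prefix;
      }
    }
    throw new IllegalStateException("no bucket covers " + keyBits);
  }

  void put(int key, String value) {
    String prefix = bucketFor(bits(key));
    buckets.get(prefix).add(new SimpleImmutableEntry<>(key, value));
    // Split while the bucket is over the threshold and the prefix can still grow.
    while (buckets.get(prefix).size() > MAX_BUCKET && prefix.length() < KEY_BITS) {
      List<Map.Entry<Integer, String>> old = buckets.remove(prefix);
      buckets.put(prefix + "0", new ArrayList<>());
      buckets.put(prefix + "1", new ArrayList<>());
      for (Map.Entry<Integer, String> e : old) {
        buckets.get(bits(e.getKey()).substring(0, prefix.length() + 1)).add(e);
      }
      // At most one half can still be over the threshold; keep checking that one.
      prefix = buckets.get(prefix + "0").size() > MAX_BUCKET ? prefix + "0" : prefix + "1";
    }
  }

  /** Reads buckets in prefix order and sorts each bucket on its own. */
  List<Map.Entry<Integer, String>> sortedEntries() {
    List<Map.Entry<Integer, String>> out = new ArrayList<>();
    for (List<Map.Entry<Integer, String>> bucket : buckets.values()) {
      List<Map.Entry<Integer, String>> copy = new ArrayList<>(bucket);
      copy.sort(Map.Entry.comparingByKey());
      out.addAll(copy);
    }
    return out;
  }

  int bucketCount() {
    return buckets.size();
  }
}
```

The point of the layout is that a read only ever has to sort one bounded
bucket at a time, and a range read or delete can skip whole buckets whose
prefix falls outside the range.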




On Thu, May 23, 2019 at 10:36 AM Reuven Lax <re...@google.com> wrote:

> Beam's state API is intended to be useful as an alternative aggregation
> method. Stateful ParDos can aggregate elements into state and set timers to
> read the state. Currently Beam's state API supports two ways of storing
> collections of elements: BagState and MapState. BagState is the one most
> commonly used for these aggregations, likely for the practical reason that
> it's the only one that has a scalable implementation on several runners.
>
> There are two major limitations of bag state:
> * It is immutable: To delete an element from BagState, you must read and
> rewrite the entire bag.
> * It is unsorted: Almost all use cases of BagState need the elements in
> some order (usually timestamp order). Since BagState is unsorted, every
> time it's read it must be read in full and sorted in the ParDo.
>
> To make this concrete, consider a ParDo that does timeseries joins. There
> are two types of input events: trades and quotes. A quote tells the
> current price of a stock. The goal of the ParDo is to pair each stock's
> trades with the closest quote that precedes it in time.
>
> A simple implementation would look as follows (pseudocode):
>
> void process(@Element Event event, @Timestamp timestamp) {
>   if (event.type() == TRADE) {
>     tradesBag.add(event.trade());
>     // Fire the timer at the earliest pending trade timestamp.
>     if (timestamp < existingTimerTime) {
>       setTimer(processTradeTimer, timestamp);
>     }
>   } else {
>     quotesBag.add(event.quote());
>   }
> }
>
> void onTimer(@Timestamp timestamp) {
>   List<Quote> quotes = sort(quotesBag.get());
>   List<Trade> trades = sort(tradesBag.get());
>   for (Trade trade : trades) {
>     if (trade.timestamp() > timestamp) break;
>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>     // Output KV<Trade, Quote>
>   }
>
>   // Rewrite the bag so that it holds only the trades not yet processed.
>   List<Trade> newTrades = /* remaining trades. */;
>   tradesBag.clear();
>   newTrades.forEach(tradesBag::add);
> }
>
> A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires
> clearing and rewriting the entire bag. This is O(n^2) in the number of
> input trades.
>   2. Both bags need to be fetched and sorted on every timer. This is
> O(n^2*logn) in the size of the input PCollections.
>   3. BagState is designed so that the input iterables can be large; the
> entire BagState shouldn't have to fit in memory. However, this stops
> working when you need to sort it, because we must page the entire BagState
> into memory in order to sort it.
>
> Some of these problems can be alleviated with separate GC, caching of
> BagState fetches, etc. However this results in increasingly complicated
> code, and does not solve all the problems above.
>
> I'd like to propose a new state type to solve the above problems:
> SortedMultimapState. SortedMultimapState could have the following signature:
>
> public interface SortedMultimapState<K, V> extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key.
>   ReadableState<Iterable<V>> get(K key);
>   // Return all entries in the map.
>   ReadableState<Iterable<KV<K, V>>> allEntries();
>   // Return all entries in the map with keys <= limit. Returned elements
>   // are sorted by the key.
>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>   // Remove all values with the given key.
>   void remove(K key);
>   // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
> }
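
For reference, the intended semantics of entriesUntil and removeUntil can
be modeled in memory with a TreeMap. This is only a sketch to pin down the
behavior, not a proposed runner implementation: InMemorySortedMultimap is a
made-up name, it is not a Beam class, and it returns plain lists instead of
ReadableState.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the proposed semantics; not a Beam class. */
class InMemorySortedMultimap<K extends Comparable<K>, V> {
  private final TreeMap<K, List<V>> map = new TreeMap<>();

  /** Adds a value under the key, keeping any values already stored there. */
  void put(K key, V value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /** Returns a copy of all values for the key, or an empty list. */
  List<V> get(K key) {
    return new ArrayList<>(map.getOrDefault(key, new ArrayList<>()));
  }

  /** Returns all entries with keys <= limit, in ascending key order. */
  List<Map.Entry<K, V>> entriesUntil(K limit) {
    List<Map.Entry<K, V>> out = new ArrayList<>();
    // headMap(limit, true) is a view of the keys <= limit, iterated in key order.
    for (Map.Entry<K, List<V>> e : map.headMap(limit, true).entrySet()) {
      for (V v : e.getValue()) {
        out.add(new SimpleImmutableEntry<>(e.getKey(), v));
      }
    }
    return out;
  }

  /** Removes all values stored under the key. */
  void remove(K key) {
    map.remove(key);
  }

  /** Removes all entries with keys <= limit. */
  void removeUntil(K limit) {
    // Clearing the head view deletes that whole range from the backing map.
    map.headMap(limit, true).clear();
  }
}
```

Note that both range operations touch only the keys at or below the limit,
which is the property the BagState version lacks.
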
>
> Runners will sort based on the encoded value of the key. In order to make
> this easier for users, I propose that we introduce a new tag on Coders,
> *PreservesOrder*. A Coder that carries this tag guarantees that the
> encoded value preserves the same ordering as the base Java type.
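
As an illustration of what such a guarantee could mean for a long key
(timestamps in the example), one well-known order-preserving encoding is
big-endian bytes with the sign bit flipped. The sketch below is not an
existing Beam coder; the class and method names are made up, and it
assumes the runner compares encoded keys as unsigned bytes,
lexicographically.

```java
import java.nio.ByteBuffer;

/** Hypothetical example of an encoding that preserves the order of long values. */
class OrderPreservingLongEncoding {
  /** Big-endian bytes with the sign bit flipped, so negatives sort before positives. */
  static byte[] encode(long value) {
    // ByteBuffer writes big-endian by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  /** Inverse of encode. */
  static long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
  }

  /** Unsigned lexicographic comparison, the order assumed for encoded keys. */
  static int compareEncoded(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

A little-endian encoding of the same value would not have this property,
which is why the tag has to be declared per Coder rather than assumed.
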
>
> The above logic can now be rewritten! There's no need to explicitly sort,
> since the elements are returned sorted by key (in this case the key will be
> the timestamp). We can explicitly fetch only up to the current timer
> timestamp, so there's no need to pull the entire set into memory.
> Furthermore, we can efficiently delete ranges, with no more need to rewrite
> the entire set each time.
>
> void onTimer(@Timestamp timestamp) {
>   Iterable<KV<Long, Quote>> quotes = quotesMap.entriesUntil(timestamp).read();
>   Iterable<KV<Long, Trade>> trades = tradesMap.entriesUntil(timestamp).read();
>   for (KV<Long, Trade> entry : trades) {
>     Trade trade = entry.getValue();
>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>     // Output KV<Trade, Quote>
>   }
>   tradesMap.removeUntil(timestamp);
> }
>
> Comments?
>
> Reuven
>
>
>
