I would suggest that we drop MapState and instead support MultimapState without ordering as a first pass and potentially add ordering later.
Inside an SDK we would be able to build a "sorted" multimap state that uses a prefix of the key and implement radix-based sorting. Any time a prefix has too many elements, you rebalance by splitting it into two and using a slightly longer prefix. Similarly, removing elements makes prefixes merge and become shorter.

On Thu, May 23, 2019 at 10:36 AM Reuven Lax <re...@google.com> wrote:

> Beam's state API is intended to be useful as an alternative aggregation
> method. Stateful ParDos can aggregate elements into state and set timers to
> read the state. Currently Beam's state API supports two ways of storing
> collections of elements: BagState and MapState. BagState is the one most
> commonly used for these aggregations, likely for the practical reason that
> it's the only one that has a scalable implementation on several runners.
>
> There are two major limitations of BagState:
> * It is append-only: to delete an element from BagState, you must read and
>   rewrite the entire bag.
> * It is unsorted: almost all use cases of BagState need the elements in
>   some order (usually timestamp order). Since BagState is unsorted, every
>   time it's read it must be read in full and sorted in the ParDo.
>
> To make this concrete, consider a ParDo that does timeseries joins. There
> are two types of input events: trades and quotes. A quote gives the
> current price of a stock. The goal of the ParDo is to pair each stock's
> trades with the closest quote that precedes it in time.
>
> A simple implementation would look as follows (pseudocode):
>
> void process(@Element Event event, @Timestamp timestamp) {
>   if (event.type() == TRADE) {
>     tradesBag.add(event.trade());
>     if (timestamp < existingTimerTime) {
>       setTimer(processTradeTimer, timestamp);
>     }
>   } else {
>     quotesBag.add(event.quote());
>   }
> }
>
> void onTimer(@Timestamp timestamp) {
>   List<Quote> quotes = sort(quotesBag.read());
>   List<Trade> trades = sort(tradesBag.read());
>   for (Trade trade : trades) {
>     if (trade.timestamp() > timestamp) break;
>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>     // Output KV<Trade, Quote>
>   }
>
>   List<Trade> newTrades = /* remaining trades */ ...;
>   tradesBag.clear();
>   newTrades.forEach(tradesBag::add);
> }
>
> A few obvious problems with this code:
> 1. Removing the elements already processed from the bag requires
>    clearing and rewriting the entire bag. This is O(n^2) in the number of
>    input trades.
> 2. Both bags need to be fetched and sorted on every timer. This is
>    O(n^2 log n) in the size of the input PCollections.
> 3. BagState is designed so that the input iterables can be large; the
>    entire BagState shouldn't have to fit in memory. However, this stops
>    working when you need to sort it, because we must page the entire
>    BagState into memory in order to sort it.
>
> Some of these problems can be alleviated with separate GC, caching of
> BagState fetches, etc. However, this results in increasingly complicated
> code and does not solve all of the problems above.
>
> I'd like to propose a new state type to solve the above problems:
> SortedMultimapState. SortedMultimapState could have the following signature:
>
> public interface SortedMultimapState<K, V> extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key.
>   ReadableState<Iterable<V>> get(K key);
>   // Return all entries in the map.
>   ReadableState<Iterable<KV<K, V>>> allEntries();
>   // Return all entries in the map with keys <= limit. Returned elements
>   // are sorted by the key.
>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>   // Remove all values with the given key.
>   void remove(K key);
>   // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
> }
>
> Runners will sort based on the encoded value of the key. In order to make
> this easier for users, I propose that we introduce a new tag on Coders,
> *PreservesOrder*. A Coder that contains this tag guarantees that the
> encoded value preserves the same ordering as the base Java type.
>
> The above logic can now be rewritten! There's no need to explicitly sort,
> since the elements are returned sorted by key (in this case the key will
> be the timestamp). We can fetch only up to the current timer timestamp, so
> there's no need to pull the entire set into memory. Furthermore, we can
> efficiently delete ranges, with no more need to rewrite the entire set
> each time.
>
> void onTimer(@Timestamp timestamp) {
>   Iterable<KV<Long, Quote>> quotes = quotesMap.entriesUntil(timestamp).read();
>   Iterable<KV<Long, Trade>> trades = tradesMap.entriesUntil(timestamp).read();
>   for (KV<Long, Trade> entry : trades) {
>     Trade trade = entry.getValue();
>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>     // Output KV<Trade, Quote>
>   }
>   tradesMap.removeUntil(timestamp);
> }
>
> Comments?
>
> Reuven
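Both `onTimer` sketches in the quoted proposal call a `findClosestQuote` helper that is never defined. Below is a minimal in-memory sketch of the matching rule. It assumes that "the closest quote that precedes it" means the latest quote at or before the trade's timestamp, that timestamps are plain `long` milliseconds, and that a quote can be reduced to its price. `ClosestQuoteJoin`, `addQuote`, and this `findClosestQuote` signature are hypothetical, not Beam APIs. The lookup is `TreeMap.floorEntry`, which is the kind of ordered "largest key <= limit" read that sorted state is meant to allow without re-sorting on every timer.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory illustration of the trade/quote matching rule. */
public class ClosestQuoteJoin {
  // Quote prices indexed by timestamp (millis). A second quote at the same
  // timestamp replaces the first, which is a simplification.
  private final NavigableMap<Long, Double> quotesByTime = new TreeMap<>();

  public void addQuote(long timestamp, double price) {
    quotesByTime.put(timestamp, price);
  }

  // Price of the latest quote at or before tradeTimestamp, or null if none exists.
  public Double findClosestQuote(long tradeTimestamp) {
    Map.Entry<Long, Double> entry = quotesByTime.floorEntry(tradeTimestamp);
    return entry == null ? null : entry.getValue();
  }

  public static void main(String[] args) {
    ClosestQuoteJoin join = new ClosestQuoteJoin();
    join.addQuote(100L, 10.0);
    join.addQuote(200L, 10.5);
    System.out.println(join.findClosestQuote(150L)); // 10.0
    System.out.println(join.findClosestQuote(200L)); // 10.5
    System.out.println(join.findClosestQuote(50L)); // null
  }
}
```

If "precedes" should be strict (a quote with the same timestamp as the trade does not count), `lowerEntry` would replace `floorEntry`.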
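The proposed *PreservesOrder* tag promises that comparing encoded keys gives the same order as comparing the original values. The proposal does not specify an encoding, so the following is only a sketch of one possibility for signed 64-bit keys such as millisecond timestamps: write the value big-endian with the sign bit flipped. Plain two's-complement big-endian bytes would not work, because when compared as unsigned bytes they sort negative values after positive ones. `OrderPreservingLongEncoding` is a hypothetical helper, not an existing Beam Coder. The sketch also assumes that runners compare keys as unsigned, lexicographic byte strings, and it needs Java 9+ for `Arrays.compareUnsigned`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical order-preserving encoding for signed longs (not a Beam Coder). */
public class OrderPreservingLongEncoding {
  // Big-endian bytes with the sign bit flipped: Long.MIN_VALUE -> 00 00 .., 
  // -1 -> 7F FF .., 0 -> 80 00 .., Long.MAX_VALUE -> FF FF ..
  public static byte[] encode(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  public static long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
  }

  // Assumed runner-side comparison: unsigned, lexicographic byte order.
  public static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    long[] values = {Long.MIN_VALUE, -5L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
    for (int i = 0; i + 1 < values.length; i++) {
      byte[] lo = encode(values[i]);
      byte[] hi = encode(values[i + 1]);
      // Each pair is in increasing numeric order, so the encodings should compare < 0.
      System.out.println(values[i] + " < " + values[i + 1] + ": " + (compareEncoded(lo, hi) < 0));
    }
  }
}
```

Every line printed by `main` should end in `true`. Flipping only the sign bit keeps the encoding fixed-width and cheap to decode. A variable-length encoding would need extra care to stay order-preserving.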
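The prefix idea at the top of this reply, building a "sorted" multimap inside the SDK on top of an unordered one, can be sketched in memory as follows. Entries live in buckets named by a bit prefix of the key, with the sign bit flipped so that bit order matches signed order. When a bucket grows past a limit, it splits into two buckets whose prefix is one bit longer. When two sibling buckets shrink well below the limit, they merge back into their parent. The leaf prefixes never contain one another, so their lexicographic order equals key order. As a result, a range read only touches buckets that start at or below the limit, and it sorts each small bucket in memory.

Everything here is an assumption made for illustration. The `HashMap` stands in for an unordered multimap state keyed by prefix, the `TreeSet` stands in for a small bucket index the SDK would have to persist, and `PrefixBucketedSortedMultimap` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical sketch: a key-sorted multimap layered on unordered, prefix-keyed buckets. */
public class PrefixBucketedSortedMultimap<V> {
  private final int maxBucket;
  // Bucket id (bit prefix of the key) -> unsorted entries; stands in for unordered state.
  private final Map<String, List<Map.Entry<Long, V>>> buckets = new HashMap<>();
  // Leaf bucket ids; no id is a prefix of another, so string order equals key order.
  private final TreeSet<String> index = new TreeSet<>();

  public PrefixBucketedSortedMultimap(int maxBucket) {
    this.maxBucket = maxBucket;
    index.add(""); // a single root bucket initially covers every key
    buckets.put("", new ArrayList<>());
  }

  // The key's 64 bits with the sign bit flipped, so bit order matches signed order.
  private static String bits(long key) {
    return String.format("%64s", Long.toBinaryString(key ^ Long.MIN_VALUE)).replace(' ', '0');
  }

  // The only leaf that is a prefix of the key's bits is its floor in the index.
  private String bucketFor(long key) {
    return index.floor(bits(key));
  }

  public void put(long key, V value) {
    String id = bucketFor(key);
    buckets.get(id).add(Map.entry(key, value));
    splitIfNeeded(id);
  }

  public void remove(long key) {
    String id = bucketFor(key);
    buckets.get(id).removeIf(e -> e.getKey() == key);
    mergeIfSmall(id);
  }

  // Entries with key <= limit in key order, reading only buckets that start at or below limit.
  public List<Map.Entry<Long, V>> entriesUntil(long limit) {
    List<Map.Entry<Long, V>> out = new ArrayList<>();
    for (String id : index.headSet(bucketFor(limit), true)) {
      List<Map.Entry<Long, V>> sorted = new ArrayList<>(buckets.get(id));
      sorted.sort(Map.Entry.comparingByKey()); // buckets are small, so sort in memory
      for (Map.Entry<Long, V> e : sorted) {
        if (e.getKey() <= limit) out.add(e);
      }
    }
    return out;
  }

  private void splitIfNeeded(String id) {
    List<Map.Entry<Long, V>> entries = buckets.get(id);
    if (entries.size() <= maxBucket || id.length() == 64) return;
    // Replace the bucket with two children whose prefix is one bit longer.
    index.remove(id);
    buckets.remove(id);
    for (String child : new String[] {id + "0", id + "1"}) {
      index.add(child);
      buckets.put(child, new ArrayList<>());
    }
    for (Map.Entry<Long, V> e : entries) {
      buckets.get(id + bits(e.getKey()).charAt(id.length())).add(e);
    }
    splitIfNeeded(id + "0"); // skewed keys can leave one child still too large
    splitIfNeeded(id + "1");
  }

  private void mergeIfSmall(String id) {
    if (id.isEmpty()) return;
    String parent = id.substring(0, id.length() - 1);
    String left = parent + "0", right = parent + "1";
    // Merge only two sibling leaves, and only well below the split threshold.
    if (!index.contains(left) || !index.contains(right)) return;
    if (buckets.get(left).size() + buckets.get(right).size() > maxBucket / 2) return;
    List<Map.Entry<Long, V>> merged = new ArrayList<>(buckets.remove(left));
    merged.addAll(buckets.remove(right));
    index.remove(left);
    index.remove(right);
    index.add(parent);
    buckets.put(parent, merged);
    mergeIfSmall(parent); // merging can cascade toward the root
  }

  public static void main(String[] args) {
    PrefixBucketedSortedMultimap<String> m = new PrefixBucketedSortedMultimap<>(4);
    for (long k : new long[] {50, -3, 7, 1000, 12, -90, 300, 0, 64}) m.put(k, "v" + k);
    System.out.println(m.entriesUntil(64)); // keys -90, -3, 0, 7, 12, 50, 64 in order
    m.remove(7);
    System.out.println(m.entriesUntil(64)); // the same keys without 7
  }
}
```

Splitting one bit at a time is the simplest version of the idea. It has a cost: clustered keys, such as timestamps that share their high bits, leave a chain of empty sibling buckets behind each split.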