Beam's state API is intended to be useful as an alternative aggregation method. Stateful ParDos can aggregate elements into state and set timers to read the state. Beam's state API currently supports two ways of storing collections of elements: BagState and MapState. BagState is the one most commonly used for these aggregations, likely for the practical reason that it's the only one with a scalable implementation on several runners.
There are two major limitations of BagState:

  * It is append-only: to delete an element from a BagState, you must read and rewrite the entire bag.
  * It is unsorted: almost all use cases of BagState need the elements in some order (usually timestamp order). Since BagState is unsorted, every time it's read it must be read in full and sorted in the ParDo.

To make this concrete, consider a ParDo that does timeseries joins. There are two types of input events: trades and quotes. A quote gives the current price of a stock. The goal of the ParDo is to pair each of a stock's trades with the closest quote that precedes it in time. A simple implementation would look as follows (pseudocode):

    void process(@Element Event event, @Timestamp timestamp) {
      if (event.type() == TRADE) {
        tradesBag.add(event.trade());
        // Keep the timer at the earliest pending trade.
        if (timestamp < existingTimerTime) {
          setTimer(processTradeTimer, timestamp);
        }
      } else {
        quotesBag.add(event.quote());
      }
    }

    void onTimer(@Timestamp timestamp) {
      // Both bags are read in full and sorted on every firing.
      List<Quote> quotes = sort(quotesBag.read());
      List<Trade> trades = sort(tradesBag.read());
      List<Trade> newTrades = new ArrayList<>();
      for (Trade trade : trades) {
        if (trade.timestamp() > timestamp) {
          newTrades.add(trade);  // Not ready yet; keep it for a later timer.
          continue;
        }
        Quote quote = findClosestQuote(quotes, trade.timestamp());
        // Output KV<Trade, Quote>
      }
      // Dropping the processed trades means clearing and rewriting the whole bag.
      tradesBag.clear();
      newTrades.forEach(tradesBag::add);
    }

A few obvious problems with this code:

  1. Removing the already-processed elements from the bag requires clearing and rewriting the entire bag. Since each timer firing rewrites every trade that is still pending, this is O(n^2) in the number of input trades.
  2. Both bags need to be fetched and sorted on every timer. With O(n) timer firings, each sorting up to n elements, this is O(n^2 log n) in the size of the input PCollections.
  3. BagState is designed so that its iterables can be large: the entire bag shouldn't have to fit in memory. However, this stops working when you need to sort it, because the entire bag must be paged into memory in order to sort it.

Some of these problems can be alleviated with separate GC, caching of BagState fetches, etc.
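To make the quadratic rewrite cost from point 1 concrete, here is a toy cost model in plain Java (not Beam code). An ArrayList stands in for BagState, and the scenario is an assumed worst case: n trades with timestamps 1..n are all buffered before the first timer fires, and then one timer fires per trade timestamp. The class name BagRewriteCost is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical toy cost model (not Beam code) for the BagState-based join:
 * a List stands in for BagState, and we count how many elements get re-added
 * when the timer rewrites the still-pending trades.
 */
public class BagRewriteCost {

    /**
     * Assumed worst case: trades with timestamps 1..n are all buffered, then one
     * timer fires per timestamp 1..n. Each firing reads and sorts the whole bag,
     * drops trades at or before the timer timestamp, clears the bag and re-adds
     * the rest. Returns the total number of re-added elements.
     */
    public static long rewritesForTrades(int n) {
        List<Long> bag = new ArrayList<>();
        for (long ts = n; ts >= 1; ts--) {
            bag.add(ts); // arrival order is irrelevant: the bag is unsorted
        }
        long rewrites = 0;
        for (long timer = 1; timer <= n; timer++) {
            List<Long> sorted = new ArrayList<>(bag); // read the entire bag
            Collections.sort(sorted);                 // sort it on every firing
            List<Long> remaining = new ArrayList<>();
            for (long ts : sorted) {
                if (ts > timer) {
                    remaining.add(ts); // not ready yet
                }
            }
            bag.clear();           // stands in for BagState.clear()
            bag.addAll(remaining); // re-add every pending trade
            rewrites += remaining.size();
        }
        return rewrites;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1000}) {
            System.out.println(n + " trades -> " + rewritesForTrades(n) + " rewrites");
        }
    }
}
```

Running main reports 45, 4950 and 499500 rewrites for 10, 100 and 1000 trades, i.e. n(n-1)/2, because the timer at time k must re-add the n-k trades still pending. The per-firing sort in the same loop is the O(n^2 log n) cost from point 2.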
However, such workarounds result in increasingly complicated code and do not solve all of the problems above.

I'd like to propose a new state type to solve them: SortedMultimapState. SortedMultimapState could have the following signature:

    public interface SortedMultimapState<K, V> extends State {
      // Add a value to the map.
      void put(K key, V value);

      // Get all values for a given key.
      ReadableState<Iterable<V>> get(K key);

      // Return all entries in the map.
      ReadableState<Iterable<KV<K, V>>> allEntries();

      // Return all entries in the map with keys <= limit. Returned elements are sorted by key.
      ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);

      // Remove all values with the given key.
      void remove(K key);

      // Remove all entries in the map with keys <= limit.
      void removeUntil(K limit);
    }

Runners will sort based on the encoded value of the key. To make this easier for users, I propose that we introduce a new tag on Coders: *PreservesOrder*. A Coder that carries this tag guarantees that its encoded values preserve the same ordering as the base Java type.

The above logic can now be rewritten! There's no need to sort explicitly, since the elements are returned sorted by key (in this case the key is the timestamp). We can fetch only up to the current timer timestamp, so there's no need to pull the entire set into memory. Furthermore, we can delete ranges efficiently, with no need to rewrite the entire set each time.

    void onTimer(@Timestamp timestamp) {
      // Only entries with keys <= timestamp are fetched, already sorted by key.
      Iterable<KV<Long, Quote>> quotes = quotesMap.entriesUntil(timestamp).read();
      Iterable<KV<Long, Trade>> trades = tradesMap.entriesUntil(timestamp).read();
      for (KV<Long, Trade> entry : trades) {
        Trade trade = entry.getValue();
        Quote quote = findClosestQuote(quotes, trade.timestamp());
        // Output KV<Trade, Quote>
      }
      // Range delete: no need to read and rewrite the remaining trades.
      tradesMap.removeUntil(timestamp);
    }

Comments?

Reuven
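Since runners would sort on encoded key bytes, a PreservesOrder coder has to produce bytes whose order matches the Java ordering. The sketch below assumes runners compare encoded keys as unsigned, lexicographic byte strings (an assumption; the proposal does not specify the comparison). It shows why a plain big-endian encoding of a signed long is not order-preserving (negative values sort after positive ones) and how flipping the sign bit fixes that. OrderPreservingLong is a hypothetical name, not an existing Beam coder.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical illustration of an order-preserving byte encoding for long keys (not Beam code). */
public class OrderPreservingLong {

    /** Plain big-endian two's-complement bytes: negative values sort after positive ones. */
    public static byte[] plainBigEndian(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Big-endian bytes with the sign bit flipped, so unsigned byte order matches numeric order. */
    public static byte[] orderPreserving(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Compares encodings as an assumed runner would: unsigned, lexicographic byte comparison. */
    public static int compareEncoded(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        long neg = -5L;
        long pos = 3L;
        // Plain encoding: -5 sorts AFTER 3, so it does not preserve order.
        System.out.println(compareEncoded(plainBigEndian(neg), plainBigEndian(pos)) > 0);
        // Sign-flipped encoding: -5 sorts BEFORE 3, matching Long ordering.
        System.out.println(compareEncoded(orderPreserving(neg), orderPreserving(pos)) < 0);
    }
}
```

Both lines of main print true. This matters for timestamp keys like the ones in the rewritten onTimer, since millisecond timestamps before the epoch are negative. Arrays.compareUnsigned requires Java 9 or later.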
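To sanity-check the proposed SortedMultimapState semantics, here is a minimal in-memory model backed by java.util.TreeMap, plus the rewritten onTimer join expressed against it. This is a hypothetical, single-process stand-in, not a runner implementation: it returns plain Java lists of Map.Entry instead of Beam's ReadableState and KV, uses String payloads instead of Trade and Quote, and skips trades that have no earlier quote (an assumption; the proposal doesn't say what to do with them).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for the proposed SortedMultimapState (not a Beam API).
 * Keys use their natural ordering; values under the same key keep insertion order.
 */
public class InMemorySortedMultimap<K extends Comparable<K>, V> {
    private final TreeMap<K, List<V>> map = new TreeMap<>();

    public void put(K key, V value) {
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public List<V> get(K key) {
        return map.getOrDefault(key, Collections.emptyList());
    }

    public List<Map.Entry<K, V>> allEntries() {
        return flatten(map);
    }

    /** All entries with key <= limit, sorted by key. */
    public List<Map.Entry<K, V>> entriesUntil(K limit) {
        return flatten(map.headMap(limit, true));
    }

    public void remove(K key) {
        map.remove(key);
    }

    /** Removes all entries with key <= limit; the headMap view writes through to the map. */
    public void removeUntil(K limit) {
        map.headMap(limit, true).clear();
    }

    private static <A, B> List<Map.Entry<A, B>> flatten(NavigableMap<A, List<B>> m) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (Map.Entry<A, List<B>> e : m.entrySet()) {
            for (B v : e.getValue()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), v));
            }
        }
        return out;
    }

    /**
     * Hypothetical onTimer: pairs each trade at or before timerTs with the latest quote whose
     * key is <= the trade's key. Both inputs arrive sorted, so one merge pass suffices.
     */
    public static List<String> onTimer(
            InMemorySortedMultimap<Long, String> quotesMap,
            InMemorySortedMultimap<Long, String> tradesMap,
            long timerTs) {
        List<Map.Entry<Long, String>> quotes = quotesMap.entriesUntil(timerTs);
        List<String> output = new ArrayList<>();
        int q = 0;
        String latestQuote = null;
        for (Map.Entry<Long, String> trade : tradesMap.entriesUntil(timerTs)) {
            // Advance past every quote that is not later than this trade.
            while (q < quotes.size() && quotes.get(q).getKey() <= trade.getKey()) {
                latestQuote = quotes.get(q).getValue();
                q++;
            }
            if (latestQuote != null) { // assumption: trades with no earlier quote are skipped
                output.add(trade.getValue() + "@" + latestQuote);
            }
        }
        tradesMap.removeUntil(timerTs); // range delete; remaining trades are not rewritten
        return output;
    }

    public static void main(String[] args) {
        InMemorySortedMultimap<Long, String> quotes = new InMemorySortedMultimap<>();
        InMemorySortedMultimap<Long, String> trades = new InMemorySortedMultimap<>();
        quotes.put(1L, "q1");
        quotes.put(5L, "q5");
        trades.put(6L, "t6");
        trades.put(3L, "t3");
        trades.put(9L, "t9");
        System.out.println(onTimer(quotes, trades, 7L)); // [t3@q1, t6@q5]
        System.out.println(trades.allEntries());         // [9=t9]
    }
}
```

Because entriesUntil returns both inputs sorted by key, the closest-quote lookup becomes a single merge pass rather than a search per trade, and removeUntil drops a whole key range through TreeMap's headMap view. Like the proposed onTimer, this sketch never removes old quotes.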