Beam's state API is intended to be useful as an alternative aggregation
method. Stateful ParDos can aggregate elements into state and set timers to
read the state. Currently Beam's state API supports two ways of storing
collections of elements: BagState and MapState. BagState is the one most
commonly used for these aggregations, likely for the practical reason that
it's the only one that has a scalable implementation on several runners.

There are two major limitations of BagState:
* It is append-only: to delete an element from a BagState, you must read and
rewrite the entire bag.
* It is unsorted: almost all use cases of BagState need the elements in some
order (usually timestamp order). Since BagState is unsorted, every time
it's read it must be read in full and sorted in the ParDo.

To make this concrete, consider a ParDo that does timeseries joins. There
are two types of input events: trades and quotes. A quote gives the
current price of a stock. The goal of the ParDo is to pair each of a stock's
trades with the closest quote that precedes it in time.

A simple implementation would look as follows (pseudocode):

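void process(@Element Event event, @Timestamp timestamp) {
  if (event.type() == TRADE) {
    tradesBag.add(event.trade());
    // Make sure a timer fires at the earliest pending trade's timestamp.
    if (timestamp < existingTimerTime) {
      setTimer(processTradeTimer, timestamp);
    }
  } else {
    quotesBag.add(event.quote());
  }
}

void onTimer(@Timestamp timestamp) {
  // Both bags must be read in full and sorted on every timer firing.
  List<Quote> quotes = sort(quotesBag.read());
  List<Trade> trades = sort(tradesBag.read());
  List<Trade> remainingTrades = new ArrayList<>();
  for (Trade trade : trades) {
    if (trade.timestamp() > timestamp) {
      // Too new to join yet; keep it for a later timer.
      remainingTrades.add(trade);
      continue;
    }
    Quote quote = findClosestQuote(quotes, trade.timestamp());
    // Output KV<Trade, Quote>
  }

  // BagState cannot delete individual elements, so clear and rewrite the bag.
  tradesBag.clear();
  remainingTrades.forEach(tradesBag::add);
}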

A few obvious problems with this code:
  1. Removing the already-processed elements from the bag requires clearing
and rewriting the entire bag. This is O(n^2) in the number of input trades.
  2. Both bags need to be fetched and sorted on every timer. This is
O(n^2 log n) in the size of the input PCollections.
  3. BagState is designed so that the input iterables can be large; the
entire BagState shouldn't have to fit in memory. However, this stops working
when you need to sort it, because the entire BagState must be paged into
memory in order to sort it.
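To make the bounds in problems 1 and 2 concrete, here is a rough worst-case derivation. It assumes n trades, that each timer firing joins and removes exactly one trade, and that up to n elements are in state at each firing (these are simplifying assumptions, not measurements):

```latex
\begin{aligned}
\text{rewrite cost} &= \sum_{k=1}^{n} (n - k) = \frac{n(n-1)}{2} = O(n^2) \\
\text{read and sort cost} &= \sum_{k=1}^{n} O(n \log n) = O(n^2 \log n)
\end{aligned}
```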

Some of these problems can be alleviated with separate GC, caching of
BagState fetches, etc. However, this results in increasingly complicated
code, and it still does not solve all of the problems above.

I'd like to propose a new state type to solve the above problems:
SortedMultimapState. SortedMultimapState could have the following signature:

import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.values.KV;

public interface SortedMultimapState<K, V> extends State {
  // Add a value to the map.
  void put(K key, V value);
  // Get all values for a given key.
  ReadableState<Iterable<V>> get(K key);
  // Return all entries in the map.
  ReadableState<Iterable<KV<K, V>>> allEntries();
  // Return all entries in the map with keys <= limit. Returned elements
  // are sorted by the key.
  ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
  // Remove all values with the given key.
  void remove(K key);
  // Remove all entries in the map with keys <= limit.
  void removeUntil(K limit);
}
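To pin down the intended semantics, here is a hypothetical in-memory model of the interface built on java.util.TreeMap. It is only a sketch: the class name InMemorySortedMultimap is made up, it uses Map.Entry instead of Beam's KV, it returns values eagerly instead of wrapping them in ReadableState, and it orders keys by Comparable rather than by encoded bytes as a runner would.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of the proposed SortedMultimapState semantics. */
class InMemorySortedMultimap<K extends Comparable<K>, V> {
  // Each key maps to the values added under it, in insertion order.
  private final NavigableMap<K, List<V>> map = new TreeMap<>();

  void put(K key, V value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  List<V> get(K key) {
    return new ArrayList<>(map.getOrDefault(key, List.of()));
  }

  // All entries, sorted by key.
  List<Map.Entry<K, V>> allEntries() {
    return flatten(map);
  }

  // Entries with key <= limit, sorted by key.
  List<Map.Entry<K, V>> entriesUntil(K limit) {
    return flatten(map.headMap(limit, true));
  }

  void remove(K key) {
    map.remove(key);
  }

  // headMap returns a live view, so clearing it deletes the whole range.
  void removeUntil(K limit) {
    map.headMap(limit, true).clear();
  }

  private static <A, B> List<Map.Entry<A, B>> flatten(Map<A, List<B>> m) {
    List<Map.Entry<A, B>> out = new ArrayList<>();
    for (Map.Entry<A, List<B>> e : m.entrySet()) {
      for (B v : e.getValue()) {
        out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), v));
      }
    }
    return out;
  }
}
```

The point of the model is that removeUntil only touches the entries in the deleted range, instead of reading and rewriting the whole collection as with BagState.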

Runners will sort based on the encoded bytes of the key. To make this
easier for users, I propose that we introduce a new tag on Coders,
*PreservesOrder*. A Coder that carries this tag guarantees that its
encoded values preserve the same ordering as the base Java type.
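As an illustration of what such a tag would promise, here is a sketch assuming runners compare encoded keys as unsigned bytes, lexicographically. A plain big-endian two's-complement encoding of a long preserves order only for non-negative values; flipping the sign bit before encoding fixes that. The class and method names are hypothetical, not Beam APIs.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helpers showing an order-preserving encoding for longs. */
final class OrderPreservingLongs {
  private OrderPreservingLongs() {}

  // Big-endian two's complement: negative values sort after positive ones
  // when the bytes are compared as unsigned.
  static byte[] encodeBigEndian(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE monotonically
  // onto 0x0000000000000000..0xFFFFFFFFFFFFFFFF.
  static byte[] encodeOrderPreserving(long value) {
    return encodeBigEndian(value ^ Long.MIN_VALUE);
  }

  // Assumed runner behavior: unsigned, lexicographic comparison of encoded keys.
  static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }
}
```

For example, encodeBigEndian(-1) is all 0xFF bytes and therefore sorts after encodeBigEndian(1), while the order-preserving encodings of -1 and 1 compare in numeric order.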

The above logic can now be rewritten! There's no need to explicitly sort,
since the elements are returned sorted by key (in this case the key will be
the timestamp). We can explicitly fetch only up to the current timer
timestamp, so there's no need to pull the entire set into memory.
Furthermore, we can efficiently delete ranges, with no more need to rewrite
the entire set each time.

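void onTimer(@Timestamp timestamp) {
  // Only entries with keys <= timestamp are fetched, already sorted by key.
  Iterable<KV<Long, Quote>> quotes = quotesMap.entriesUntil(timestamp).read();
  Iterable<KV<Long, Trade>> trades = tradesMap.entriesUntil(timestamp).read();
  for (KV<Long, Trade> entry : trades) {
    Trade trade = entry.getValue();
    Quote quote = findClosestQuote(quotes, trade.timestamp());
    // Output KV<Trade, Quote>
  }
  // Range delete: the remaining trades are not rewritten.
  tradesMap.removeUntil(timestamp);
}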
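For a runnable feel of the rewritten timer logic, here is a self-contained simulation in which TreeMaps keyed by timestamp (in millis) stand in for the proposed state. The class TradeQuoteJoin is hypothetical, each timestamp holds a single value for brevity, and "closest preceding quote" is taken to mean the latest quote at or before the trade (floorEntry); use lowerEntry if strictly-before is wanted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical simulation of the trade/quote join using sorted maps. */
final class TradeQuoteJoin {
  private final NavigableMap<Long, Double> quotes = new TreeMap<>(); // ts -> price
  private final NavigableMap<Long, String> trades = new TreeMap<>(); // ts -> trade id

  void addQuote(long timestamp, double price) { quotes.put(timestamp, price); }

  void addTrade(long timestamp, String id) { trades.put(timestamp, id); }

  /** Joins every trade at or before timerTime with the latest quote at or before it. */
  List<String> onTimer(long timerTime) {
    List<String> output = new ArrayList<>();
    // Like tradesMap.entriesUntil(timerTime): a sorted range read.
    for (Map.Entry<Long, String> trade : trades.headMap(timerTime, true).entrySet()) {
      Map.Entry<Long, Double> quote = quotes.floorEntry(trade.getKey());
      output.add(trade.getValue() + "@" + (quote == null ? "none" : quote.getValue()));
    }
    // Like tradesMap.removeUntil(timerTime): a range delete.
    trades.headMap(timerTime, true).clear();
    return output;
  }
}
```

With quotes at 100 and 200 and trades t1 at 150, t2 at 250 and t3 at 400, a timer at 300 emits t1 paired with the quote at 100 and t2 paired with the quote at 200, and leaves t3 in state for a later timer.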

Comments?

Reuven
