On Thu, May 23, 2019 at 11:23 AM Lukasz Cwik <lc...@google.com> wrote:

> I would suggest that we drop MapState and instead support MultimapState
> without ordering as a first pass and potentially add ordering later.
>

While I agree that MultimapState is useful on its own, for these
aggregation use cases ordering is actually the important factor. I
considered suggesting a SortedBagState, but that just ends up looking like
MultimapState.

Are you suggesting this as an implementation strategy, that we implement
MultimapState first and then add ordering? If so, I agree. However, in that
case the community should still discuss the ordered state now, as we'll
likely want to add ordering shortly thereafter.

Agreed that we don't need both MapState and MultimapState. Alternatively, we
can reimplement MapState as a simple wrapper on top of MultimapState.
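A minimal sketch of that wrapper, assuming a multimap interface with blind `put` and `remove`. `SimpleMultimapState`, `InMemoryMultimapState` and `MapStateOverMultimap` are hypothetical names for illustration, not Beam classes:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal multimap state interface (not Beam's API).
interface SimpleMultimapState<K, V> {
  void put(K key, V value);
  Iterable<V> get(K key);
  void remove(K key);
}

// In-memory stand-in for a runner-backed multimap, for illustration only.
class InMemoryMultimapState<K, V> implements SimpleMultimapState<K, V> {
  private final Map<K, List<V>> entries = new HashMap<>();

  @Override
  public void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  @Override
  public Iterable<V> get(K key) {
    return entries.getOrDefault(key, Collections.emptyList());
  }

  @Override
  public void remove(K key) {
    entries.remove(key);
  }
}

// Map semantics on top of a multimap: each key holds at most one value.
class MapStateOverMultimap<K, V> {
  private final SimpleMultimapState<K, V> multimap;

  MapStateOverMultimap(SimpleMultimapState<K, V> multimap) {
    this.multimap = multimap;
  }

  // Overwrite semantics: drop any previous value, then store the new one.
  void put(K key, V value) {
    multimap.remove(key);
    multimap.put(key, value);
  }

  // Returns the stored value, or null if the key is absent.
  V get(K key) {
    for (V value : multimap.get(key)) {
      return value;
    }
    return null;
  }

  void remove(K key) {
    multimap.remove(key);
  }
}
```

A map `put` becomes a `remove` followed by a `put`, so if both are blind writes on the runner, overwriting a key needs no read.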


> Inside an SDK we would be able to build a "sorted" multimap state that
> uses a prefix of the key and implement radix based sorting. Any time a
> prefix has too many elements, you rebalance and split it into 2 and use a
> slightly longer prefix. Similarly, removing elements makes prefixes merge
> and become shorter.
>
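One possible reading of the prefix scheme, sketched under these assumptions: keys are 64-bit values compared as unsigned (for example an order-preserving encoding), entries live in small unsorted buckets that a runner could store in an unordered multimap keyed by prefix, and only the short list of bucket prefixes is kept sorted. `PrefixBucketedMultimap` is a hypothetical name, and merging buckets on removal is omitted:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a "sorted" multimap built from small, unsorted buckets.
class PrefixBucketedMultimap<V> {
  // A bucket holds every key whose top `length` bits equal `prefix`.
  private static final class Bucket<T> {
    final long prefix;
    final int length;
    final List<Map.Entry<Long, T>> entries = new ArrayList<>();

    Bucket(long prefix, int length) {
      this.prefix = prefix;
      this.length = length;
    }

    // Smallest key (unsigned) that can fall into this bucket.
    long lowerBound() {
      return length == 0 ? 0L : prefix << (64 - length);
    }
  }

  private final int maxBucketSize;
  // Small sorted index of bucket lower bounds; buckets partition the key space.
  private final TreeMap<Long, Bucket<V>> index = new TreeMap<>(Long::compareUnsigned);

  PrefixBucketedMultimap(int maxBucketSize) {
    this.maxBucketSize = maxBucketSize;
    index.put(0L, new Bucket<>(0L, 0));
  }

  void put(long key, V value) {
    // The owning bucket has the greatest lower bound that is <= key.
    Bucket<V> bucket = index.floorEntry(key).getValue();
    bucket.entries.add(new AbstractMap.SimpleEntry<>(key, value));
    splitIfTooLarge(bucket);
  }

  // Replace an oversized bucket with two children using one more prefix bit.
  private void splitIfTooLarge(Bucket<V> bucket) {
    if (bucket.entries.size() <= maxBucketSize || bucket.length == 64) {
      return;
    }
    Bucket<V> low = new Bucket<>(bucket.prefix << 1, bucket.length + 1);
    Bucket<V> high = new Bucket<>((bucket.prefix << 1) | 1L, bucket.length + 1);
    int shift = 64 - low.length;
    for (Map.Entry<Long, V> e : bucket.entries) {
      boolean bitSet = ((e.getKey() >>> shift) & 1L) == 1L;
      Bucket<V> target = bitSet ? high : low;
      target.entries.add(e);
    }
    index.put(low.lowerBound(), low); // same lower bound as the old bucket
    index.put(high.lowerBound(), high);
    splitIfTooLarge(low);
    splitIfTooLarge(high);
  }

  // Entries with key <= limit (unsigned), sorted by key. Only buckets starting
  // at or below `limit` are touched, and each one is small enough to sort.
  List<Map.Entry<Long, V>> entriesUntil(long limit) {
    List<Map.Entry<Long, V>> result = new ArrayList<>();
    for (Bucket<V> bucket : index.headMap(limit, true).values()) {
      List<Map.Entry<Long, V>> chunk = new ArrayList<>();
      for (Map.Entry<Long, V> e : bucket.entries) {
        if (Long.compareUnsigned(e.getKey(), limit) <= 0) {
          chunk.add(e);
        }
      }
      chunk.sort((a, b) -> Long.compareUnsigned(a.getKey(), b.getKey()));
      result.addAll(chunk);
    }
    return result;
  }

  int bucketCount() {
    return index.size();
  }
}
```

Because buckets cover disjoint, ascending key ranges, concatenating the individually sorted buckets yields a globally sorted result, and no single sort exceeds `maxBucketSize` entries. A streaming version would emit each sorted bucket instead of collecting them into one list.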

Can you explain this more? How would this allow us to have a sorted state
without having to read the entire state into memory?


>
>
>
>
> On Thu, May 23, 2019 at 10:36 AM Reuven Lax <re...@google.com> wrote:
>
>> Beam's state API is intended to be useful as an alternative aggregation
>> method. Stateful ParDos can aggregate elements into state and set timers to
>> read the state. Currently, Beam's state API supports two ways of storing
>> collections of elements: BagState and MapState. BagState is the one most
>> commonly used for these aggregations, likely for the practical reason that
>> it's the only one that has a scalable implementation on several runners.
>>
>> There are two major limitations of BagState:
>> * It is append-only: to delete an element from a BagState, you must read
>> and rewrite the entire bag.
>> * It is unsorted: almost all use cases of BagState need the elements in
>> some order (usually timestamp order). Since BagState is unsorted, every
>> time it's read it must be read in full and sorted in the ParDo.
>>
>> To make this concrete, consider a ParDo that does timeseries joins. There
>> are two types of input events: trades and quotes. A quote gives the
>> current price of a stock. The goal of the ParDo is to pair each stock's
>> trades with the closest quote that precedes it in time.
>>
>> A simple implementation would look as follows (pseudocode):
>>
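>> // tradesBag, quotesBag: BagState; timerTime: ValueState<Instant>;
>> // processTradeTimer: an event-time Timer.
>> void process(@Element Event event, @Timestamp Instant timestamp) {
>>   if (event.type() == TRADE) {
>>     tradesBag.add(event.trade());
>>     // Move the timer earlier if this trade precedes the pending one.
>>     Instant existingTimerTime = timerTime.read();
>>     if (existingTimerTime == null || timestamp.isBefore(existingTimerTime)) {
>>       processTradeTimer.set(timestamp);
>>       timerTime.write(timestamp);
>>     }
>>   } else {
>>     quotesBag.add(event.quote());
>>   }
>> }
>>
>> void onTimer(@Timestamp Instant timestamp) {
>>   // BagState is unsorted, so both bags are read in full and sorted.
>>   List<Quote> quotes = sort(quotesBag.read());
>>   List<Trade> trades = sort(tradesBag.read());
>>   int processed = 0;
>>   for (Trade trade : trades) {
>>     if (trade.timestamp().isAfter(timestamp)) break;
>>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>>     // Output KV<Trade, Quote>
>>     processed++;
>>   }
>>
>>   // Removing processed trades means rewriting the whole bag.
>>   List<Trade> newTrades = new ArrayList<>(trades.subList(processed, trades.size()));
>>   tradesBag.clear();
>>   newTrades.forEach(tradesBag::add);
>>   // Re-arm the timer for the earliest remaining trade, if any.
>>   timerTime.clear();
>>   if (!newTrades.isEmpty()) {
>>     processTradeTimer.set(newTrades.get(0).timestamp());
>>     timerTime.write(newTrades.get(0).timestamp());
>>   }
>> }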
>>
>> A few obvious problems with this code:
>>   1. Removing the already-processed elements from the bag requires
>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>> input trades.
>>   2. Both bags need to be fetched and sorted on every timer. This is
>> O(n^2 log n) in the size of the input PCollections.
>>   3. BagState is designed so that its iterables can be large; the entire
>> BagState shouldn't have to fit in memory. However, this stops working
>> when you need to sort it, because the entire BagState must be paged into
>> memory in order to sort it.
>>
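A back-of-the-envelope version of the first two bounds, assuming at most $n$ timer firings (one per trade) and writing $m_k \le n$ for the number of trades in the bag when the $k$-th timer fires:

```latex
\begin{aligned}
\text{rewrite cost} &= \sum_{k=1}^{n} m_k \le n \cdot n = O(n^2) \\
\text{sort cost} &= \sum_{k=1}^{n} m_k \log m_k \le n \cdot n \log n = O(n^2 \log n)
\end{aligned}
```

Both bounds are reached up to constants when pending trades keep accumulating so that $m_k$ grows linearly in $k$, e.g. $\sum_{k=1}^{n} k = n(n+1)/2$.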
>> Some of these problems can be alleviated with separate GC, caching of
>> BagState fetches, etc. However, this results in increasingly complicated
>> code and does not solve all of the problems above.
>>
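`findClosestQuote` is left undefined in the bag-based pseudocode. Since `quotes` is already sorted there, one plausible implementation is a binary search for the latest quote at or before the trade. A sketch, assuming epoch-millisecond timestamps and a hypothetical `Quote` class:

```java
import java.util.List;

// Hypothetical quote type: timestamp in epoch millis plus a price.
final class Quote {
  final long timestamp;
  final double price;

  Quote(long timestamp, double price) {
    this.timestamp = timestamp;
    this.price = price;
  }
}

final class QuoteLookup {
  // Returns the latest quote with timestamp <= tradeTimestamp, or null if
  // there is none. Requires `quotes` sorted by ascending timestamp.
  static Quote findClosestQuote(List<Quote> quotes, long tradeTimestamp) {
    int lo = 0;
    int hi = quotes.size() - 1;
    Quote best = null;
    while (lo <= hi) {
      int mid = (lo + hi) >>> 1;
      Quote candidate = quotes.get(mid);
      if (candidate.timestamp <= tradeTimestamp) {
        best = candidate; // valid match; a later one may exist to the right
        lo = mid + 1;
      } else {
        hi = mid - 1;
      }
    }
    return best;
  }
}
```

Each lookup is O(log m) for m quotes, but the full read and sort that precede it are still paid on every timer.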
>> I'd like to propose a new state type to solve the above problems:
>> SortedMultimapState. SortedMultimapState could have the following signature:
>>
>> import org.apache.beam.sdk.state.ReadableState;
>> import org.apache.beam.sdk.state.State;
>> import org.apache.beam.sdk.values.KV;
>>
>> public interface SortedMultimapState<K, V> extends State {
>>   // Add a value to the map.
>>   void put(K key, V value);
>>   // Get all values for a given key.
>>   ReadableState<Iterable<V>> get(K key);
>>   // Return all entries in the map.
>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>   // Return all entries in the map with keys <= limit. Returned elements
>>   // are sorted by the key.
>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>   // Remove all values with the given key.
>>   void remove(K key);
>>   // Remove all entries in the map with keys <= limit.
>>   void removeUntil(K limit);
>> }
>>
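To pin down the intended semantics (inclusive `<= limit`, several values per key, range deletes), here is a small in-memory model. `SortedMultimapModel` is a hypothetical name; it orders keys by their natural Java order, whereas the proposal has runners order by encoded key bytes, and it returns plain lists instead of `ReadableState`:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the proposed SortedMultimapState semantics.
// Keys use natural Java ordering here; runners would order by encoded bytes.
class SortedMultimapModel<K extends Comparable<K>, V> {
  private final TreeMap<K, List<V>> entries = new TreeMap<>();

  void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  List<V> get(K key) {
    return entries.getOrDefault(key, Collections.emptyList());
  }

  // Every entry, in key order (the proposal does not say allEntries is sorted).
  List<Map.Entry<K, V>> allEntries() {
    return flatten(entries);
  }

  // Entries with key <= limit (inclusive), sorted by key.
  List<Map.Entry<K, V>> entriesUntil(K limit) {
    return flatten(entries.headMap(limit, true));
  }

  void remove(K key) {
    entries.remove(key);
  }

  // Clearing the inclusive head view deletes the whole range at once.
  void removeUntil(K limit) {
    entries.headMap(limit, true).clear();
  }

  private static <A, B> List<Map.Entry<A, B>> flatten(Map<A, List<B>> map) {
    List<Map.Entry<A, B>> out = new ArrayList<>();
    for (Map.Entry<A, List<B>> e : map.entrySet()) {
      for (B value : e.getValue()) {
        out.add(new AbstractMap.SimpleEntry<>(e.getKey(), value));
      }
    }
    return out;
  }
}
```

`TreeMap.headMap(limit, true)` is a live, inclusive view, which is why `removeUntil` can drop a whole range with one `clear()`.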
>> Runners will sort based on the encoded value of the key. To make this
>> easier for users, I propose that we introduce a new tag on Coders,
>> *PreservesOrder*. A Coder that carries this tag guarantees that the
>> encoded value preserves the same ordering as the base Java type.
>>
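As an illustration of what a *PreservesOrder* coder has to guarantee, take `long` timestamps: plain two's-complement big-endian bytes sort every negative value after every non-negative one, because the sign bit is the top bit. Flipping the sign bit before writing big-endian fixes that. `OrderedLongEncoding` is a hypothetical helper, not an existing Beam coder:

```java
import java.nio.ByteBuffer;

// Hypothetical order-preserving encoding for signed longs.
final class OrderedLongEncoding {
  // Big-endian bytes with the sign bit flipped, so that comparing encodings
  // as unsigned bytes gives the same order as comparing the longs.
  static byte[] encode(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  static long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
  }

  // Lexicographic comparison of byte arrays, treating each byte as unsigned
  // (the comparison a runner would apply to encoded keys).
  static int compareEncoded(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```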
>> The above logic can now be rewritten! There's no need to sort explicitly,
>> since the elements are returned sorted by key (in this case the key is the
>> timestamp). We can fetch only up to the current timer timestamp, so there's
>> no need to pull the entire set into memory. Furthermore, we can efficiently
>> delete ranges, with no more need to rewrite the entire set each time.
>>
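>> void onTimer(@Timestamp Instant timestamp) {
>>   long limit = timestamp.getMillis();
>>   // Both reads stop at the timer timestamp and come back sorted by key.
>>   Iterable<KV<Long, Quote>> quotes = quotesMap.entriesUntil(limit).read();
>>   Iterable<KV<Long, Trade>> trades = tradesMap.entriesUntil(limit).read();
>>   for (KV<Long, Trade> entry : trades) {
>>     Trade trade = entry.getValue();
>>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>>     // Output KV<Trade, Quote>
>>   }
>>   // Delete the processed range without rewriting the remaining trades.
>>   tradesMap.removeUntil(limit);
>> }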
>>
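Because both `entriesUntil` reads come back sorted, the per-trade `findClosestQuote` lookup can become a single forward merge over the two streams. A sketch using hypothetical `Trade` and `Quote` classes, epoch-millisecond timestamps, and plain `Iterable`s standing in for the state reads:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical event types keyed by epoch-millis timestamps.
final class Quote {
  final long timestamp;
  final double price;

  Quote(long timestamp, double price) {
    this.timestamp = timestamp;
    this.price = price;
  }
}

final class Trade {
  final long timestamp;
  final String id;

  Trade(long timestamp, String id) {
    this.timestamp = timestamp;
    this.id = id;
  }
}

final class SortedMergeJoin {
  // Pairs each trade with the latest quote at or before it, as "id@price".
  // Both inputs must be sorted by ascending timestamp, so each is read once.
  static List<String> join(Iterable<Trade> trades, Iterable<Quote> quotes) {
    List<String> out = new ArrayList<>();
    Iterator<Quote> quoteIt = quotes.iterator();
    Quote current = null; // latest quote not after the current trade
    Quote next = quoteIt.hasNext() ? quoteIt.next() : null;
    for (Trade trade : trades) {
      // Advance past every quote that is not after this trade.
      while (next != null && next.timestamp <= trade.timestamp) {
        current = next;
        next = quoteIt.hasNext() ? quoteIt.next() : null;
      }
      if (current != null) {
        out.add(trade.id + "@" + current.price);
      }
    }
    return out;
  }
}
```

Trades with no earlier quote are skipped here; that policy is an assumption, not part of the proposal.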
>> Comments?
>>
>> Reuven
>>
>>
>>
