Zooming in from generic philosophy, to be clear: adding a time-ordered
buffer to the Fn state API is *not* a shortcut. It has benefits that
will not be achieved by SDK-side implementation on top of either
ordered or unordered multimap. Are those benefits worth expanding
the API? I don't know.
A change to allow a runner to have a specialized implementation for
time-buffered state would be one or more StateKey types, right?
Reuven, maybe put this and your Java API in a doc? A BIP? Seems like
there's at least the following to explore:
- how that Java API would map to an SDK-side implementation on top of
multimap state key
- how that Java API would map to a new StateKey
- whether there's actually more than one relevant implementation of
that StateKey
- whether SDK-side implementation on some other state key would be
performant enough in all SDK languages (present and future)
Zooming back out to generic philosophy: Proliferation of StateKey
types tuned by runners (which can very easily still share
implementation) is probably better than proliferation of complex
SDK-side implementations with varying completeness and performance.
Kenn
On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax <re...@google.com
<mailto:re...@google.com>> wrote:
It might help for me to describe what I have in mind. I'm still
proposing that we build multimap, just not a globally-sorted
multimap.
My previous proposal was that we provide a Multimap<Key, Value>
state type, sorted by key. This would have two additional
operations - multimap.getRange(startKey, endKey) and
multimap.deleteRange(startKey, endKey). The primary use case was
timestamp sorting, but I felt that a sorted multimap provided a
nice generalization - after all, you can simply key the multimap
by timestamp to get timestamp sorting.
This approach had some issues immediately that would take some
work to solve. Since a multimap key can have any type and a runner
will only be able to sort by encoded type, we would need to
introduce a concept of order-preserving coders into Beam and
plumb that through. Robert pointed out that even our existing
standard coders for simple integral types don't preserve order, so
there will likely be surprises here.
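To make the ordering concern concrete, here is a minimal sketch. It does not use any Beam coder; it assumes a plain big-endian two's-complement encoding of a long as a stand-in, and the class and method names are made up for illustration. It shows that comparing encoded bytes lexicographically (as unsigned bytes) puts negative values after positive ones, and that flipping the sign bit before encoding is one way to restore numeric order.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: these encodings are stand-ins, not Beam's standard coders.
final class OrderPreservingLongDemo {
    // Plain big-endian two's complement. Negative values start with a 1 bit,
    // so they sort AFTER positive values when bytes are compared as unsigned.
    static byte[] encodeNaive(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
    // 0x00..00 .. 0xFF..FF, so unsigned byte order matches numeric order.
    static byte[] encodeOrdered(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
    }

    public static void main(String[] args) {
        long a = -1L;
        long b = 1L;
        // false: the naive encoding of -1 sorts after the encoding of 1
        System.out.println(Arrays.compareUnsigned(encodeNaive(a), encodeNaive(b)) < 0);
        // true: the sign-flipped encoding keeps -1 before 1
        System.out.println(Arrays.compareUnsigned(encodeOrdered(a), encodeOrdered(b)) < 0);
    }
}
```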
My current proposal is for a multimap that is not sorted by key,
but that can support ordered values for a single key. Remember
that a multimap maps K -> Iterable<V>, so this means that each
individual Iterable<V> is ordered, but the keys have no specific
order relative to each other. This is not too different from many
multimap implementations where the keys are unordered, but the
list of values for a single key at least has a stable order.
The interface would look like this:
public interface MultimapState<K, V> extends State {
  // Add a value with a default timestamp.
  void put(K key, V value);
  // Add a timestamped value.
  void put(K key, TimestampedValue<V> value);
  // Remove all values for a key.
  void remove(K key);
  // Remove all values for a key with timestamps within the
  // specified range.
  void removeRange(K key, Instant startTs, Instant endTs);
  // Get an Iterable of values for a key. The Iterable will be
  // returned sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> get(K key);
  // Get an Iterable of values for a key in the specified range. The
  // Iterable will be returned sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> getRange(K key,
      Instant startTs, Instant endTs);
  ReadableState<Iterable<K>> keys();
  ReadableState<Iterable<TimestampedValue<V>>> values();
  ReadableState<Iterable<Map.Entry<K, TimestampedValue<V>>>> entries();
}
We can of course provide helper functions that allow using
MultimapState without dealing with TimestampedValue for users who
only want a multimap and don't want sorting.
I think many users will only need a single sorted list - not a
full multimap. It's worth offering this as well, and we can simply
build it on top of MultimapState. It will look like an extension
of BagState:
public interface TimestampSortedListState<T> extends State {
void add(TimestampedValue<T> value);
Iterable<TimestampedValue<T>> read();
Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant
endTs);
void clearRange(Instant startTs, Instant endTs);
}
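As a rough illustration of the layering described above, the following is an in-memory sketch, not runner or Beam code. InMemoryMultimap and TimestampSortedList are hypothetical names, timestamps are plain long values rather than Instant, and ranges are assumed to be half-open (start inclusive, end exclusive), which the proposal does not specify. The point is only that the sorted list can be the multimap restricted to one fixed key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed multimap: keys are
// unordered, but the values under each key are kept in timestamp order.
final class InMemoryMultimap<K, V> {
    private final Map<K, TreeMap<Long, List<V>>> data = new HashMap<>();

    void put(K key, long ts, V value) {
        data.computeIfAbsent(key, k -> new TreeMap<>())
            .computeIfAbsent(ts, t -> new ArrayList<>())
            .add(value);
    }

    // All values for the key, in timestamp order.
    List<V> get(K key) {
        List<V> out = new ArrayList<>();
        TreeMap<Long, List<V>> byTs = data.get(key);
        if (byTs != null) {
            byTs.values().forEach(out::addAll);
        }
        return out;
    }

    // Values with startTs <= ts < endTs (half-open range is an assumption).
    List<V> getRange(K key, long startTs, long endTs) {
        List<V> out = new ArrayList<>();
        TreeMap<Long, List<V>> byTs = data.get(key);
        if (byTs != null) {
            byTs.subMap(startTs, endTs).values().forEach(out::addAll);
        }
        return out;
    }

    void removeRange(K key, long startTs, long endTs) {
        TreeMap<Long, List<V>> byTs = data.get(key);
        if (byTs != null) {
            // Clearing the subMap view removes the range from the backing map.
            byTs.subMap(startTs, endTs).clear();
        }
    }
}

// The sorted list delegates everything to the multimap under one fixed key.
final class TimestampSortedList<T> {
    private static final String ONLY_KEY = "";
    private final InMemoryMultimap<String, T> backing = new InMemoryMultimap<>();

    void add(long ts, T value) { backing.put(ONLY_KEY, ts, value); }
    List<T> read() { return backing.get(ONLY_KEY); }
    List<T> readRange(long startTs, long endTs) { return backing.getRange(ONLY_KEY, startTs, endTs); }
    void clearRange(long startTs, long endTs) { backing.removeRange(ONLY_KEY, startTs, endTs); }
}
```

For example, adding "c" at 30, "a" at 10 and "b" at 20 and then calling read() yields a, b, c; after clearRange(0, 25) only c remains.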
On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik <lc...@google.com
<mailto:lc...@google.com>> wrote:
The portability layer is meant to live across multiple
versions of Beam, and I don't think it should be designed by
"doing the simple and useful thing now", since I believe that will
lead to a proliferation of the API.
On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles
<k...@apache.org <mailto:k...@apache.org>> wrote:
I have thoughts on the subject of whether to have APIs
just for the lowest-level building blocks versus having
APIs for higher-level constructs. Specifically this
applies to providing only unsorted multimap vs what I will
call "time-ordered buffer". TL;DR: I'd vote to focus on
time-ordered buffer; if it turns out to be easy to go all
the way to sorted multimap that's nice-to-have; if it
turns out to be easy to implement on top of unsorted map
state that should probably be under the hood.
Reasons to build low-level multimap in the runner & fn api
and layer higher-level things in the SDK:
- It is less implementation for runners if they have to
only provide fewer lower-level building blocks like
multimap state.
- There are many more runners than SDKs (and will be even
more and more) so this saves overall.
Reasons to build higher-level constructs directly in the
runner and fn api:
- Having multiple higher-level state types may actually
be less implementation than one complex state type,
especially if they map to runner primitives.
- The runner may have better specialized implementations,
especially for something like a time-ordered buffer.
- The particular access patterns in an SDK-based
implementation may not be ideal for each runner's
underlying implementation of the low-level building block.
- There may be excessive gRPC overhead even for optimal
access patterns.
There are ways to have best of both worlds, like:
1. Define multiple state types according to fundamental
access patterns, as we did before portability.
2. If it is easy to layer one on top of the other, do that
inside the runner. Provide shared code so for runners
providing the lowest-level primitive they get all the
types for free.
I understand that this is an oversimplification. It still
creates some more work. And APIs are a burden so it is
good to introduce as few as possible for maintenance. But
it has performance benefits and also unblocks "just doing
the simple and useful thing now" which I always like to do
as long as it is compatible with future changes. If the
APIs are fundamental, like sets, maps, timestamp ordering,
then it is safe to guess that they will change rarely and
be useful forever.
Kenn
On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik
<lc...@google.com <mailto:lc...@google.com>> wrote:
I would be glad to take a stab at how to provide
sorting on top of unsorted multimap state.
Based upon your description, you want integer keys
representing timestamps and arbitrary user value for
the values, is that correct?
What kinds of operations do you need on the sorted map
state in order of efficiency requirements?
(e.g. Next(x), Previous(x), GetAll(Range[x, y)),
ClearAll(Range[x, y)))
What kinds of operations do we expect the underlying
unsorted map state to be able to provide?
(at a minimum Get(K), Append(K), Clear(K) but what
else e.g. enumerate(K)?)
I went through a similar exercise of how to provide a
list like side input view over a multimap[1] side
input which efficiently allowed computation of size
and provided random access while only having access to
get(K) and enumerate K's.
1:
https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax
<re...@google.com <mailto:re...@google.com>> wrote:
Bringing this subject up again,
I've spent some time looking into implementing
this for the Dataflow runner. I'm unable to find a
way to implement the arbitrary sorted multimap
efficiently for the case where there are large
numbers of unique keys. Since the primary driving
use case is timestamp ordering (i.e. key is event
timestamp), you would expect to have nearly a new
key per element. I considered Luke's suggestion
above, but unfortunately it doesn't really solve
this issue.
The primary use case for sorting always seems to
be sorting by timestamp. I want to propose that
instead of building the fully-general sorted
multimap, we instead focus on a state type where
the sort key is an integral type (like a timestamp
or an integer). There is still a valid use case
for multimap, but we can provide that as an
unordered state. At least for Dataflow, it will be
much easier.
While my difficulties here may be specific to the
Dataflow runner, any such support would have to be
built into other runners as well, and limiting to
integral sorting likely makes it easier for other
runners to implement this. Also, if you look at
this
<https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95>
Flink
comment pointed out by Aljoscha, for Flink the
main use case identified was also timestamp
sorting. This will also simplify the API design
for this feature: Sorted multimap with arbitrary
keys would require us to introduce a way of
mapping natural ordering to encoded ordering (i.e.
a new OrderPreservingCoder), but if we limit sort
keys to integral types, the API design is simpler
as integral types can be represented directly.
Reuven
On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax
<re...@google.com <mailto:re...@google.com>> wrote:
This sounds to me like a potential runner
strategy. However if a runner can natively
support sorted maps (e.g. we expect the
Dataflow runner to be able to do so, and I
think it would be useful for other runners as
well), then it's probably preferable to allow
the runner to use its native capabilities.
On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik
<lc...@google.com <mailto:lc...@google.com>>
wrote:
For the API that you proposed, the map key
is always "void" and the sort key == user
key. So in my example of
key: dummy value
key.000: token, (0001, value4)
key.001: token, (0010, value1), (0011, value2)
key.01: token
key.1: token, (1011, value3)
you would have:
"void": dummy value
"void".000: token, (0001, value4)
"void".001: token, (0010, value1), (0011,
value2)
"void".01: token
"void".1: token, (1011, value3)
Iterable<KV<K, V>> entriesUntil(K limit)
translates into walking the prefixes
until you find a common prefix for K and
then filtering for values that have a
sort key <= limit. Using the example above, to
find entriesUntil(0010) you would:
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, sort all contained
values using secondary key, provide value4
to user
look for key.001, hit, notice that 001 is
a prefix of 0010 so we sort all contained
values using secondary key, filter out
value2 and provide value1
void removeUntil(K limit) also translates
into walking the prefixes but instead we
will clear them when we have a "hit" with
some special logic for when the sort key
is a prefix of the key. Using the example,
to removeUntil(0010) you would:
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, clear
look for key.001, hit, notice that 001 is
a prefix of 0010 so we sort all contained
values using secondary key, store in
memory all values that are > 0010, clear and
append values stored in memory.
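The two walks above can be sketched against an in-memory map. This is only an illustration under stated assumptions: PrefixRangeOps and its members are made-up names, a HashMap stands in for the unsorted map state, sort keys are fixed-width bit strings (so String comparison matches numeric order), and the token and dummy-value bookkeeping is replaced by "the map has an entry for this prefix". The example() method hard-codes the leaf layout from the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: range operations over the leaf layout from the example.
final class PrefixRangeOps {
    static final class Entry {
        final String sortKey;
        final String value;
        Entry(String sortKey, String value) { this.sortKey = sortKey; this.value = value; }
    }

    // Stand-in for the unsorted map: bit prefix -> values stored at that leaf.
    final Map<String, List<Entry>> leaves = new HashMap<>();

    // Builds the final state of the example: leaves 000, 001, 01 (empty) and 1.
    static PrefixRangeOps example() {
        PrefixRangeOps t = new PrefixRangeOps();
        t.leaves.put("000", new ArrayList<>(List.of(new Entry("0001", "value4"))));
        t.leaves.put("001", new ArrayList<>(List.of(
            new Entry("0010", "value1"), new Entry("0011", "value2"))));
        t.leaves.put("01", new ArrayList<>());
        t.leaves.put("1", new ArrayList<>(List.of(new Entry("1011", "value3"))));
        return t;
    }

    // Values with sort key <= limit, in sort-key order.
    List<String> entriesUntil(String limit) {
        List<String> out = new ArrayList<>();
        visit("", limit, false, out);
        return out;
    }

    // Drops every value with sort key <= limit.
    void removeUntil(String limit) {
        visit("", limit, true, new ArrayList<>());
    }

    private void visit(String prefix, String limit, boolean remove, List<String> out) {
        // Skip subtrees whose keys are all greater than the limit.
        if (prefix.compareTo(limit.substring(0, prefix.length())) > 0) return;
        List<Entry> leaf = leaves.get(prefix);
        if (leaf == null) {
            // Miss: an internal node, so try both longer prefixes.
            if (prefix.length() < limit.length()) {
                visit(prefix + "0", limit, remove, out);
                visit(prefix + "1", limit, remove, out);
            }
            return;
        }
        if (remove) {
            // Keep the (possibly empty) leaf so later walks still terminate.
            leaf.removeIf(e -> e.sortKey.compareTo(limit) <= 0);
        } else {
            leaf.stream()
                .filter(e -> e.sortKey.compareTo(limit) <= 0)
                .sorted(Comparator.comparing((Entry e) -> e.sortKey))
                .forEach(e -> out.add(e.value));
        }
    }
}
```

With the example layout, entriesUntil("0010") returns value4 then value1, and after removeUntil("0010") the leaf 000 is empty and 001 holds only value2. A real implementation over map state would do the rewrite of the partially cleared leaf as a clear followed by appends, as described above.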
On Fri, May 24, 2019 at 10:36 AM Reuven
Lax <re...@google.com
<mailto:re...@google.com>> wrote:
Can you explain how fetching and
deleting ranges of keys would work
with this data structure?
On Fri, May 24, 2019 at 9:50 AM Lukasz
Cwik <lc...@google.com
<mailto:lc...@google.com>> wrote:
Reuven, for the example, I assume
that we never want to store more
than 2 values at a given sort key
prefix, and if we do then we will
create a new longer prefix
splitting up the values based upon
the sort key.
Tuple representation in examples
below is (key, sort key, value)
and . is a character outside of
the alphabet which can be
represented by using an escaping
encoding that wraps the key + sort
key encoding.
To insert (key, 0010, value1), we
look up "key" + all the prefixes of
0010, looking for one that is not
empty. Nothing is stored yet, so we
append the value to the map at key.""
ending up with (we also set the
key to any dummy value to know
that it contains values):
key: dummy value
key."": token, (0010, value1)
Now we insert (key, 0011, value2).
We again look up "key" + all the
prefixes of 0011, finding "", so
we append value2 to key."" ending
up with:
key: dummy value
key."": token, (0010, value1),
(0011, value2)
Now we insert (key, 1011, value3).
We again look up "key" + all the
prefixes of 1011 finding "" but
notice that it is full, so we
partition all the values into two
prefixes 0 and 1. We also clear
the "" prefix ending up with:
key: dummy value
key.0: token, (0010, value1),
(0011, value2)
key.1: token, (1011, value3)
Now we insert (key, 0001, value4).
We again look up "key" + all the
prefixes of 0001, finding 0,
but notice that it is full, so we
partition all the values into two
prefixes 00 and 01 but notice this
doesn't help us since 00 will be
too full so we split 00 again to
000, 001. We also clear the 0
prefix ending up with:
key: dummy value
key.000: token, (0001, value4)
key.001: token, (0010, value1),
(0011, value2)
key.01: token
key.1: token, (1011, value3)
We are effectively building a
trie[1] where we only have values
at the leaves and control how full
each leaf can be. There are other
trie representations like a radix
tree that may be better.
Looking up the values in sorted
order for "key" would go like this:
Is key set, yes
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, sort all
contained values using secondary
key, provide value4 to user
look for key.001, hit, sort all
contained values using secondary
key, provide value1 followed by
value2 to user
look for key.01, hit, empty,
return no values to user
look for key.1, hit, sort all
contained values using secondary
key, provide value3 to user
we have walked the entire prefix
space, signal end of iterable
Some notes for the above:
* The dummy value is used to know
that the key contains values and
the token is to know whether there
are any values deeper in the trie
so we know when to stop
searching.
* If we can recalculate the sort
key from the combination of the
key and value, then we don't need
to store it.
* Keys with lots of values will
perform worse than keys with fewer
values since we have to look up
more keys but they will be empty
reads. The number of misses can be
controlled by how many elements we
are willing to store at a given
node before we subdivide.
In reality you could build a lot
of structures (e.g. red black
tree, binary tree) using the sort
key, the issue is the cost of
rebalancing/re-organizing the
structure in map form and whether
it has a convenient pre-order
traversal for lookups.
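For readers who want to trace the insert and sorted-lookup steps from this message, here is a compact in-memory sketch. It makes several assumptions that are not part of the description: PrefixTrieOverMap is a made-up name, a HashMap stands in for the unsorted map state, all sort keys are bit strings of the same width, and the token and dummy-value bookkeeping is replaced by "the map has an entry for this prefix". A split always creates both children, which is what lets the lookup treat a miss as "descend further".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a leaf-only binary trie stored in a flat, unsorted map.
final class PrefixTrieOverMap {
    static final int LEAF_CAPACITY = 2; // as in the example: at most 2 values per leaf

    static final class Entry {
        final String sortKey;
        final String value;
        Entry(String sortKey, String value) { this.sortKey = sortKey; this.value = value; }
    }

    // Stand-in for the unsorted map: bit prefix -> values stored at that leaf.
    final Map<String, List<Entry>> leaves = new HashMap<>();

    void insert(String sortKey, String value) {
        if (leaves.isEmpty()) leaves.put("", new ArrayList<>()); // first insert creates the root leaf
        // Walk the prefixes, shortest first, until we hit the leaf covering sortKey.
        for (int len = 0; len <= sortKey.length(); len++) {
            String prefix = sortKey.substring(0, len);
            List<Entry> leaf = leaves.get(prefix);
            if (leaf != null) {
                leaf.add(new Entry(sortKey, value));
                split(prefix, sortKey.length());
                return;
            }
        }
        throw new IllegalStateException("no leaf covers " + sortKey);
    }

    // Replaces an over-full leaf with its two children, repeating as needed.
    private void split(String prefix, int keyBits) {
        List<Entry> leaf = leaves.get(prefix);
        if (leaf.size() <= LEAF_CAPACITY || prefix.length() == keyBits) return;
        leaves.remove(prefix);
        List<Entry> zero = new ArrayList<>();
        List<Entry> one = new ArrayList<>();
        for (Entry e : leaf) {
            (e.sortKey.charAt(prefix.length()) == '0' ? zero : one).add(e);
        }
        leaves.put(prefix + "0", zero); // both children are created, even if empty
        leaves.put(prefix + "1", one);
        split(prefix + "0", keyBits);
        split(prefix + "1", keyBits);
    }

    // Pre-order walk of the prefix space; values come out in sort-key order.
    List<String> readSorted() {
        List<String> out = new ArrayList<>();
        if (!leaves.isEmpty()) walk("", out);
        return out;
    }

    private void walk(String prefix, List<String> out) {
        List<Entry> leaf = leaves.get(prefix);
        if (leaf == null) { // miss: an internal node, so visit both children
            walk(prefix + "0", out);
            walk(prefix + "1", out);
            return;
        }
        leaf.stream()
            .sorted(Comparator.comparing((Entry e) -> e.sortKey))
            .forEach(e -> out.add(e.value));
    }
}
```

Inserting (0010, value1), (0011, value2), (1011, value3) and (0001, value4) in that order leaves the prefixes 000, 001, 01 and 1 in the map, and readSorted() returns value4, value1, value2, value3, matching the walk-through.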
On Fri, May 24, 2019 at 8:14 AM
Reuven Lax <re...@google.com
<mailto:re...@google.com>> wrote:
Some great comments!
*Aljoscha*: absolutely this
would have to be implemented
by runners to be efficient. We
can of course provide a
default (inefficient)
implementation, but ideally
runners would provide better ones.
*Jan* Exactly. I think
MapState can be dropped or
backed by this. E.g.
*Robert* Great point about
standard coders not satisfying
this. That's why I suggested
that we provide a way to tag
the coders that do preserve
order, and only accept those
as key coders. Alternatively we
could present a more limited
API - e.g. only allowing a
hard-coded set of types to be
used as keys - but that seems
counter to the direction Beam
usually goes. So users will
have two ways of creating
multimap state specs:
private final
StateSpec<MultimapState<Long,
String>> state =
StateSpecs.multimap(VarLongCoder.of(),
StringUtf8Coder.of());
or
private final
StateSpec<MultimapState<Long,
String>> state =
StateSpecs.orderedMultimap(VarLongCoder.of(),
StringUtf8Coder.of());
The second one will validate
that the key coder preserves
order, and fail otherwise
(similar to coder determinism
checking in GroupByKey). (BTW
we would also have versions of
these functions that use coder
inference to "guess" the
coder, but those will do the
same checking)
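A very small sketch of the kind of check described here, purely hypothetical: KeyCoder, preservesOrder and requireOrderPreserving are invented for illustration and are not Beam APIs. It assumes the order-preserving property is something a coder declares about itself, and that the "ordered" factory rejects any coder that does not declare it.

```java
// Hypothetical sketch: none of these names are Beam APIs.
final class OrderedKeyCheck {
    // Stand-in for a coder that can declare whether its encoding preserves order.
    interface KeyCoder<K> {
        byte[] encode(K key);
        default boolean preservesOrder() { return false; }
    }

    // Mirrors the proposed behaviour of an "ordered" spec factory: reject
    // coders that do not declare themselves order-preserving.
    static <K> KeyCoder<K> requireOrderPreserving(KeyCoder<K> coder) {
        if (!coder.preservesOrder()) {
            throw new IllegalArgumentException("key coder does not preserve order");
        }
        return coder;
    }
}
```

Failing when the spec is constructed, rather than when state is first read, would surface the problem at pipeline construction time, in the same spirit as the determinism check mentioned above.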
Also the API I proposed did
support random access! We
could separate out
OrderedBagState again if we
think the use cases are
fundamentally different. I
merged the proposal into that
of MultimapState because there
seemed to be 99% overlap.
Reuven
On Fri, May 24, 2019 at 6:19
AM Robert Bradshaw
<rober...@google.com
<mailto:rober...@google.com>>
wrote:
On Fri, May 24, 2019 at
5:32 AM Reuven Lax
<re...@google.com
<mailto:re...@google.com>>
wrote:
>
> On Thu, May 23, 2019 at
1:53 PM Ahmet Altay
<al...@google.com
<mailto:al...@google.com>>
wrote:
>>
>>
>>
>> On Thu, May 23, 2019 at
1:38 PM Lukasz Cwik
<lc...@google.com
<mailto:lc...@google.com>>
wrote:
>>>
>>>
>>>
>>> On Thu, May 23, 2019
at 11:37 AM Rui Wang
<ruw...@google.com
<mailto:ruw...@google.com>>
wrote:
>>>>>
>>>>> A few obvious
problems with this code:
>>>>> 1. Removing the
elements already processed
from the bag requires
clearing and rewriting the
entire bag. This is O(n^2)
in the number of input trades.
>>>>
>>>> Why isn't it O(2 * n)
to clear and rewrite the
trade state?
>>>>
>>>>>
>>>>> public interface
SortedMultimapState<K, V>
extends State {
>>>>> // Add a value to
the map.
>>>>> void put(K key, V
value);
>>>>> // Get all values
for a given key.
>>>>>
ReadableState<Iterable<V>>
get(K key);
>>>>> // Return all
entries in the map.
>>>>>
ReadableState<Iterable<KV<K,
V>>> allEntries();
>>>>> // Return all
entries in the map with
keys <= limit. returned
elements are sorted by the
key.
>>>>>
ReadableState<Iterable<KV<K,
V>>> entriesUntil(K limit);
>>>>>
>>>>> // Remove all values
with the given key;
>>>>> void remove(K key);
>>>>> // Remove all
entries in the map with
keys <= limit.
>>>>> void removeUntil(K
limit);
>>>>
>>>> Would
removeUntilExcl(K limit)
also be useful? It would
remove all entries in the
map with keys < limit.
>>>>
>>>>>
>>>>> Runners will sort
based on the encoded value
of the key. In order to
make this easier for
users, I propose that we
introduce a new tag on
Coders PreservesOrder. A
Coder that contains this
tag guarantees that the
encoded value preserves
the same ordering as the
base Java type.
>>>>
>>>>
>>>> Could you clarify
what is "encoded value
preserves the same
ordering as the base Java
type"?
>>>
>>>
>>> Let's say A and B
represent two different
instances of the same Java
type like a double, then A
< B (using the language's
comparison operator) iff
encode(A) < encode(B)
(note the encoded versions
are compared
lexicographically)
>>
>>
>> Since coders are shared
across SDKs, do we expect the
A < B iff e(A) < e(B)
property to hold for all
languages we support? What
happens if A and B sort
differently in different
languages?
>
>
> That would have to be
the property of the coder
(which means that this
property probably needs to
be represented in the
portability representation
of the coder). I imagine
the common use cases will
be for simple coders like
int, long, string, etc.,
which are likely to sort
the same in most languages.
The standard coders for
both double and integral
types do not respect
the natural ordering
(consider negative
values). KV coders violate the
"natural" lexicographic
ordering on components as
well. I think
implicitly sorting on
encoded value would yield
many surprises. (The
state, of course, could
take an order-preserving, bytes
(string?)-producing
callable as a parameter.)
(As for
naming, I'd probably call
this OrderedBagState or
something like
that...rather than Map
which tends to imply
random access.)
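To illustrate the "order-preserving, bytes-producing callable" idea, here is a minimal in-memory sketch. The OrderedBag class below is a made-up stand-in, not the proposed state type, and encodeDouble is just one possible order-preserving encoding for doubles: flip all bits of negative values and only the sign bit of non-negative ones, then write big-endian. The bag sorts purely on the unsigned lexicographic order of the bytes the callable produces.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Function;

// Illustrative only: a bag ordered by the bytes a user-supplied callable produces.
final class OrderedBag<K, V> {
    private final Function<K, byte[]> toOrderedBytes;
    // Keys are compared as unsigned bytes, lexicographically.
    private final TreeMap<byte[], List<V>> byEncodedKey =
        new TreeMap<byte[], List<V>>(Arrays::compareUnsigned);

    OrderedBag(Function<K, byte[]> toOrderedBytes) {
        this.toOrderedBytes = toOrderedBytes;
    }

    void add(K key, V value) {
        byEncodedKey.computeIfAbsent(toOrderedBytes.apply(key), k -> new ArrayList<>()).add(value);
    }

    List<V> readSorted() {
        List<V> out = new ArrayList<>();
        byEncodedKey.values().forEach(out::addAll);
        return out;
    }

    // One possible order-preserving encoding for doubles.
    static byte[] encodeDouble(double d) {
        long bits = Double.doubleToLongBits(d);
        // Negative values: flip every bit. Non-negative values: flip the sign bit only.
        bits ^= (bits >> 63) | Long.MIN_VALUE;
        return ByteBuffer.allocate(Long.BYTES).putLong(bits).array();
    }
}
```

With `new OrderedBag<Double, String>(OrderedBag::encodeDouble)`, adding values under the keys 2.5, -1.0, 0.0 and -3.5 and calling readSorted() returns them in the order -3.5, -1.0, 0.0, 2.5. Under this encoding -0.0 sorts before 0.0 and NaN sorts last, which matches Double.compare rather than the < operator.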