@Stephan:

Technically speaking this is really just a partitioned key-value state and
a fancy operator executing special operations on this state.

From the user's perspective, though, this is something hard to implement. If
you want to share state between two streams this way (getting updates from
one stream and enriching the other one), you would probably use a connected
datastream and custom-implement the key-value store logic. But once you have
one (or more) update streams and many get streams, this implementation will
not work. So either the user ends up replicating the whole state in multiple
connected operators, or custom-implements some inefficient wrapper class to
take care of all the put/get operations.

The idea behind this is to give a very simple abstraction for this type of
processing that uses the Flink runtime efficiently instead of relying on
custom implementations.

Let me give you a stupid example:

You receive temperature data in the form of (city, temperature), and you
are computing a rolling avg for each city.
Now you have 2 other incoming streams. The first is a stream of some other
info about the city, let's say population (city, population), and you want
to combine it with the last known avg temperature to produce
(city, temp, pop) triplets. The second is a stream of city pairs
(city, city), and you are interested in the difference of their
temperatures.

For enriching (city, pop) to (city, temp, pop) you would probably use a
CoFlatMap and store the last known rolling avg as state. Computing the
(city, city) temperature difference is a little more difficult, as you
need to get the temperature for both cities and then combine them in a
second operator. If you don't want to replicate your state, you have to
combine these two problems into a common wrapper type and execute them in
the same operator, which keeps the avg state.

With the KVStore abstraction this is very simple:
you create a KVStore<City, Temp>.
For enriching, you use kvStore.getWithKeySelector(), which will give you
((city, pop), temp) pairs, and you are done. For computing the difference,
you can use kvStore.multiGet(...) and get both cities at the same time.
The KV store abstracts away getting the 2 keys separately and merging the
results, so it returns [(city1, t1), (city2, t2)].
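
To make these semantics concrete, here is a hedged, single-threaded, in-memory sketch of the proposed operations. It is not Flink code and not the prototype's implementation: `java.util.List` stands in for `DataStream`, `List<K>` replaces the `K[]` of `multiGet`, puts take the hypothetical `KV` pair instead of `Tuple2`, `getWithKeySelector` returns one pair per record (matching the ((city, pop), temp) pairs described above), and `KVStoreSketch` is an illustrative name. Missing keys yield a null value, in the spirit of the null-friendly KV abstraction in the proposal.

```java
import java.util.*;
import java.util.function.Function;

/** Hypothetical key-value pair; value is null when the key has no entry. */
final class KV<K, V> {
    final K key;
    final V value;

    KV(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return "(" + key + ", " + value + ")";
    }
}

/**
 * Single-threaded, in-memory sketch of the proposed KVStore operations.
 * Lists stand in for DataStreams; this is not Flink API.
 */
final class KVStoreSketch<K, V> {
    private final Map<K, V> state = new HashMap<>();

    /** put: apply every (key, value) update to the state. */
    void put(List<KV<K, V>> updates) {
        for (KV<K, V> kv : updates) {
            state.put(kv.key, kv.value);
        }
    }

    /** get: emit (key, value) for each key, with a null value if absent. */
    List<KV<K, V>> get(List<K> keys) {
        List<KV<K, V>> out = new ArrayList<>();
        for (K key : keys) {
            out.add(new KV<>(key, state.get(key)));
        }
        return out;
    }

    /** remove (assumed semantics): delete each key, emit its previous value or null. */
    List<KV<K, V>> remove(List<K> keys) {
        List<KV<K, V>> out = new ArrayList<>();
        for (K key : keys) {
            out.add(new KV<>(key, state.remove(key)));
        }
        return out;
    }

    /** multiGet: look up a whole group of keys and emit the results together. */
    List<List<KV<K, V>>> multiGet(List<List<K>> keyGroups) {
        List<List<KV<K, V>>> out = new ArrayList<>();
        for (List<K> group : keyGroups) {
            out.add(get(group));
        }
        return out;
    }

    /** getWithKeySelector: attach the value for each record's extracted key. */
    <X> List<KV<X, V>> getWithKeySelector(List<X> records, Function<X, K> keySelector) {
        List<KV<X, V>> out = new ArrayList<>();
        for (X record : records) {
            out.add(new KV<>(record, state.get(keySelector.apply(record))));
        }
        return out;
    }
}
```

For the temperature example, calling `getWithKeySelector(pops, Map.Entry::getKey)` on (city, population) entries yields ((city, pop), temp) pairs, and `multiGet` on a group of two cities returns both temperatures together, so the difference can be computed in a simple downstream map.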

This might be a slightly artificial example, but I think it makes the point.
Implementing these jobs efficiently is not trivial for users, but I think it
is a very common problem.

On Tue, Sep 8, 2015 at 14:53, Stephan Ewen <se...@apache.org> wrote:

> @Gyula
>
> Can you explain a bit what this KeyValue store would do more than the
> partitioned key/value state?
>
> On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <gga...@gmail.com> wrote:
>
> > Hello,
> >
> > As for use cases, in my old job at Ericsson we were building a
> > streaming system that was processing data from telephone networks, and
> > it was using key-value stores a LOT. For example, keeping track of
> > various state info of the users (which cell are they currently
> > connected to, what bearers do they have, ...); mapping from IDs of
> > users in one subsystem of the telephone network to the IDs of the same
> > users in another subsystem; mapping from IDs of phone calls to lists
> > of IDs of participating users; etc.
> > So I imagine they would like this a lot. (At least, if they were
> > considering moving to Flink :))
> >
> > Best,
> > Gabor
> >
> > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <gyf...@apache.org>:
> > > Hey All,
> > >
> > > The last couple of days I have been playing around with the idea of
> > > building a streaming key-value store abstraction using stateful streaming
> > > operators that can be used seamlessly within Flink Streaming programs.
> > >
> > > Operations executed on this KV store would be fault tolerant, as it
> > > integrates with the checkpointing mechanism, and if we add timestamps to
> > > each put/get/... operation we can use the watermarks to create fully
> > > deterministic results. This functionality is very useful for many
> > > applications, and is very hard to implement properly with a dedicated
> > > KV store.
> > >
> > > The KVStore abstraction could look as follows:
> > >
> > > KVStore<K,V> store = new KVStore<>();
> > >
> > > Operations:
> > >
> > > store.put(DataStream<Tuple2<K,V>>)
> > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
> > > store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) ->
> > > DataStream<KV<X,V>[]>
> > >
> > > For the resulting streams I used a special KV abstraction, which lets us
> > > return null values.
> > >
> > > The implementation uses a simple streaming operator for executing most of
> > > the operations (for multiGet there is an additional merge operator), with
> > > either local or partitioned states for storing the key-value pairs (my
> > > current prototype uses local states). It can either execute operations
> > > eagerly (which would not provide deterministic results), or buffer up
> > > operations and execute them in order upon watermarks.
> > >
> > > As for use cases, you can probably come up with many; I will save that
> > > for now :D
> > >
> > > I have a prototype implementation here that can execute the operations
> > > described above (does not handle watermarks and time yet):
> > >
> > > https://github.com/gyfora/flink/tree/KVStore
> > > And also an example job:
> > > https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > >
> > > What do you think?
> > > If you like it, I will work on writing tests; it still needs a lot of
> > > tweaking and refactoring. This might be something we want to include with
> > > the standard streaming libraries at some point.
> > >
> > > Cheers,
> > > Gyula
> >
>
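
The proposal quoted above mentions buffering operations and executing them in order upon watermarks. A rough sketch of that idea, assuming each operation carries a timestamp and that a watermark w promises no further operations with timestamp <= w will arrive. `TimedOp` and `WatermarkBufferedStore` are hypothetical names, late operations are not handled, and this is plain Java rather than Flink's operator API.

```java
import java.util.*;

/** Hypothetical timestamped operation: a put (with value) or a get. */
final class TimedOp {
    final long timestamp;
    final boolean isPut;
    final String key;
    final Integer value; // only meaningful for puts

    private TimedOp(long timestamp, boolean isPut, String key, Integer value) {
        this.timestamp = timestamp;
        this.isPut = isPut;
        this.key = key;
        this.value = value;
    }

    static TimedOp put(long ts, String key, int value) { return new TimedOp(ts, true, key, value); }
    static TimedOp get(long ts, String key) { return new TimedOp(ts, false, key, null); }
}

/** Sketch: buffer operations, apply them in timestamp order when a watermark arrives. */
final class WatermarkBufferedStore {
    private final Map<String, Integer> state = new HashMap<>();
    // Buffered operations keyed by timestamp; same-timestamp ops keep arrival order.
    private final TreeMap<Long, List<TimedOp>> buffer = new TreeMap<>();

    /** Buffer the operation instead of executing it eagerly. */
    void receive(TimedOp op) {
        buffer.computeIfAbsent(op.timestamp, ts -> new ArrayList<>()).add(op);
    }

    /** Execute all buffered ops with timestamp <= watermark, in order; return get results. */
    List<String> onWatermark(long watermark) {
        List<String> results = new ArrayList<>();
        NavigableMap<Long, List<TimedOp>> ready = buffer.headMap(watermark, true);
        for (List<TimedOp> ops : ready.values()) {
            for (TimedOp op : ops) {
                if (op.isPut) {
                    state.put(op.key, op.value);
                } else {
                    results.add(op.key + "=" + state.get(op.key));
                }
            }
        }
        ready.clear(); // the view is backed by the buffer, so this drops executed ops
        return results;
    }
}
```

In the test scenario, a get at timestamp 5 that arrives before a put at timestamp 3 still sees the put's value once watermark 6 arrives; executing eagerly would have returned null instead, which is the nondeterminism the proposal refers to.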
