That's a very nice application of the Stream API and partitioned state. :D

I think we should run some tests on a cluster  based on this to see what
kind of throughput the partitioned state system can handle and also how it
behaves with larger numbers of keys. The KVStore is just an interface and
the really heavy lifting is done by the state system so this would be a
good test for it.


On Tue, 8 Sep 2015 at 15:10 Gyula Fóra <gyula.f...@gmail.com> wrote:

> @Stephan:
>
> Technically speaking this is really just a partitioned key-value state and
> a fancy operator executing special operations on this state.
>
> From the user's perspective though this is something hard to implement. If
> you want to share state between two stream for instance this way (getting
> updates from one stream and enriching the other one) you would probably use
> a connected datastream and custom implement the Key-value store logic. But
> once you have one(or more) update stream and many get streams this
> implementation will not work. So either the user end up replicating the
> whole state in multiple connected operators, or custom implement some
> inefficient wrapper class to take care of all the put/get operations.
>
> The Idea behind this is to give a very simple abstraction for this type of
> processing that uses the flink runtime efficiently instead of relying on
> custom implementations.
>
> Let me give you a stupid example:
>
> You receive Temperature data in the form of (city, temperature), and you
> are computing a rolling avg for each city.
> Now you have 2 other incoming streams: first is a stream of some other info
> about the city let's say population (city, population) and you want to
> combine it with the last known avg temperature to produce (city, temp, pop)
> triplets. The second stream is a pair of cities (city,city) and you are
> interested in the difference of the temperature.
>
> For enriching the (city, pop) to (city,temp,pop) you would probably use a
> CoFlatMap and store the last known rolling avg as state. For computing the
> (city,city) temperature difference it is a little more difficult, as you
> need to get the temperature for both cities then combine in a second
> operator. If you don't want to replicate your state, you have to combine
> these two problems to a common wrapper type and execute them on a same
> operator which will keep the avg state.
>
> With the KVStore abstraction this is very simple:
> you create a KVStore<City, Temp>
> For enriching you use kvStore.getWithKeySelector() which will give you
> ((cit,pop), temp) pairs and you are done. For computing the difference, you
> can use kvStore.multiget(...) and get for the 2 cities at the same type.
> The kv store will abstract away the getting of the 2 keys separately and
> merging them so it will return [(city1, t1), (city2,t2)].
>
> This might be slightly artificial example but I think it makes the point.
> Implementing these jobs efficiently is not trivial for the users but I
> think it is a very common problem.
>
> Stephan Ewen <se...@apache.org> ezt írta (időpont: 2015. szept. 8., K,
> 14:53):
>
> > @Gyula
> >
> > Can you explain a bit what this KeyValue store would do more then the
> > partitioned key/value state?
> >
> > On Tue, Sep 8, 2015 at 2:49 PM, Gábor Gévay <gga...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > As for use cases, in my old job at Ericsson we were building a
> > > streaming system that was processing data from telephone networks, and
> > > it was using key-value stores a LOT. For example, keeping track of
> > > various state info of the users (which cell are they currently
> > > connected to, what bearers do they have, ...); mapping from IDs of
> > > users in one subsystem of the telephone network to the IDs of the same
> > > users in an other subsystem; mapping from IDs of phone calls to lists
> > > of IDs of participating users; etc.
> > > So I imagine they would like this a lot. (At least, if they were
> > > considering moving to Flink :))
> > >
> > > Best,
> > > Gabor
> > >
> > >
> > >
> > >
> > > 2015-09-08 13:35 GMT+02:00 Gyula Fóra <gyf...@apache.org>:
> > > > Hey All,
> > > >
> > > > The last couple of days I have been playing around with the idea of
> > > > building a streaming key-value store abstraction using stateful
> > streaming
> > > > operators that can be used within Flink Streaming programs
> seamlessly.
> > > >
> > > > Operations executed on this KV store would be fault tolerant as it
> > > > integrates with the checkpointing mechanism, and if we add timestamps
> > to
> > > > each put/get/... operation we can use the watermarks to create fully
> > > > deterministic results. This functionality is very useful for many
> > > > applications, and is very hard to implement properly with some
> > dedicates
> > > kv
> > > > store.
> > > >
> > > > The KVStore abstraction could look as follows:
> > > >
> > > > KVStore<K,V> store = new KVStore<>;
> > > >
> > > > Operations:
> > > >
> > > > store.put(DataStream<Tuple2<K,V>>)
> > > > store.get(DataStream<K>) -> DataStream<KV<K,V>>
> > > > store.remove(DataStream<K>) -> DataStream<KV<K,V>>
> > > > store.multiGet(DataStream<K[]>) -> DataStream<KV<K,V>[]>
> > > > store.getWithKeySelector(DataStream<X>, KeySelector<X,K>) ->
> > > > DataStream<KV<X,V>[]>
> > > >
> > > > For the resulting streams I used a special KV abstraction which let's
> > us
> > > > return null values.
> > > >
> > > > The implementation uses a simple streaming operator for executing
> most
> > of
> > > > the operations (for multi get there is an additional merge operator)
> > with
> > > > either local or partitioned states for storing the kev-value pairs
> (my
> > > > current prototype uses local states). And it can either execute
> > > operations
> > > > eagerly (which would not provide deterministic results), or buffer up
> > > > operations and execute them in order upon watermarks.
> > > >
> > > > As for use cases you can probably come up with many I will save that
> > for
> > > > now :D
> > > >
> > > > I have a prototype implementation here that can execute the
> operations
> > > > described above (does not handle watermarks and time yet):
> > > >
> > > > https://github.com/gyfora/flink/tree/KVStore
> > > > And also an example job:
> > > >
> > > >
> > >
> >
> https://github.com/gyfora/flink/blob/KVStore/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/KVStreamExample.java
> > > >
> > > > What do you think?
> > > > If you like it I will work on writing tests and it still needs a lot
> of
> > > > tweaking and refactoring. This might be something we want to include
> > with
> > > > the standard streaming libraries at one point.
> > > >
> > > > Cheers,
> > > > Gyula
> > >
> >
>

Reply via email to