Thanks Max ^^
On Wed, Oct 28, 2015 at 8:41 PM, Maximilian Michels <[email protected]> wrote:
> Oops, forgot the mapper :)
>
> static class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>
>     private OperatorState<Long> counter;
>
>     @Override
>     public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
>         System.out.println("Key: " + value.f0 +
>             " Previous state was: " + counter.value() +
>             " Update state to: " + value.f1);
>         counter.update(value.f1);
>         return value;
>     }
>
>     @Override
>     public void open(Configuration config) {
>         counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
>     }
> }
>
>
>
> On Wed, Oct 28, 2015 at 7:39 PM, Maximilian Michels <[email protected]>
> wrote:
>
> > Hi Andra,
> >
> > What you thought of turns out to be one of the core features of the Flink
> > streaming API. Flink's operators support state. State can be partitioned
> > by key using keyBy(field).
> >
> > You may use a MapFunction to achieve what you wanted like so:
> >
> > public static void main(String[] args) throws Exception {
> >
> >     final StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >     env.fromElements(new Tuple2<>(1L, 3L),
> >             new Tuple2<>(2L, 5L),
> >             new Tuple2<>(6L, 7L),
> >             new Tuple2<>(1L, 5L))
> >         .keyBy(0)
> >         .map(new StatefulMapper())
> >         .print();
> >
> >     env.execute();
> > }
> >
> > The output is the following on my machine (leaving out the output of the
> > print sink):
> >
> > Key: 2 Previous state was: -1 Update state to: 5
> > Key: 1 Previous state was: -1 Update state to: 3
> > Key: 6 Previous state was: -1 Update state to: 7
> > Key: 1 Previous state was: 3 Update state to: 5
> >
> >
> > Cheers,
> > Max
> >
> >
> >
> > On Wed, Oct 28, 2015 at 4:30 PM, Andra Lungu <[email protected]>
> > wrote:
> >
> >> Hey guys!
> >>
> >> I've been thinking about this one today:
> >>
> >> Say you have a stream of data in the form of (id, value) - This will
> >> evidently be a DataStream of Tuple2.
> >> I need to cache this data in some sort of static stream (perhaps even a
> >> DataSet).
> >> Then, if in the input stream, I see an id that was previously stored, I
> >> should update its value with the most recent entry.
> >>
> >> On an example:
> >>
> >> 1, 3
> >> 2, 5
> >> 6, 7
> >> 1, 5
> >>
> >> The value cached for the id 1 should be 5.
> >>
> >> How would you recommend caching the data? And what would be used for the
> >> update? A join function?
> >>
> >> As far as I see things, you cannot really combine DataSets with
> >> DataStreams
> >> although a DataSet is, in essence, just a finite stream.
> >> If this can indeed be done, some pseudocode would be nice :)
> >>
> >> Thanks!
> >> Andra
> >>
> >
> >
>
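
[Editor's illustration, not part of the original thread] The per-key "last value wins" semantics Max's StatefulMapper implements can be sketched without Flink at all. The following is a minimal plain-Java illustration in which a HashMap stands in for Flink's partitioned state; this is an assumption for clarity only, not how Flink's state backends actually store keyed state:

```java
import java.util.HashMap;
import java.util.Map;

// Flink-independent sketch of the per-key "last value wins" semantics
// from the thread above. The HashMap is a stand-in for Flink's
// partitioned operator state (one entry per key).
class LastValueCache {
    private final Map<Long, Long> state = new HashMap<>();

    // Returns the previously cached value for the key (-1 if none was
    // cached yet, matching the -1L default in getKeyValueState above),
    // then stores the new value, mirroring StatefulMapper.map().
    long update(long key, long value) {
        long previous = state.getOrDefault(key, -1L);
        state.put(key, value);
        return previous;
    }

    public static void main(String[] args) {
        LastValueCache cache = new LastValueCache();
        // Same input as Andra's example: (1,3), (2,5), (6,7), (1,5).
        long[][] input = {{1, 3}, {2, 5}, {6, 7}, {1, 5}};
        for (long[] record : input) {
            long prev = cache.update(record[0], record[1]);
            System.out.println("Key: " + record[0]
                + " Previous state was: " + prev
                + " Update state to: " + record[1]);
        }
    }
}
```

Running this prints "Previous state was: -1" for each key's first record and "Previous state was: 3" when key 1 reappears, so the cached value for key 1 ends up as 5, as in the thread. Unlike Flink, this sketch is single-threaded and unpartitioned; keyBy(0) is what lets Flink maintain this state per key across parallel instances.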