Hi Ufuk,

Thanks for your response. Unfortunately, that does not work:
using ValueStateDescriptors in the CoFlatMap on the connected stream
requires a keyBy on the connected stream itself.

Another solution I can think of would be this:

stream1.connect(stream2)
            // Holds transient state of the last ConfigMessage and maps
            // Stream1's data to a Tuple2<Stream1Data, ConfigMessage>
            .map(new MergeStreamsMapFunction())
            // KeyBy Id to allow for ValueStateDescriptors and semantically
            // correct partitioning according to business logic
            .keyBy(new SomeIdKeySelector())
            // Save latest received ConfigMessage-Value in
            // ValueStateDescriptor here
            .flatMap(new StatefulFlatMapFunction())
            .addSink(...);

I have yet to test this.
This seems a little complicated but it might work?
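
To make the intended behaviour concrete, here is a minimal plain-Java sketch of the two steps. It only models the logic and does not use any Flink classes: ConfigMergeSketch, Merged, Merger and KeyedStep are hypothetical names, the Map stands in for per-key ValueState, and the sample ids and "threshold=5" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the proposed pipeline; none of these types are Flink classes.
class ConfigMergeSketch {

    // Stand-in for Tuple2<Stream1Data, ConfigMessage>.
    // config is null if no ConfigMessage has been seen yet.
    static final class Merged {
        final String id;
        final String payload;
        final String config;

        Merged(String id, String payload, String config) {
            this.id = id;
            this.payload = payload;
            this.config = config;
        }
    }

    // Models MergeStreamsMapFunction: remembers the last ConfigMessage in a
    // plain field (transient, so it would not survive a restart).
    static final class Merger {
        private String lastConfig;

        void onConfig(String config) {
            lastConfig = config;
        }

        Merged onData(String id, String payload) {
            return new Merged(id, payload, lastConfig);
        }
    }

    // Models StatefulFlatMapFunction after the keyBy: the map plays the role
    // of keyed state, holding the latest config value per id.
    static final class KeyedStep {
        private final Map<String, String> configPerKey = new HashMap<>();

        String process(Merged m) {
            if (m.config != null) {
                configPerKey.put(m.id, m.config);
            }
            String active = configPerKey.getOrDefault(m.id, "<none>");
            return m.id + ":" + m.payload + " [" + active + "]";
        }
    }

    // Feeds a small hand-written sequence of events through both steps.
    static List<String> run() {
        Merger merger = new Merger();
        KeyedStep keyed = new KeyedStep();
        List<String> out = new ArrayList<>();

        out.add(keyed.process(merger.onData("a", "1"))); // no config seen yet
        merger.onConfig("threshold=5");
        out.add(keyed.process(merger.onData("a", "2"))); // key a now carries the config
        out.add(keyed.process(merger.onData("b", "3"))); // key b gets it with its next element
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Two things stand out in this model. A key's stored config only changes when the next data element for that key arrives, since the ConfigMessage travels piggybacked on the data. And in a real job with parallelism greater than one, every parallel instance of the merge step would need to receive each ConfigMessage, which I assume would still require broadcasting stream2.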

Best Regards,

Julian

2016-10-26 16:09 GMT+02:00 Ufuk Celebi <u...@apache.org>:

> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß <julian.ba...@gmail.com>
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> > .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
> > .flatMap(new FunctionWithConfigurableState())
> > .addSink(...);
> >
> > The Stream with the actual functionality is keyed by an Id, but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcast to all instances of the
> > CoFlatMapFunction, regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
>
