Hi,

Any suggestions on this could be achieved?

Thanks

On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hi All,
>
> Any suggestions on this would really help.
>
> Thanks.
>
> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I looked into an earlier email about the topic broadcast config through
>> connected stream and I couldn't find the conclusion.
>>
>> I can't do the below approach since I need the config to be published to
>> all operator instances but I need keyed state for external querying.
>>
>> streamToBeConfigured.connect(configMessageStream)
>> .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
>> .flatMap(new FunctionWithConfigurableState())
>> .addSink(...);
>>
>> One of the resolution I found in that mail chain was below. I can use
>> this to solve my issue but is this still the recommended approach?
>>
>> stream1.connect(stream2)
>>             .map(new MergeStreamsMapFunction()) // Holds transient state
>> of the last ConfigMessage and maps Stream1's data to a Tuple2<Stream1Data,
>> ConfigMessage>
>>             .keyBy(new SomeIdKeySelector())         // KeyBy Id to allow
>> for ValueStateDescriptors and semantically correct partitioning according
>> to business logic
>>             .flatMap(new StatefulFlatMapFunction()) // Save latest
>> received ConfigMessage-Value in ValueStateDescriptor here
>>             .addSink(...);
>>
>> Thanks,
>> Navneeth
>>
>
>

Reply via email to