Hi All,

I'm still waiting for input so that I can proceed with my POC and convert
existing applications from Flink to Kafka Streams. It would be really
helpful to get some clarity on this.

Thanks

On Wed, Oct 2, 2019 at 7:05 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hello All,
>
> Any inputs?
>
> Thanks
>
> On Tue, Oct 1, 2019 at 12:40 PM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I found the example below showing how branching can be achieved with Kafka
>> Streams. I want to implement the same thing with the Processor API. As I
>> understand it, I would have the same input topic processed by multiple
>> process functions and output the data based on the filters. Is this
>> understanding correct?
>>
>> final StreamsBuilder builder = new StreamsBuilder();
>> KStream<String, String> source_o365_user_activity =
>>     builder.stream("o365_user_activity");
>> KStream<String, String>[] branches = source_o365_user_activity.branch(
>>     (key, value) -> value.contains("Operation\":\"SharingSet")
>>         && value.contains("ItemType\":\"File"),
>>     (key, value) -> value.contains("Operation\":\"AddedToSecureLink")
>>         && value.contains("ItemType\":\"File"),
>>     (key, value) -> true
>> );
>>
>> branches[0].to("o365_sharing_set_by_date");
>> branches[1].to("o365_added_to_secure_link_by_date");
>> branches[2].to("o365_user_activity_by_date");
>>
>>
>> Also, is the global state kept in memory or in RocksDB? I'm providing a
>> state store supplier with RocksDB as the option.
>>
>> I believe the logging option exists so that the state can be rebuilt from
>> the start when the application goes down. For my scenario I only need the
>> current value. In that case, should logging be enabled? For a global state
>> store, it says logging cannot be enabled. Also, what does the caching
>> option do? The documentation doesn't have enough detail on these.
>>
>> StoreBuilder<KeyValueStore<String, String>> globalState =
>>     Stores.keyValueStoreBuilder(
>>             Stores.persistentKeyValueStore("test"),
>>             Serdes.String(),
>>             Serdes.String())
>>         .withLoggingDisabled()
>>         .withCachingEnabled();
>>
>>
>> One last question: during auto scaling, if I bring up a new node, I assume
>> some of the traffic will be reallocated to the new node. What will happen
>> to the RocksDB state that has accumulated on the already running node?
>>
>> Thanks, appreciate all the help.
>>
>>
