Hello All,

Any inputs?

Thanks

On Tue, Oct 1, 2019 at 12:40 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Hi All,
>
> I found the example below showing how branching can be achieved with Kafka
> Streams. I want to implement the same with the Processor API, and the way I
> understand it is to have the same input topic processed by multiple process
> functions, each emitting data based on its filter. Is this the right
> understanding?
>
> final StreamsBuilder builder = new StreamsBuilder();
> KStream<String, String> source_o365_user_activity =
>     builder.stream("o365_user_activity");
> KStream<String, String>[] branches = source_o365_user_activity.branch(
>     (key, value) -> (value.contains("Operation\":\"SharingSet") &&
>         value.contains("ItemType\":\"File")),
>     (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") &&
>         value.contains("ItemType\":\"File")),
>     (key, value) -> true
> );
>
> branches[0].to("o365_sharing_set_by_date");
> branches[1].to("o365_added_to_secure_link_by_date");
> branches[2].to("o365_user_activity_by_date");
>
> Also, is the global state kept in memory or in RocksDB? I'm providing a
> state store supplier with RocksDB as the option.
>
> I believe the logging option is there to rebuild the state from the start
> when the application goes down. For my scenario I just need the current
> value; in that case, should logging be enabled? For a global state store it
> says logging cannot be enabled. Also, what is the caching option? The
> documentation doesn't have enough details on these.
>
> StoreBuilder<KeyValueStore<String, String>> globalState =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("test"),
>             Serdes.String(), Serdes.String())
>         .withLoggingDisabled()
>         .withCachingEnabled();
>
> One last question: during auto scaling, if I bring up a new node I would
> assume some of the traffic will be reallocated to the new node. What will
> happen to the RocksDB state that was accumulated on the already running
> node?
>
> Thanks, appreciate all the help.
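
On the branching question, one point that may help: `branch` sends each record to the first predicate that matches, and only to that one. With the Processor API, a single processor that picks one named child per record may therefore be closer to the DSL example than several independent filtering processors. Below is a minimal plain-Java sketch of just that routing decision. The class `BranchRouter` and the sink names are hypothetical, chosen for illustration. The sketch assumes the chosen name would then be passed to `context.forward(key, value, To.child(name))` inside a processor whose children are sinks registered with `Topology.addSink`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical helper: decides which downstream child a record should go to.
// It has no Kafka dependency; in a real Processor the returned name would be
// used as context.forward(key, value, To.child(name)).
final class BranchRouter {
    // LinkedHashMap keeps insertion order, so predicates are tried in the
    // order they were registered (first match wins, like KStream#branch).
    private final Map<String, BiPredicate<String, String>> routes = new LinkedHashMap<>();

    BranchRouter addRoute(String childName, BiPredicate<String, String> predicate) {
        routes.put(childName, predicate);
        return this;
    }

    // Returns the name of the first matching child, or null if nothing matches
    // (in which case the caller would simply not forward the record).
    String route(String key, String value) {
        for (Map.Entry<String, BiPredicate<String, String>> e : routes.entrySet()) {
            if (e.getValue().test(key, value)) {
                return e.getKey();
            }
        }
        return null;
    }

    // Same three predicates as the DSL example; the sink names are assumptions.
    static BranchRouter o365Router() {
        return new BranchRouter()
            .addRoute("sharing-set-sink", (k, v) ->
                v.contains("Operation\":\"SharingSet") && v.contains("ItemType\":\"File"))
            .addRoute("secure-link-sink", (k, v) ->
                v.contains("Operation\":\"AddedToSecureLink") && v.contains("ItemType\":\"File"))
            .addRoute("all-activity-sink", (k, v) -> true);
    }

    public static void main(String[] args) {
        String sample = "{\"Operation\":\"SharingSet\",\"ItemType\":\"File\"}";
        System.out.println(o365Router().route("user-1", sample));
    }
}
```

Each sink name would map to one output topic (for example `o365_sharing_set_by_date`), so a record is written to exactly one topic, as with `branches[i].to(...)`.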