Hi All,

I'm waiting for input so that I can proceed with my POC and convert existing applications from Flink to Kafka Streams. It would be really helpful to get some clarity on the questions below.
Thanks

On Wed, Oct 2, 2019 at 7:05 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Hello All,
>
> Any inputs?
>
> Thanks
>
> On Tue, Oct 1, 2019 at 12:40 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I found the below example on how branching can be achieved with kafka
>> streams. I want to implement the same with processor API and the way I get
>> it is to have the same input topic processed with multiple process
>> functions and output the data based on the filters. Is this the right
>> understanding?
>>
>> final StreamsBuilder builder = new StreamsBuilder();
>> KStream<String, String> source_o365_user_activity =
>>     builder.stream("o365_user_activity");
>> KStream<String, String>[] branches = source_o365_user_activity.branch(
>>     (key, value) -> (value.contains("Operation\":\"SharingSet") &&
>>         value.contains("ItemType\":\"File")),
>>     (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") &&
>>         value.contains("ItemType\":\"File")),
>>     (key, value) -> true
>> );
>>
>> branches[0].to("o365_sharing_set_by_date");
>> branches[1].to("o365_added_to_secure_link_by_date");
>> branches[2].to("o365_user_activity_by_date");
>>
>> Also is the global state kept in memory or on rocksdb? I'm providing a
>> state store supplier with rocksdb as the option.
>>
>> I believe the logging option is to rebuild the state from start when the
>> application goes down. For my scenario I just need the current value, in
>> that case should logging be enabled and for global state store it says
>> logging cannot be enabled. Also what is the caching option? The
>> documentation doesn't have enough details on these.
>>
>> StoreBuilder<KeyValueStore<String, String>> globalState =
>>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("test"),
>>         Serdes.String(), Serdes.String())
>>     .withLoggingDisabled()
>>     .withCachingEnabled();
>>
>> One last question, during auto scaling if I bring up a new node I would
>> assume some of the traffic will be reallocated to the new node. What will
>> happen to the rocksdb state that was accumulated in the already running
>> node?
>>
>> Thanks, appreciate all the help.
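
For reference, here is a minimal sketch of the first-match routing that `branch` performs in the quoted example, written in plain Java with no Kafka dependency. The class name `BranchRouter` and its methods are hypothetical, introduced only for illustration. One possible way to get the same behaviour with the Processor API, offered as an assumption rather than a confirmed answer, is a single processor that evaluates the predicates in order inside `process()` and forwards each record to one named child sink (for example with `context.forward(key, value, To.child(...))`), instead of several processors that each re-read the input topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical helper: models "first matching predicate wins" routing.
final class BranchRouter {

    // Returns the index of the first predicate that matches the record,
    // or -1 if no predicate matches (such a record would be dropped).
    static int route(String key, String value,
                     List<BiPredicate<String, String>> predicates) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i;
            }
        }
        return -1;
    }

    // The three predicates from the quoted example, in the same order.
    static List<BiPredicate<String, String>> o365Predicates() {
        List<BiPredicate<String, String>> predicates = new ArrayList<>();
        predicates.add((k, v) -> v.contains("Operation\":\"SharingSet")
                && v.contains("ItemType\":\"File"));
        predicates.add((k, v) -> v.contains("Operation\":\"AddedToSecureLink")
                && v.contains("ItemType\":\"File"));
        predicates.add((k, v) -> true); // catch-all branch
        return predicates;
    }

    public static void main(String[] args) {
        List<BiPredicate<String, String>> predicates = o365Predicates();
        // Index i corresponds to branches[i] in the quoted DSL example.
        String[] sinks = {
            "o365_sharing_set_by_date",
            "o365_added_to_secure_link_by_date",
            "o365_user_activity_by_date"
        };
        String sample = "{\"Operation\":\"AddedToSecureLink\",\"ItemType\":\"File\"}";
        int index = route("user-1", sample, predicates);
        System.out.println(sinks[index]); // prints o365_added_to_secure_link_by_date
    }
}
```

Because the last predicate always returns true, every record lands in exactly one branch, so the order of the predicates decides the outcome when more than one would match.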