Hi Navneeth, I think your plan to implement branching with the Processor API sounds reasonable, but I'm wondering: why not just use the DSL branch operator and connect some transformers? That will let you mix the DSL and PAPI, so you don't have to re-implement anything that already exists in the DSL.
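To make that concrete, here's a rough sketch of what I mean by mixing the two (the topic names and the UppercaseTransformer are just placeholders, and I'm using the same branch() API as in your snippet below):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class BranchExample {

    // Custom PAPI-style logic lives in a transformer, which can still
    // access state stores and the ProcessorContext like a Processor would
    static class UppercaseTransformer implements ValueTransformer<String, String> {
        @Override
        public void init(final ProcessorContext context) { }

        @Override
        public String transform(final String value) {
            return value.toUpperCase();
        }

        @Override
        public void close() { }
    }

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // The DSL branch operator routes each record to the first
        // predicate that matches
        final KStream<String, String>[] branches = source.branch(
                (key, value) -> value.contains("typeA"),
                (key, value) -> true);  // catch-all branch

        // Drop down into PAPI-style logic only where the DSL falls short,
        // and stay in the DSL for everything else
        branches[0]
                .transformValues(UppercaseTransformer::new)
                .to("type-a-output");
        branches[1].to("other-output");

        return builder.build();
    }
}
```

That way the branching, serdes, and sink wiring all stay in the DSL, and only the custom per-record logic needs hand-written processor code.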
If you supplied a RocksDB store supplier, then yes, the state will be held in RocksDB and not in memory. Note that this is also the default, i.e. if you don't pass in any supplier it will default to using RocksDB.

The logging option is, as you describe, there to provide fault tolerance and rebuild the existing state from the changelog in case your app goes down. It is also used when you add another instance to scale out -- in that case the global state will need to be rebuilt, and any non-global state stores that are assigned to the new instance to balance the cluster will also need to be built. You almost definitely want to enable logging: the changelog still contains only the current value per key, whereas with logging disabled you could end up with nothing at all after a scale-out or an instance restart. Global stores are the exception and technically do not enable logging, since it would be unnecessary: they just read from an input topic, so they can re-read that same topic rather than a changelog that would duplicate the same data.

The caching option differs slightly depending on whether you're using it in the DSL or PAPI, but you can read about it here:
https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt.html
Basically it adds an in-memory cache on top of the state stores to reduce disk reads/writes. In the DSL, it also reduces downstream traffic by buffering records with the same key.

On Fri, Oct 4, 2019 at 11:37 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Hi All,
>
> I'm waiting for inputs to proceed with my POC and convert existing
> applications from flink to kafka streams. It would be really helpful if I
> can get some clarity on this.
>
> Thanks
>
> On Wed, Oct 2, 2019 at 7:05 PM Navneeth Krishnan <reachnavnee...@gmail.com>
> wrote:
>
> > Hello All,
> >
> > Any inputs?
> > Thanks
> >
> > On Tue, Oct 1, 2019 at 12:40 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I found the below example on how branching can be achieved with kafka
> >> streams. I want to implement the same with the processor API, and the
> >> way I get it is to have the same input topic processed with multiple
> >> process functions and output the data based on the filters. Is this the
> >> right understanding?
> >>
> >> final StreamsBuilder builder = new StreamsBuilder();
> >> KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
> >> KStream<String, String>[] branches = source_o365_user_activity.branch(
> >>     (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
> >>     (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
> >>     (key, value) -> true
> >> );
> >>
> >> branches[0].to("o365_sharing_set_by_date");
> >> branches[1].to("o365_added_to_secure_link_by_date");
> >> branches[2].to("o365_user_activity_by_date");
> >>
> >> Also, is the global state kept in memory or on rocksdb? I'm providing a
> >> state store supplier with rocksdb as the option.
> >>
> >> I believe the logging option is to rebuild the state from the start when
> >> the application goes down. For my scenario I just need the current value;
> >> in that case should logging be enabled? And for the global state store it
> >> says logging cannot be enabled. Also, what is the caching option? The
> >> documentation doesn't have enough details on these.
> >>
> >> StoreBuilder<KeyValueStore<String, String>> globalState =
> >>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("test"), Serdes.String(), Serdes.String())
> >>         .withLoggingDisabled()
> >>         .withCachingEnabled();
> >>
> >> One last question: during auto scaling, if I bring up a new node I would
> >> assume some of the traffic will be reallocated to the new node. What will
> >> happen to the rocksdb state that was accumulated in the already running
> >> node?
> >>
> >> Thanks, appreciate all the help.
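P.S. For your non-global stores, enabling logging is just a one-line change to the builder in your snippet -- a rough sketch (the store name and the empty changelog config map are placeholders):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class LoggedStoreExample {

    public static StoreBuilder<KeyValueStore<String, String>> loggedStore() {
        // Any configs passed here are applied to the changelog topic;
        // an empty map just takes the defaults (changelogs are compacted,
        // so only the latest value per key is retained)
        final Map<String, String> changelogConfig = new HashMap<>();

        return Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("my-store"),  // RocksDB-backed
                        Serdes.String(),
                        Serdes.String())
                .withLoggingEnabled(changelogConfig)  // instead of withLoggingDisabled()
                .withCachingEnabled();
    }
}
```

With logging enabled, the scale-out case you asked about works out naturally: the stores assigned to the new instance are restored from their changelog topics before processing resumes there.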