Hi All, I found the below example on how branching can be achieved with kafka streams. I want to implement the same with processor API and the way I get it is to have the same input topic processed with multiple process functions and output the data based on the filters. Is this the right understanding?
final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");KStream<String, String>[] branches = source_o365_user_activity.branch( (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")), (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")), (key, value) -> true ); branches[0].to("o365_sharing_set_by_date"); branches[1].to("o365_added_to_secure_link_by_date"); branches[2].to("o365_user_activity_by_date"); Also is the global state kept in memory or on rocksdb? I'm providing a state store supplier with rocksdb as the option. I believe the logging option is to rebuild the state from start when the application goes down. For my scenario I just need the current value, in that case should logging be enabled and for global state store it says logging cannot be enabled. Also what is the caching option? The documentation doesn't have enough details on these. StoreBuilder<KeyValueStore<String, String>> globalState = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("test"), Serdes.String(), Serdes.String()) .withLoggingDisabled() .withCachingEnabled(); One last question, during auto scaling if I bring up a new node I would assume some of the traffic will be reallocated to the new node. What will happen to the rocksdb state that was accumulated in the already running node? Thanks, appreciate all the help.