Hi All,

I found the example below showing how branching can be done with the Kafka
Streams DSL. I want to implement the same thing with the Processor API. My
understanding is that I would have the same input topic processed by
multiple process functions, each writing its output based on its own
filter. Is this the right understanding?

final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity =
    builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch(
    (key, value) -> value.contains("Operation\":\"SharingSet")
        && value.contains("ItemType\":\"File"),
    (key, value) -> value.contains("Operation\":\"AddedToSecureLink")
        && value.contains("ItemType\":\"File"),
    (key, value) -> true
);

branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");
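
To make the question concrete, here is a minimal plain-Java sketch of the
routing I have in mind: each record goes to exactly one output, chosen by
the first predicate that matches, in the same order as the branch() call
above. The class name BranchRouter and the method route are hypothetical
names I made up for illustration, and the sketch assumes values are never
null. It has no Kafka dependency. Inside a Processor, I assume the returned
name could be used as the child sink name, for example with
context.forward(key, value, To.child(name)), but that wiring is not shown.

```java
public class BranchRouter {

    // Hypothetical helper: returns the output topic for a record value.
    // Predicates are checked in order and the first match wins, so a
    // record is never sent to more than one topic.
    // Assumption: value is non-null (a null value would throw here).
    public static String route(String value) {
        boolean isFile = value.contains("ItemType\":\"File");

        if (value.contains("Operation\":\"SharingSet") && isFile) {
            return "o365_sharing_set_by_date";
        }
        if (value.contains("Operation\":\"AddedToSecureLink") && isFile) {
            return "o365_added_to_secure_link_by_date";
        }
        // Catch-all, equivalent to the (key, value) -> true predicate.
        return "o365_user_activity_by_date";
    }

    public static void main(String[] args) {
        String sample = "{\"Operation\":\"SharingSet\",\"ItemType\":\"File\"}";
        System.out.println(route(sample));
    }
}
```

If this is right, a single process function that picks one sink per record
would be enough, rather than several process functions each reading the
same input topic.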


Also, is the global state kept in memory or in RocksDB? I'm providing a
state store supplier with RocksDB as the option.

I believe the logging option exists so that the state can be rebuilt from
the start when the application goes down. For my scenario I only need the
current value. In that case, should logging be enabled? For a global state
store, it says that logging cannot be enabled. Also, what does the caching
option do? The documentation doesn't have enough detail on these.

StoreBuilder<KeyValueStore<String, String>> globalState =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("test"),
Serdes.String(), Serdes.String())
        .withLoggingDisabled()
        .withCachingEnabled();


One last question: during auto scaling, if I bring up a new node, I assume
some of the traffic will be reallocated to it. What happens to the RocksDB
state that has accumulated on the node that was already running?

Thanks, appreciate all the help.
