Hi Navneeth,

I think your plan to implement branching with the Processor API sounds
reasonable, but I'm
wondering why not just use the DSL branch operator and connect some
transformers? That
will let you mix the DSL and PAPI so you don't have to re-implement
anything that is already
in the DSL.

If you supplied a rocksdb store supplier, then yes the state will be held
in rocksdb and not in
memory. Note that this also the default, ie if you don't pass in any
supplier it will default to
using rocksdb. The logging option is, as you describe, to provide fault
tolerance and rebuild
the existing state from the changelog in case your app goes down. This will
also be used in
case you add another instance to scale out -- in that case the global state
will need to be rebuilt,
and any non-global state stores that are assigned to the new instance to
balance the cluster
will also need to be built. You almost definitely want to enable logging,
it will still contain only
the current value vs potentially containing nothing at all if you scale out
or instance restarts.
Global stores technically do not enable logging since it is unnecessary,
they just read from an
input topic so they can just re-read from that same topic rather than a
changelog which would
duplicate the same data.

The caching option differs slightly depending on whether you're using it in
the DSL or PAPI, but you
can read about it here:
https://kafka.apache.org/10/documentation/streams/developer-guide/memory-mgmt.html
Basically it adds an in-memory cache above the state stores to reduce disk
reads/writes. In the DSL,
it also reduces downstream traffic by buffering records with the same key.

On Fri, Oct 4, 2019 at 11:37 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hi All,
>
> I'm waiting for inputs to proceed with my POC and convert existing
> applications from flink to kafka streams. It would be really helpful if I
> can get some clarity on this.
>
> Thanks
>
> On Wed, Oct 2, 2019 at 7:05 PM Navneeth Krishnan <reachnavnee...@gmail.com
> >
> wrote:
>
> > Hello All,
> >
> > Any inputs?
> >
> > Thanks
> >
> > On Tue, Oct 1, 2019 at 12:40 PM Navneeth Krishnan <
> > reachnavnee...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I found the below example on how branching can be achieved with kafka
> >> streams. I want to implement the same with processor API and the way I
> get
> >> it is to have the same input topic  processed with multiple process
> >> functions and output the data based on the filters. Is this the right
> >> understanding?
> >>
> >> final StreamsBuilder builder = new StreamsBuilder();
> >> KStream<String, String> source_o365_user_activity =
> builder.stream("o365_user_activity");KStream<String, String>[] branches =
> source_o365_user_activity.branch(
> >>       (key, value) -> (value.contains("Operation\":\"SharingSet") &&
> value.contains("ItemType\":\"File")),
> >>       (key, value) ->
> (value.contains("Operation\":\"AddedToSecureLink") &&
> value.contains("ItemType\":\"File")),
> >>       (key, value) -> true
> >>      );
> >>
> >> branches[0].to("o365_sharing_set_by_date");
> >> branches[1].to("o365_added_to_secure_link_by_date");
> >> branches[2].to("o365_user_activity_by_date");
> >>
> >>
> >> Also is the global state kept in memory or on rocksdb? I'm providing a
> >> state store supplier with rocksdb as the option.
> >>
> >> I believe the logging option is to rebuild the state from start when the
> >> application goes down. For my scenario I just need the current value, in
> >> that case should logging be enabled and for global state store it says
> >> logging cannot be enabled. Also what is the caching option? The
> >> documentation doesn't have enough details on these.
> >>
> >> StoreBuilder<KeyValueStore<String, String>> globalState =
> Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("test"),
> Serdes.String(), Serdes.String())
> >>         .withLoggingDisabled()
> >>         .withCachingEnabled();
> >>
> >>
> >> One last question, during auto scaling if I bring up a new node I would
> >> assume some of the traffic will be reallocated to the new node. What
> will
> >> happen to the rocksdb state that was accumulated in the already running
> >> node?
> >>
> >> Thanks, appreciate all the help.
> >>
> >>
>

Reply via email to