Yes, if you have to restore from the changelog from scratch then this will
definitely impact
the application's performance. This is the current state of things for EOS
applications that use
some kind of local storage such as the in-memory or RocksDB state
stores. The point of EOS is
to be 100% correct, not to maximize performance -- it's a tradeoff and you
need to decide what
characteristics are most important for the specific application and use
case.

That said, obviously better performance is always a good thing when it's
possible to achieve without
sacrificing processing semantics. That's why I proposed buffering updates;
if we can avoid dirtying
the store in the first place, then there's no need to wipe out all the
state and rebuild from the changelog
from scratch. So yes, this was intended as an alternative proposal which
would improve the performance
for any EOS application regardless of whether it uses local or remote
storage.
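
To make the idea concrete, here is a rough sketch of what such a write buffer
could look like -- purely illustrative, the class and the RemoteStore interface
below are made up and not part of any actual proposal or API:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the buffering idea: writes land in an in-memory
    // buffer and only reach the (remote) store on flush(), which would happen
    // as part of a commit. None of this is real Streams code.
    public class WriteBufferingStore {

        // stand-in for whatever remote client is plugged in (redis, mongo, ...)
        public interface RemoteStore {
            byte[] read(String key);
            void write(String key, byte[] value);
        }

        private final RemoteStore remote;
        private final Map<String, byte[]> buffer = new HashMap<>();

        public WriteBufferingStore(final RemoteStore remote) {
            this.remote = remote;
        }

        // put() only dirties the buffer, never the remote store
        public void put(final String key, final byte[] value) {
            buffer.put(key, value);
        }

        // reads check the buffer first so uncommitted writes are still visible
        public byte[] get(final String key) {
            return buffer.containsKey(key) ? buffer.get(key) : remote.read(key);
        }

        // called on commit: push the buffered writes out, then reset
        public void flush() {
            buffer.forEach(remote::write);
            buffer.clear();
        }

        // called if the task dies before the commit: just drop the buffer --
        // the remote store was never dirtied, so there is nothing to wipe
        public void discard() {
            buffer.clear();
        }
    }

A real implementation would of course also have to buffer deletes and cap the
buffer size, but the principle is the same.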

But as with all things, this has tradeoffs of its own: for one thing it's
probably a significantly larger
effort to implement, so if we want to correct the EOS + remote storage
situation quickly then this
approach would not be the best way to go. Also, buffering updates of course
requires additional
resources (i.e. storage and/or memory), so some users may actually prefer to
take an occasional
performance hit to keep their app lightweight.

Anyways, these are just some thoughts on how to improve the current
situation. Maybe there are
even more options to address this problem which haven't been considered
yet. Let us know if you
have a better idea :)

On Fri, Mar 19, 2021 at 11:50 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Thanks Sophie... that answers my question, however I'm still worried about
> some other aspects:
>
> 1. If redis is to be restored from the changelog topic: what would happen if I
> have 3 stream application instances and 1 instance goes down... will the other 2
> instances halt until the entire existing state in redis is wiped out and the
> entire state is restored back from the changelog topic? If so then there would
> be a significant performance hit, especially when this happens during
> heavy traffic hours
>
> 2. Will #1 be solved by the 2nd alternative that you mentioned in the
> comment, i.e. 'An alternative is to just start buffering updates in-memory
> (or in rocksdb, this could be configurable) and then avoid dirtying the
> remote storage in the first place as we would only flush the data out to it
> during a commit'? It looks to me that this won't need rebuilding the entire
> state store because the changelog is disabled, and this alternative would avoid
> making the state store inconsistent in the first place, thus saving the wipe out
> and rebuild? If so then this also doesn't need to halt other stream
> applications and would be a much better approach from a performance
> point of view. Is that correct?
>
> On Sat, Mar 20, 2021 at 2:25 AM Sophie Blee-Goldman
> <sop...@confluent.io.invalid> wrote:
>
> > Hey Pushkar, yes, the data will still be backed by a changelog topic
> unless
> > the
> > user explicitly disables logging for that state store. The fault
> tolerance
> > mechanism
> > of Kafka Streams is based on changelogging, therefore there are no
> > correctness
> > guarantees if you decide to disable it.
> >
> > That said, I'm guessing many users do in fact disable the changelog when
> > plugging
> > in a remote store with its own fault tolerance guarantees -- is that
> what
> > you're getting
> > at? We could definitely build in better support for that case, as either
> an
> > additional
> > optimization on top of KAFKA-12475
> > <https://issues.apache.org/jira/browse/KAFKA-12475> or as an alternative
> > implementation to fix the
> > underlying EOS problem. Check out my latest comment on the ticket here
> > <https://issues.apache.org/jira/browse/KAFKA-12475?focusedCommentId=17305191&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17305191>
> >
> > Does that address your question?
> >
> > On Fri, Mar 19, 2021 at 10:14 AM Pushkar Deole <pdeole2...@gmail.com>
> > wrote:
> >
> > > Hello Sophie,
> > >
> > > Maybe I am missing something here, however can you let me know how a
> > > redis-based state store will be wiped of its inconsistent state in case the
> > > stream application dies in the middle of processing? E.g. the stream
> > > application consumed from the source topic, processed the source event and
> > > saved state to redis, and before producing the event on the destination
> > > topic, the stream application hit an error.
> > > If this occurs with a RocksDB or in-memory state store, it will be rebuilt
> > > from the changelog topic, however for a redis state store, how will the
> > > state be wiped? Are we saying here that the data stored in redis will still
> > > be backed by a changelog topic and redis will be restored from the backing
> > > topic in case of a stream application error?
> > >
> > > On Tue, Mar 16, 2021 at 12:18 AM Sophie Blee-Goldman
> > > <sop...@confluent.io.invalid> wrote:
> > >
> > > > This certainly does seem like a flaw in the Streams API, although of
> > > course
> > > > Streams is just
> > > > in general not designed for use with remote anything (API calls,
> > stores,
> > > > etc)
> > > >
> > > > That said, I don't see any reason why we *couldn't* have better
> support
> > > for
> > > > remote state stores.
> > > > Note that there's currently no deleteAll method on the store
> interface,
> > > and
> > > > not all databases
> > > > necessarily support that. But we could add a default implementation
> > which
> > > > just calls delete(key)
> > > > on all keys in the state store, and for the RocksDB-based state
> stores
> > we
> > > > still wipe out the state
> > > > as usual (and recommend the same for any custom StateStore which is
> > local
> > > > rather than remote).
> > > > Obviously falling back on individual delete(key) operations for all
> the
> > > > data in the entire store will
> > > > have worse performance, but that's better than silently breaking EOS
> > when
> > > > deleteAll is not available
> > > > on a remote store.
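
To sketch what that fallback could look like (purely hypothetical -- no such
utility exists in Streams today, and the class/method names are made up):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public final class StoreWipeUtil {

        private StoreWipeUtil() { }

        // Fallback "deleteAll": collect the keys, then remove them one by one
        // using the delete(key) that every KeyValueStore already has. This works
        // for remote stores that have no truncate/deleteAll of their own, but it
        // is obviously much slower (and buffers all keys in memory) compared to
        // simply wiping a local state directory.
        public static void deleteAll(final KeyValueStore<Bytes, byte[]> store) {
            final List<Bytes> keys = new ArrayList<>();
            try (KeyValueIterator<Bytes, byte[]> iter = store.all()) {
                while (iter.hasNext()) {
                    keys.add(iter.next().key);
                }
            }
            keys.forEach(store::delete);
        }
    }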
> > > >
> > > > Would you be interested in doing a KIP for this? We should also file
> a
> > > JIRA
> > > > with the above explanation,
> > > > so that other users of remote storage are aware of this limitation.
> And
> > > > definitely document this somewhere
> > > >
> > > >
> > > > On Mon, Mar 15, 2021 at 8:04 AM Bruno Cadonna
> > <br...@confluent.io.invalid
> > > >
> > > > wrote:
> > > >
> > > > > Hi Alex,
> > > > >
> > > > > I guess wiping out the state directory is easier code-wise, faster,
> > > > > and/or at the time of development the developers did not design for
> > > > > remote state stores. But I actually do not know the exact reason.
> > > > >
> > > > > Off the top of my head, I do not know how to solve this for remote
> > > state
> > > > > stores. Using the uncaught exception handler is not good, because a
> > > > > computing node could fail without giving the JVM the opportunity to
> > > > > throw an exception.
> > > > >
> > > > > In your tests, try to increase the commit interval to a high value
> > and
> > > > > see if you get inconsistencies. You should get an inconsistency if
> > the
> > > > > state store maintains counts for keys and after the last commit
> > before
> > > > > the failure, the Streams app puts an event with a new key K with
> > value
> > > 1
> > > > > into the state store. After failover, Streams would put the same
> > event
> > > > > with key K again into the state store. If the state store deleted
> all
> > > of
> > > > > its data, Streams would put again value 1, but if the state store
> did
> > > > > not delete all data, Streams would put value 2 which is wrong
> because
> > > it
> > > > > would count the same event twice.
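
For illustration, a test setup along those lines might use something like this
(the application id, broker address, and values are made up, not
recommendations):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class EosConsistencyTestConfig {
        // A large commit interval widens the window of uncommitted work, so it
        // becomes easy to kill the app between commits and then check whether
        // the store double-counts the replayed events after failover.
        public static Properties props() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-consistency-test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300_000); // 5 minutes between commits
            return props;
        }
    }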
> > > > >
> > > > > Best,
> > > > > Bruno
> > > > >
> > > > >
> > > > > On 15.03.21 15:20, Alex Craig wrote:
> > > > > > Bruno,
> > > > > > Thanks for the info!  That makes sense.  Of course now I have
> more
> > > > > > questions.  :)  Do you know why this is being done outside of the
> > > state
> > > > > > store API?  I assume there are reasons why a "deleteAll()" type
> of
> > > > > function
> > > > > > wouldn't work, thereby allowing a state store to purge itself?
> And
> > > > maybe
> > > > > > more importantly, is there a way to achieve a similar behavior
> > with a
> > > > 3rd
> > > > > > party store?  I'm not sure if hooking into the uncaught exception
> > > > handler
> > > > > > might be a good way to purge/drop a state store in the event of a
> > > fatal
> > > > > > error?  I did setup a MongoDB state store recently as part of a
> POC
> > > and
> > > > > was
> > > > > > testing it with EOS enabled.  (forcing crashes to occur and
> > checking
> > > > that
> > > > > > the result of my aggregation was still accurate)  I was unable to
> > > cause
> > > > > > inconsistent data in the mongo store (which is good!), though of
> > > > course I
> > > > > > may just have been getting lucky.  Thanks again for your help,
> > > > > >
> > > > > > Alex
> > > > > >
> > > > > > On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole <
> > pdeole2...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Bruno,
> > > > > >>
> > > > > >> I tried to explain this in 'kafka user's' language through the above
> > > > > >> mentioned scenario, hope I put it properly :-) and correct me if I am
> > > > > >> wrong
> > > > > >>
> > > > > >> On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole <
> > pdeole2...@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >>
> > > > > >>> This is what I understand could be the issue with external
> state
> > > > store:
> > > > > >>>
> > > > > >>> kafka stream application consumes source topic, does
> processing,
> > > > stores
> > > > > >>> state to kafka state store (this is backed by topic) and before
> > > > > producing
> > > > > >>> event on destination topic, the application fails with some
> > issue.
> > > In
> > > > > >>> this case, the transaction has failed, so kafka guarantees
> either
> > > all
> > > > > or
> > > > > >>> none, means offset written to source topic, state written to
> > state
> > > > > store
> > > > > >>> topic, output produced on destination topic... all of these
> > happen
> > > or
> > > > > >> none
> > > > > >>> of these and in this failure scenario it is none of these.
> > > > > >>>
> > > > > >>> Assume you have redis state store, and you updated the state
> into
> > > > redis
> > > > > >>> and stream application failed. Now, you have source topic and
> > > > > destination
> > > > > >>> topic consistent i.e. offset is not committed to source topic
> and
> > > > > output
> > > > > >>> not produced on destination topic, but your redis state store is
> > > > > >>> inconsistent with that since it is external state store and
> kafka
> > > > can't
> > > > > >>> guarantee rollback of state written there
> > > > > >>>
> > > > > >>> On Mon, Mar 15, 2021 at 6:30 PM Alex Craig <
> > alexcrai...@gmail.com>
> > > > > >> wrote:
> > > > > >>>
> > > > > >>>> " Another issue with 3rd party state stores could be violation
> > of
> > > > > >>>> exactly-once guarantee provided by kafka streams in the event
> > of a
> > > > > >> failure
> > > > > >>>> of streams application instance"
> > > > > >>>>
> > > > > >>>> I've heard this before but would love to know more about how a
> > > > custom
> > > > > >>>> state
> > > > > >>>> store would be at any greater risk than RocksDB as far as
> > > > exactly-once
> > > > > >>>> guarantees are concerned.  They all implement the same
> > interface,
> > > so
> > > > > as
> > > > > >>>> long as you're correctly implementing get(), put(), delete(),
> > > > flush(),
> > > > > >>>> etc,
> > > > > >>>> you should be fine right?  In other words, I don't think there
> > is
> > > > any
> > > > > >>>> special "exactly once magic" that is baked into the RocksDB
> > store
> > > > > >> code.  I
> > > > > >>>> could be wrong though so I'd love to hear people's thoughts,
> > > thanks,
> > > > > >>>>
> > > > > >>>> Alex C
> > > > > >>>>
> > > > > >>>> On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan <
> > > > > mpart...@hpe.com>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>>> Thanks for the responses. In the worst case, I might have to
> > keep
> > > > > both
> > > > > >>>>> rocksdb for local store and keep an external store like
> Redis.
> > > > > >>>>>
> > > > > >>>>> -mohan
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>> On 3/13/21, 8:53 PM, "Pushkar Deole" <pdeole2...@gmail.com>
> > > > wrote:
> > > > > >>>>>
> > > > > >>>>>      Another issue with 3rd party state stores could be violation
> > > > > >>>>>      of the exactly-once guarantee provided by kafka streams in
> > > > > >>>>>      the event of a failure of a streams application instance.
> > > > > >>>>>      Kafka provides an exactly-once guarantee for consumer ->
> > > > > >>>>>      process -> produce through transactions, and with the use of
> > > > > >>>>>      a state store like redis this guarantee is weaker
> > > > > >>>>>
> > > > > >>>>>      On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang <
> > > > > wangg...@gmail.com
> > > > > >>>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>      > Hello Mohan,
> > > > > >>>>>      >
> > > > > >>>>>      > I think what you had in mind works with Redis: since it is
> > > > > >>>>>      > a remote state store engine, it does not have the
> > > > > >>>>>      > co-partitioning requirements of local state stores.
> > > > > >>>>>      >
> > > > > >>>>>      > One thing you'd need to tune in KS though is that with
> > > > > >>>>>      > remote stores, the processing latency may be larger, and
> > > > > >>>>>      > since Kafka Streams processes all records of a single
> > > > > >>>>>      > partition in order, synchronously, you may need to tune the
> > > > > >>>>>      > poll interval configs etc. to make sure KS would stay in
> > > > > >>>>>      > the consumer group and not trigger unnecessary rebalances.
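
For illustration, the kind of consumer overrides meant here might look like
this (the numbers are made up, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class RemoteStoreTuning {
        public static Properties overrides() {
            final Properties props = new Properties();
            // allow more time between poll() calls before the member is kicked out of the group
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
            // fetch fewer records per poll so each batch of remote round trips fits in that budget
            props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
            return props;
        }
    }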
> > > > > >>>>>      >
> > > > > >>>>>      > Guozhang
> > > > > >>>>>      >
> > > > > >>>>>      > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
> > > > > >>>>> mpart...@hpe.com>
> > > > > >>>>>      > wrote:
> > > > > >>>>>      >
> > > > > >>>>>      > > Hi,
> > > > > >>>>>      > >
> > > > > >>>>>      > > I have a use case where messages come in with some key,
> > > > > >>>>>      > > get assigned some partition, and the state gets created.
> > > > > >>>>>      > > Later, the key changes (but the message still contains
> > > > > >>>>>      > > the old key) and the message gets sent to a different
> > > > > >>>>>      > > partition. I want to be able to grab the old state using
> > > > > >>>>>      > > the old key before creating the new state on this
> > > > > >>>>>      > > instance. Redis as a state store makes it easy to
> > > > > >>>>>      > > implement this since I can simply do a lookup before
> > > > > >>>>>      > > creating the state. I see an implementation here:
> > > > > >>>>>      > > https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
> > > > > >>>>>      > >
> > > > > >>>>>      > > Has anyone tried this? Any caveats?
> > > > > >>>>>      > >
> > > > > >>>>>      > > Thanks
> > > > > >>>>>      > > Mohan
> > > > > >>>>>      > >
> > > > > >>>>>      > >
> > > > > >>>>>      >
> > > > > >>>>>      > --
> > > > > >>>>>      > -- Guozhang
> > > > > >>>>>      >
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
