Hi Guozhang, Sophie,

Thanks to you both for taking the time to review my proposal.

I did actually see the NamedTopology classes, and noted that they were
internal. I didn't realise they are part of an intended solution to this
problem; that's very interesting. I'm going to try to find some time to
take a look at your experimental work so I can understand it a bit better.

From your description, it sounds like the NamedTopology approach should
enable users to solve this problem at the level that they wish to. My
concern is that users will need to be explicit about how their Topology is
structured, and will need to know in advance how their Topologies might
evolve in the future in order to correctly break them up by name. For
example, if a user mistakenly assumes one particular structure for their
application, but later makes changes that implicitly cause an existing
NamedTopology to have its internal Subtopologies re-ordered, the user will
need to clear all the local state for that NamedTopology, at least.

Unless I'm mistaken, StateStores are defined exclusively by the data in
their changelogs. Even if you make changes to a Topology that require
clearing locally materialized state, the changelogs aren't reset[1], so the
newly rebuilt state is materialized from the pre-existing values. Even if
changes are made to the Subtopology that writes to the StateStore, the
existing data in the changelog hasn't changed. The contents of the
StateStore evolve. This is exactly the same as a traditional database
table, where a client may evolve its behaviour to subtly change the
semantics of the data written to the table, without deleting the existing
data.
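
To make that concrete, here is a rough sketch in plain Java (no Kafka
dependency) of what I mean by a store being defined by its changelog. The
class and method names are made up for illustration, and it assumes the
usual compacted-topic semantics where a later record for a key replaces an
earlier one and a null value deletes the key:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka Streams code.
public class ChangelogReplaySketch {

    // Rebuild a store's contents purely from its changelog records.
    // Nothing here depends on which Subtopology produced the records.
    static Map<String, Long> materialize(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> store = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : changelog) {
            if (record.getValue() == null) {
                // Assumed tombstone semantics: a null value removes the key.
                store.remove(record.getKey());
            } else {
                // A later record for the same key overwrites the earlier one.
                store.put(record.getKey(), record.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = List.of(
            new SimpleEntry<String, Long>("a", 1L),
            new SimpleEntry<String, Long>("b", 2L),
            new SimpleEntry<String, Long>("a", 3L),
            new SimpleEntry<String, Long>("b", null));
        System.out.println(materialize(changelog));
    }
}
```

However the Topology around the store is rearranged, replaying the same
changelog gives the same contents.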

If a user makes a change that means a different Subtopology reads from the
StateStore, the semantics of, and the data in, the store itself haven't
actually changed. The only reason we need to reset this local state at all
is due to the conflict on-disk caused by the change in Subtopology ordinal.
If local StateStore data was decoupled from Tasks, this conflict would
disappear, and the application would work as expected.
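
As a rough illustration of that conflict, here is a sketch in plain Java
(no Kafka dependency). It assumes, as I understand it, that a RocksDB store
lives on disk under
<state.dir>/<application.id>/<subtopology>_<partition>/rocksdb/<store>.
The class name, method names, example paths and the "decoupled" layout are
all hypothetical; the latter is just one way option A could look, not a
concrete proposal:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical illustration only; this is not Kafka Streams code.
public class StateDirSketch {

    // Assumed current layout: the store directory sits inside the Task
    // directory, which is named "<subtopology ordinal>_<partition>".
    static Path taskCoupledPath(Path appDir, int subtopology, int partition, String store) {
        return appDir.resolve(subtopology + "_" + partition)
                     .resolve("rocksdb")
                     .resolve(store);
    }

    // Hypothetical decoupled layout: keyed by store name and partition only,
    // so the Subtopology ordinal never appears in the path.
    static Path decoupledPath(Path appDir, String store, int partition) {
        return appDir.resolve("stores")
                     .resolve(store)
                     .resolve(String.valueOf(partition));
    }

    public static void main(String[] args) {
        // Example values; "my-app" and "counts" are made up.
        Path appDir = Paths.get("/tmp/kafka-streams", "my-app");

        // Before the change the store belongs to Subtopology 1; after an
        // unrelated change shifts the ordinals it belongs to Subtopology 2.
        Path before = taskCoupledPath(appDir, 1, 0, "counts");
        Path after = taskCoupledPath(appDir, 2, 0, "counts");
        System.out.println("task-coupled: " + before + " -> " + after);

        // The decoupled path is the same before and after.
        System.out.println("decoupled:    " + decoupledPath(appDir, "counts", 0));
    }
}
```

With the task-coupled layout the same store ends up at a different path
once the ordinal shifts, which is why the local state has to be cleared.
With a layout keyed only by store and partition the path is stable.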

A Subtopology is defined by all connected topics, including changelogs,
repartition topics, source topics and sink topics, whereas a StateStore is
defined exclusively by its changelog. So why do we tightly couple
StateStore to Subtopology? This is my central argument for option A that I
outlined in the KIP, and I would like to discuss it further, even if only
to educate myself on why it's not possible :-D

I still think the NamedTopology work is valuable, but more as a means to
better organize large applications.


1: The only exception to this I can think of is when a user decides to
change the format (Serdes) or semantics of the data in the store, in which
case they would need to do a full reset by also clearing the changelog
topic for that store. Realistically, users that wish to do this would be
better off just creating a new store and deleting the old one, so I don't
think it's a case worth optimizing for.

On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Hey Nick,
> thanks for the KIP, this is definitely a much-needed feature. I've actually
> been working on
> a somewhat similar feature for a while now and have a good chunk of the
> implementation
> completed -- but so far it's only exposed via internal APIs and hasn't been
> brought to a KIP
> yet, as it's a fairly large and complex project and I wanted to get all the
> details hashed out
> before settling on a public API.
> For some sense of how complicated it's been, you can check out the JIRA
> ticket we've been
> filing PRs under -- there are already 25 PRs to the feature. See
> KAFKA-12648
> <https://issues.apache.org/jira/browse/KAFKA-12648>. You can check
> out the new KafkaStreamsNamedTopologyWrapper class to see what the current
> API looks like
> -- I recommend taking a look to see if this might cover some or all of the
> things you wanted
> this KIP to do.
> For a high-level sketch, my work introduces the concept of a
> "NamedTopology" (which will be
> renamed to "ModularTopology" in the future, but is still referred to as
> "named" in the codebase
> so I'll keep using it for now). Each KafkaStreams app can execute multiple
> named topologies,
> which are just regular topologies that are given a unique name. The
> essential feature of a
> named topology is that it can be dynamically added or removed without even
> stopping the
> application, much less resetting it. Technically a NamedTopology can be
> composed of one
> or more subtopologies, but if you want to be able to update the application
> at a subtopology
> level you can just name each subtopology.
> So I believe the feature you want is actually already implemented, for the
> most part -- it's currently
> missing a few things that I just didn't bother to implement yet since I've
> been focused
> on getting a working, minimal POC that I could use for testing. (For
> example it doesn't yet
> support global state stores) But beyond that, the only remaining work to
> make this feature
> available is to settle on the APIs, get a KIP passed, and implement said
> APIs.
> Would you be interested in helping out with the NamedTopology work so we
> can turn it into a
> full-fledged public feature? I'm happy to let you take the lead on the
> KIP, maybe by adapting
> this one if you think it makes sense to do so. The NamedTopology feature is
> somewhat larger
> in scope than strictly necessary for your purposes, however, so you could
> take on just a part
> of it and leave anything beyond that for me to do as followup.
> By the way: one advantage of the NamedTopology feature is that we don't
> have to worry about
> any compatibility issues or upgrade/migration path -- it's opt-in by
> definition. (Of course we would
> recommend using it to all users, like we do with named operators)
> Let me know what you think and how you want to proceed from here -- I
> wouldn't want you to
> spend time re-implementing more or less the same thing, but I most likely
> wasn't going to find time
> to put out a KIP for the NamedTopology feature in the near future. If you
> would be able to help
> drive this to completion, we'd each have significantly less work to do to
> achieve our goals :)
> Cheers,
> Sophie
> On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > Hello Nick,
> >
> > Thanks for bringing this up and for the proposed options. I read through
> > your writeup and here are some of my thoughts:
> >
> > 1) When changing the topology of Kafka Streams, the developer needs to first
> > decide if the whole topology's persisted state (including both the state
> > store as well as its changelogs, and the repartition topics, and the
> > source/sink external topics) or part of the persisted state can be reused.
> > This involves two types of changes:
> >
> > a) structural change of the topology, such as a new processor node being
> > added/removed, a new intermediate topic being added/removed etc.
> > b) semantic change of a processor, such as a numerical filter node changing
> > its filter threshold etc.
> >
> > Today both of them are more or less determined by developers manually.
> > However, though automatically determining changes of type b) is hard if
> > not impossible, automatically determining changes of type a) is doable since
> > it depends on just the following information:
> > * number of sub-topologies, and their orders (i.e. sequence of ids)
> > * used state stores and changelog topics within the sub-topology
> > * used repartition topics
> > * etc
> >
> > So let's assume in the long run we can indeed automatically determine if a
> > topology or part of it (a sub-topology) is structurally the same; what we
> > can do then is to "translate" the old persisted state names to the
> > new, isomorphic topology's names. Following this thought I'm leaning
> > towards the direction of option B in your proposal. But since in this KIP
> > automatically determining structural changes is out of scope, I feel we
> > can consider adding some sort of a "migration tool" from an old topology to
> > a new topology by renaming all the persisted states (store dirs and names,
> > topic names).
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I'd like to start a discussion on Kafka Streams KIP-816 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
> > > )
> > >
> > > This KIP outlines 3 possible solutions to the problem, and I plan to
> > > whittle this down to a definitive solution based on this discussion.
> > >
> > > Of the 3 proposed solutions:
> > > * 'A' is probably the "correct" solution, but is also quite a significant
> > > change.
> > > * 'B' is the least invasive, but most "hacky" solution.
> > > * 'C' requires a change to the wire protocol and will likely have
> > > unintended consequences. C is also the least complete solution, and will
> > > need significant additional work to make it work.
> > >
> > > Please let me know if the Motivation and Background sections need more
> > > clarity.
> > >
> > > Regards,
> > >
> > > Nick Telford
> > >
> >
> >
> > --
> > -- Guozhang
> >
