Hi everyone,

Guozhang, the scope of my KIP is specifically about deploying structural
changes to existing applications, i.e. "upgrades". Sharing state between
different applications was not in the scope of my original proposal.

John's email has it exactly right, and I think this points to my KIP not
explaining the problem correctly. Any suggestions on how I could better
clarify the intent of my proposal in the KIP?

John, regarding your comments:

A) State being difficult to clean up after migrations: unless I've missed
something, this shouldn't be a problem. Tasks are already internally aware
of which stores they own from the Topology structure, so they should be able
to find the StateStore data irrespective of where it lives on disk. I think
the only real issue with this approach is that it will most likely require
changing quite a bit of code. We'll need to separate the concept of "state
directory" from "task directory" (which will still be needed to store Task
meta-data, like .lock files). At the very least, I think significant
changes may need to be made to StateDirectory and StateManager, but I
haven't investigated in detail. Perhaps it would make sense to first
explore this approach with a prototype to see how invasive it would become?
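
To make the "state directory" vs "task directory" split concrete, here is a
minimal sketch of the two path schemes. It is illustrative only: the class and
method names are hypothetical, the task-scoped path reflects my understanding
of where RocksDB-backed stores live today, and the store-scoped
"stores/<storeName>/<partition>" layout is an assumption, not something the
KIP specifies.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: none of these names exist in Kafka Streams.
final class StoreLayoutSketch {

    // Task-scoped layout (my understanding of the current one):
    // <state.dir>/<application.id>/<subtopology>_<partition>/rocksdb/<storeName>
    static Path taskScoped(Path appDir, int subtopology, int partition, String storeName) {
        String taskId = subtopology + "_" + partition;
        return appDir.resolve(taskId).resolve("rocksdb").resolve(storeName);
    }

    // Store-scoped layout for option (A), assumed for illustration:
    // <state.dir>/<application.id>/stores/<storeName>/<partition>
    static Path storeScoped(Path appDir, int partition, String storeName) {
        return appDir.resolve("stores").resolve(storeName).resolve(Integer.toString(partition));
    }

    public static void main(String[] args) {
        Path appDir = Paths.get("/tmp/kafka-streams", "my-app");
        // Same named store and partition, before and after a change that
        // renumbers its subtopology from 1 to 2: the task-scoped path moves.
        System.out.println(taskScoped(appDir, 1, 0, "counts"));
        System.out.println(taskScoped(appDir, 2, 0, "counts"));
        // The store-scoped path does not depend on the subtopology at all.
        System.out.println(storeScoped(appDir, 0, "counts"));
    }
}
```

The point of the second scheme is simply that the subtopology ordinal no
longer appears in the store's path, so renumbering cannot orphan the files.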

B) My intent was always that this process would occur between the call to
KafkaStreams.start() and the threads actually starting, so that the
migration would occur safely. I'm not sure what kind of unexpected
structural changes could be detected by such a process; it might just be
useful for general validation. The main reasons I prefer (A) are that: 1)
(B) requires additional state meta-data, which increases system complexity,
whereas (A) does not; and 2) I believe that (A) actually addresses a
semantic bug: specifically that StateStores are tightly coupled to Tasks,
which is unnecessary. Reducing this coupling would add no complexity, and
could potentially simplify other processes in the future.
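
For comparison, a rough sketch of the kind of move (B) would perform before
any threads are started. Everything here is hypothetical: the class and
method names are made up, the "rocksdb" sub-directory is my understanding of
the current layout, and the old-to-new task id mapping is simply passed in by
the caller (working out that mapping is exactly the extra meta-data that (B)
needs).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of option (B); none of these names exist in Kafka Streams.
final class StoreMigrationSketch {

    // Moves a named store's local files from the old task directory to the new one.
    // Assumes it is only called while no stream thread is running, so nothing
    // else can be using either directory. Returns true if the store was moved.
    static boolean moveStore(Path appDir, String oldTaskId, String newTaskId, String storeName) {
        Path source = appDir.resolve(oldTaskId).resolve("rocksdb").resolve(storeName);
        Path target = appDir.resolve(newTaskId).resolve("rocksdb").resolve(storeName);
        if (!Files.isDirectory(source) || Files.exists(target)) {
            // Nothing to migrate, or the new task already has local state.
            return false;
        }
        try {
            Files.createDirectories(target.getParent());
            // A plain rename when both paths are on the same filesystem.
            Files.move(source, target);
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A call such as moveStore(appDir, "1_0", "2_0", "counts") would then run once
per renumbered store; if it returns false, Streams would presumably fall back
to restoring from the changelog as it does today.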

Regards,

Nick

On Sat, 5 Feb 2022 at 17:19, John Roesler <vvcep...@apache.org> wrote:

> Hello all,
>
> Thanks for the KIP, Nick!
>
> Based on this conversation, I think I might have misread the
> KIP, but it looks like Nick is just proposing a small fix to
> the existing compatibility mechanism.
>
> Although we tell people to avoid changing topologies on the
> fly in general, we also tell them that, if they name all the
> persistent resources (stores and repartition nodes), then
> they can change the topologies without breaking anything
> (provided the change itself is logically sound).
>
> It seems like this KIP is just pointing out a flaw in that
> mechanism, that the (named) stores are kept inside the task
> directories, so if some change renumbers the tasks, Streams
> won't be able to find the local store files anymore. IIUC,
> the changelog topic will still be fine, so Streams would
> just allocate a new state directory in the new task name and
> restore the changelog into it.
>
> So, I think all this KIP is after is a way to preserve the
> local state files of a named store in the face of task
> renumbering. That's not to say that there's not some overlap
> with the NamedTopologies work, or that there's no value in
> being able to automatically reuse unnamed stores. But it
> probably makes sense to let Nick fix this one specific
> problem instead of coupling it to other large-scale
> engineering projects.
>
> Regarding the KIP itself:
>
> (A) is quite clean, but it does make it more challenging to
> clean up state when tasks migrate to other nodes. If that's
> the only problem, then I agree this is probably the best
> solution.
>
> (B) also makes a lot of sense to me, and I actually don't
> think it's a hack. It might also be useful for detecting
> when a topology has changed unexpectedly, for example. On
> the other hand, to safely move a state directory from one
> task directory to the other, we have to be sure no other
> thread is using either directory. To do that, we could
> either perform the operation in `KafkaStreams.start()`
> before any threads are started (we already know the topology
> at this point), or we can try to grab the directory locks on
> both tasks (but that sounds like a recipe for deadlock).
>
> In a nutshell, I'm supportive of this KIP, and I'd suggest we
> do a little more discovery on the implications of dropping
> the task level of the directory hierarchy before committing
> to A. And/or be a little more specific about how we can
> safely move state directories around before committing to B.
>
> Thanks again!
> -John
>
> On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
> > Hi folks,
> >
> > I think the NamedTopology work would help with the convenience of the
> > solution for this KIP, but I feel it is not by itself the solution here.
> > If I'm not mistaken, this KIP is trying to tackle the following:
> > *assuming the developer already knows* that a new topology, or part of
> > it (e.g. a state store), does not change, how can that part of the
> > topology be reused effectively? Today it is very hard to reuse part (say
> > a state store or an internal topic) of a previous topology's persistent
> > state because:
> >
> > 1) the names of those persistent states are prefixed by the application
> id.
> > 2) the names of those persistent states are suffixed by the index, which
> > reflects the structure of the topology.
> > 3) the dir path of the persistent states is "prefixed" by the task id,
> > which is hence dependent on the sub-topology id.
> >
> > My quick thoughts are that 1) is easy to go around as long as users reuse
> > the same appId, 3) can be tackled with the help of the named topology but
> > each named topology can still be composed of multiple sub-topologies so
> > extra work is still needed to align the sub-topology ids, but we still
> need
> > something to tackle 2) here, which I was pondering between those options
> > and at the moment leaning towards option 2).
> >
> > Does that make sense to you?
> >
> > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
> >
> > > Hi Guozhang, Sophie,
> > >
> > > Thanks to you both for taking the time to review my proposal.
> > >
> > > I did actually see the NamedTopology classes, and noted that they were
> > > internal. I didn't realise they are part of an intended solution to
> this
> > > problem, that's very interesting. I'm going to try to find some time to
> > > take a look at your experimental work so I can understand it a bit
> better.
> > >
> > > From your description, it sounds like the NamedTopology approach should
> > > enable users to solve this problem at the level that they wish to. My
> > > concern is that users will need to be explicit about how their
> Topology is
> > > structured, and will need to know in advance how their Topologies might
> > > evolve in the future in order to correctly break them up by name. For
> > > example, if a user mistakenly assumes one particular structure for
> their
> > > application, but later makes changes that implicitly cause an existing
> > > NamedTopology to have its internal Subtopologies re-ordered, the user
> will
> > > need to clear all the local state for that NamedTopology, at least.
> > >
> > > Unless I'm mistaken, StateStores are defined exclusively by the data in
> > > their changelogs. Even if you make changes to a Topology that require
> > > clearing locally materialized state, the changelogs aren't reset[1],
> so the
> > > newly rebuilt state is materialized from the pre-existing values. Even
> if
> > > changes are made to the Subtopology that writes to the StateStore, the
> > > existing data in the changelog hasn't changed. The contents of the
> > > StateStore evolve. This is exactly the same as a traditional database
> > > table, where a client may evolve its behaviour to subtly change the
> > > semantics of the data written to the table, without deleting the
> existing
> > > data.
> > >
> > > If a user makes a change that means a different Subtopology reads from
> the
> > > StateStore, the semantics of, and the data in, the store itself haven't
> > > actually changed. The only reason we need to reset this local state at
> all
> > > is due to the conflict on-disk caused by the change in Subtopology
> ordinal.
> > > If local StateStore data was decoupled from Tasks, this conflict would
> > > disappear, and the application would work as expected.
> > >
> > > A Subtopology is defined by all connected topics, including changelogs,
> > > repartition topics, source topics and sink topics. Whereas a
> StateStore is
> > > defined exclusively by its changelog. So why do we tightly couple
> > > StateStore to Subtopology? This is my central argument for option A
> that I
> > > outlined in the KIP, and I would like to discuss it further, even if
> only
> > > to educate myself on why it's not possible :-D
> > >
> > > I still think the NamedTopology work is valuable, but more as a means
> to
> > > better organize large applications.
> > >
> > > Regards,
> > > Nick
> > >
> > > 1: The only exception to this I can think of is when a user decides to
> > > change the format (Serdes) or semantics of the data in the store, in
> which
> > > case they would need to do a full reset by also clearing the changelog
> > > topic for that store. Realistically, users that wish to do this would
> be
> > > better off just creating a new store and deleting the old one, so I
> don't
> > > think it's a case worth optimizing for.
> > >
> > > On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman
> > > <sop...@confluent.io.invalid> wrote:
> > >
> > > > Hey Nick,
> > > >
> > > > thanks for the KIP, this is definitely a much-needed feature. I've
> > > actually
> > > > been working on
> > > > a somewhat similar feature for a while now and have a good chunk of
> the
> > > > implementation
> > > > completed -- but so far it's only exposed via internal APIs and
> hasn't
> > > been
> > > > brought to a KIP
> > > > yet, as it's a fairly large and complex project and I wanted to get
> all
> > > the
> > > > details hashed out
> > > > before settling on a public API.
> > > >
> > > > For some sense of how complicated it's been, you can check out the
> JIRA
> > > > ticket we've been
> > > > filing PRs under -- there are already 25 PRs to the feature. See
> > > > KAFKA-12648
> > > > <https://issues.apache.org/jira/browse/KAFKA-12648>. You can check
> > > > out the new KafkaStreamsNamedTopologyWrapper class to see what the
> > > current
> > > > API looks like
> > > > -- I recommend taking a look to see if this might cover some or all
> of
> > > the
> > > > things you wanted
> > > > this KIP to do.
> > > >
> > > > For a high-level sketch, my work introduces the concept of a
> > > > "NamedTopology" (which will be
> > > > renamed to "ModularTopology" in the future, but is still referred to
> as
> > > > "named" in the codebase
> > > > so I'll keep using it for now). Each KafkaStreams app can execute
> > > multiple
> > > > named topologies,
> > > > which are just regular topologies that are given a unique name. The
> > > > essential feature of a
> > > > named topology is that it can be dynamically added or removed without
> > > even
> > > > stopping the
> > > > application, much less resetting it. Technically a NamedTopology can
> be
> > > > composed of one
> > > > or more subtopologies, but if you want to be able to update the
> > > application
> > > > at a subtopology
> > > > level you can just name each subtopology.
> > > >
> > > > So I believe the feature you want is actually already implemented,
> for
> > > the
> > > > most part -- it's currently
> > > > missing a few things that I just didn't bother to implement yet since
> > > I've
> > > > been focused
> > > > on getting a working, minimal POC that I could use for testing. (For
> > > > example it doesn't yet
> > > > support global state stores.) But beyond that, the only remaining
> work to
> > > > make this feature
> > > > available is to settle on the APIs, get a KIP passed, and implement
> said
> > > > APIs.
> > > >
> > > > Would you be interested in helping out with the NamedTopology work
> so we
> > > > can turn it into a
> > > > full-fledged public feature? I'm happy to let you take the lead on
> the
> > > > KIP, maybe by adapting
> > > > this one if you think it makes sense to do so. The NamedTopology
> feature
> > > is
> > > > somewhat larger
> > > > in scope than strictly necessary for your purposes, however, so you
> could
> > > > take on just a part
> > > > of it and leave anything beyond that for me to do as followup.
> > > >
> > > > By the way: one advantage of the NamedTopology feature is that we
> don't
> > > > have to worry about
> > > > any compatibility issues or upgrade/migration path -- it's opt-in by
> > > > definition. (Of course we would
> > > > recommend using it to all users, like we do with named operators.)
> > > >
> > > > Let me know what you think and how you want to proceed from here -- I
> > > > wouldn't want you to
> > > > spend time re-implementing more or less the same thing, but I most
> likely
> > > > wasn't going to find time
> > > > to put out a KIP for the NamedTopology feature in the near future.
> If you
> > > > would be able to help
> > > > drive this to completion, we'd each have significantly less work to
> do to
> > > > achieve our goals :)
> > > >
> > > > Cheers,
> > > > Sophie
> > > >
> > > >
> > > > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > > >
> > > > > Hello Nick,
> > > > >
> > > > > Thanks for bringing this up and for the proposed options. I read
> through
> > > > > your writeup and here are some of my thoughts:
> > > > >
> > > > > 1) When changing the topology of Kafka Streams, the developer needs
> to
> > > > first
> > > > > decide if the whole topology's persisted state (including both the
> > > state
> > > > > store as well as its changelogs, and the repartition topics, and
> the
> > > > > source/sink external topics) or part of the persisted state can be
> > > > reused.
> > > > > This involves two types of changes:
> > > > >
> > > > > a) structural change of the topology, such as a new processor
> node is
> > > > > added/removed, a new intermediate topic is added/removed etc.
> > > > > b) semantic change of a processor, such as a numerical filter node
> > > > changing
> > > > > its filter threshold etc.
> > > > >
> > > > > Today both of them are more or less determined by developers
> manually.
> > > > > However, although automatically detecting changes of type b) is
> > > > > hard if not impossible, automatically detecting changes of type a)
> > > > > is doable, since it depends only on:
> > > > > * number of sub-topologies, and their orders (i.e. sequence of ids)
> > > > > * used state stores and changelog topics within the sub-topology
> > > > > * used repartition topics
> > > > > * etc
> > > > >
> > > > > So let's assume in the long run we can indeed automatically
> determine
> > > if
> > > > a
> > > > > topology or part of it (a sub-topology) is structurally the same,
> what
> > > we
> > > > > can do is to "translate" the old persisted state names to the
> > > > > new, isomorphic topology's names. Following this thought I'm
> leaning
> > > > > towards the direction of option B in your proposal. But since
> > > > > automatically detecting structural changes is out of scope for this
> > > > > KIP, I feel we can consider adding some sort of "migration tool"
> > > > > from an old topology to a new topology that renames all the
> > > > > persisted state (store dirs and names, topic names).
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <
> nick.telf...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I'd like to start a discussion on Kafka Streams KIP-816 (
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
> > > > > > )
> > > > > >
> > > > > > This KIP outlines 3 possible solutions to the problem, and I
> plan to
> > > > > > whittle this down to a definitive solution based on this
> discussion.
> > > > > >
> > > > > > Of the 3 proposed solutions:
> > > > > > * 'A' is probably the "correct" solution, but is also quite a
> > > > significant
> > > > > > change.
> > > > > > * 'B' is the least invasive, but most "hacky" solution.
> > > > > > * 'C' requires a change to the wire protocol and will likely have
> > > > > > unintended consequences. C is also the least complete solution,
> and
> > > > will
> > > > > > need significant additional work to make it work.
> > > > > >
> > > > > > Please let me know if the Motivation and Background sections need
> > > more
> > > > > > clarity.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Nick Telford
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
> >
>
>
