Thanks Matthias!
MJS14: I'd lean toward deferring this. TopologyDescription doesn't
expose the read-only-source distinction today — Topology#describe()
for an addReadOnlyStateStore call returns a regular Source + Processor
pair, and the source-doubles-as-changelog relationship is a
Streams-runtime detail not surfaced in the public API. Surfacing it on
the wire really wants the corresponding accessor on
TopologyDescription first, which feels like its own scope. A plugin
can also derive it today by cross-referencing source topics against
state-store changelog topics. If demand emerges, a future version bump
can add a new nodeType cleanly.
MJS15: I'd lean toward keeping the named Source / Processor fields.
The Streams TopologyDescription.GlobalStore which is already a public
interface already has a fixed shape — exactly one source and one
processor — and the wire schema mirrors that. Switching to
[]TopologyNode would just move that invariant from the schema to a
validation check (must contain exactly one SOURCE and one PROCESSOR),
which loses the documentation value without buying actual flexibility.
The Subtopology analogy is structurally weaker than it looks:
subtopologies genuinely vary in shape, global stores don't. If
GlobalStore semantically extends later (which is not on the horizon
right now), that would be a coordinated KIP updating both Streams'
public API and the wire schema together.
MJS16: Okay, I made that clearer. It's ultimately a contract
violation, but specifying the behavior in the KIP makes sense. The
wrappers catch a synchronous throw, log the cause server-side, and
complete the future with
StreamsTopologyDescriptionPermanentFailureException("Topology
description plugin failed (internal error)", cause). The wire response
carries STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED with the generic
message; the original exception details don't reach the client.
Treated as permanent, so LastFailedTopologyEpoch ratchets. I didn't
add dynamic disable - a contract violation on one request shouldn't
take the plugin down cluster-wide, and the ratchet already prevents
hot-looping.
MJS17: Added a short inline explanation in the KIP.
MJS18: Reworded as suggested
MJS19: Right, that was confusing. Made the behavior clearer: v3+
returns GROUP_DELETION_FAILED with the cause in ErrorMessage; v2
downgrades to UNKNOWN_SERVER_ERROR with no ErrorMessage.
On Mon, Jun 1, 2026 at 7:14 AM Matthias J. Sax <[email protected]> wrote:
>
> Thanks Lucas!
>
> Few minor questions:
>
>
>
> MJS14: While we in general only have source, processor, and sink nodes,
> there is actually a special case when we use
> `Topology#addReadOnlyStateStore`. Technically, we still add a source and
> processor, but the attached state store doesn't get its own dedicated
> changelog topic, but will use the source topic as changelog. I am
> wondering if it would be worth adding a "read-only source" node type to
> the list? Just an idea.
>
>
>
> MJS15: For `GlobalStore` we define `Source` and `Processor`. I am
> wondering if we would want to also only just use type `[]TopologyNode`
> instead, similar to what we do for `Subtopology`? If we want to keep the
> format flexible, the current `Source` and `Processor` might be limiting?
>
>
>
> MJS16: About the broker plugin, the JavaDocs says:
>
> > Failures must be signalled by completing the future exceptionally —
> > implementations must not throw synchronously
>
> What happens if an exception it thrown directly from any method? I am
> wondering if we should treat it as "bad plugin", catch and log, and
> disable the plugin dynamically?
>
> What do we send to the client? I guess we can use
> `STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED` error code, but not sure if
> we should include the exception error message for this case (to avoid
> leaking something into the client, where it might not belong).
>
>
>
> MJS17:
>
> > The only difference is that there is no predecessor relation.
>
> While explained on the discuss thread, might be good to add this
> explanation to the KIP.
>
>
>
> MJS18:
>
> > Plugins that throw any other exception are treated as transient
>
> I think you meant the exception is treated as transient? I read like,
> "the plugin is treated as transient".
>
> Also, the plugin should not throw at all, right, but complete the Future
> exceptionally?
>
>
>
> MJS19:
>
> > Older clients that do receive GROUP_DELETION_FAILED (because they
> > understand version 3 but predate this KIP's error-code addition) decode the
> > code as UNKNOWN_SERVER_ERROR via the standard forward-compatibility
> > fallback in Errors.forCode;
>
> Not sure if I can follow. How can an older client understand version 3
> but not the new error code? Also, older clients should get a v2 response?
>
>
>
> -Matthias
>
>
>
>
>
>
>
>
>
>
>
> On 5/26/26 7:59 AM, Lucas Brutschy via dev wrote:
> > Hi all,
> >
> > following some off-list discussion. I made two changes to the KIP's
> > error handling: First, the DeleteGroups path now carries a new generic
> > delete-failed error code along with an error message. This aligns
> > better with existing RPCs and extends to future use cases where group
> > deletion may fail. Second, the dedicated "too large" code is gone and
> > all plugin failures surface as a single generic update-failed code
> > with the cause in the error message. The permanent-vs-transient
> > distinction moves into two new exception types used in the plugin
> > interface, but these are not error codes in the protocol: one for
> > permanent errors and one for transient errors, each matching one error
> > handling case (re-solicitate or not).
> >
> > As there weren't any new comments in the last week, I am planning to
> > open the vote tomorrow.
> >
> > Cheers,
> > Lucas
> >
> > On Wed, May 13, 2026 at 5:28 PM Lucas Brutschy <[email protected]>
> > wrote:
> >>
> >> Thanks for the comments. I have updated the KIP, simplifying the
> >> plugin interface significantly.
> >>
> >> The revised KIP moves the "is the plugin's topology up to date?"
> >> question out of the plugin and onto the broker. Two new tagged fields
> >> on the persisted streams-group metadata record — StoredTopologyEpoch
> >> and LastFailedTopologyEpoch — give the broker a durable answer.
> >> Heartbeats no longer call into the plugin to decide whether to ask the
> >> client for a push; the broker just compares those two persisted epochs
> >> against the group's current topology epoch and sets
> >> TopologyDescriptionRequired accordingly. As a side effect the plugin
> >> SPI shrinks. This simplifies plugin implementation and adds guardrails
> >> around that implementation.
> >>
> >> The other half of the change is a much clearer contract for what
> >> plugin authors have to do.
> >>
> >> Hope I have addressed the concerns in this thread, and looking forward
> >> to further comments.
> >>
> >> On Tue, May 12, 2026 at 10:39 AM Alieh Saeedi via dev
> >> <[email protected]> wrote:
> >>>
> >>> Thanks, Lucas. Following up on the discussion, something came to mind that
> >>> seems worth raising here. As we refine the design, I think it is important
> >>> to consider that many of the current answers rely on the plugin to handle
> >>> the responsibility. Since the user develops the plugin, this is a
> >>> particularly sensitive area. We should not make it easy for users to
> >>> accidentally shoot themselves in the foot. What happens if a buggy plugin
> >>> causes the application to break or puts excessive pressure on the GC? If
> >>> so, should the broker enforce any basic safeguards (e.g., per-group
> >>> in-flight limit) or at least document recommended limits for plugin
> >>> implementations?
> >>>
> >>> Thanks,
> >>> Alieh
> >>>
> >>> On Mon, May 11, 2026 at 3:11 PM Lucas Brutschy via dev
> >>> <[email protected]>
> >>> wrote:
> >>>
> >>>> Thanks for the plentiful comments! My responses below:
> >>>>
> >>>> AJS1) Fair point, I had "plugin" in there initially but removed it
> >>>> thinking I would better follow conventions. But I see that it is
> >>>> confusing. Changed back to
> >>>> group.streams.topology.description.plugin.class.
> >>>>
> >>>> AJS2) Not sure which Java object you mean, but sink-topics are modeled
> >>>> in the "Sink" node, so I don't see this missing. Predecessors are
> >>>> omitted on the broker-side, since they are not present on the wire and
> >>>> reconstruction seems unnecessary.
> >>>>
> >>>> AJS3) Good point. Added it.
> >>>>
> >>>> AJS4) I went back-and-forth on this one as well, but if we don't have
> >>>> the error code, all we can return is UNKNOWN_SERVER_ERROR when the
> >>>> topology description update fails. I don't think we want to do that,
> >>>> since it may trigger alerts set up to detect implementation bugs in
> >>>> the Kafka broker. So the point of TOPOLOGY_DESCRIPTION_UPDATE_FAILED
> >>>> is just letting the client know that the update failed, but there is
> >>>> nothing wrong with the Kafka broker. The client would likely log that
> >>>> at INFO level, opposed to WARN or ERRROR for UNKNOWN_SERVER_ERROR.
> >>>>
> >>>> AJS5) Sure, we can do that.
> >>>>
> >>>> AJS6) It was not supposed to be a user-facing concept. And I think it
> >>>> introduced more problems than it solved, so I removed the concept
> >>>> altogether.
> >>>>
> >>>> AJS7) Good point. This, in part, prompted me to bring back a previous
> >>>> version of the design, without the topology decription ID. Now, the
> >>>> topology description is created using only the topologyEpoch and a
> >>>> group creation timestamp. Then, the plugin can trivially implement
> >>>> concurrency control: The plugin can decide to store all versions,
> >>>> which means updates for separate topology epochs are independent. Or
> >>>> it can just store the latest one, which means that a topology
> >>>> description that comes later in the (groupCreationTimestamp,
> >>>> topologyEpoch) order takes preference over any previous update.
> >>>>
> >>>> AJS8) No, there is no direct relationship between a group epoch and a
> >>>> topology description.
> >>>>
> >>>> BB1) The asymmetry is intentional. Predecessors are redundant with
> >>>> sucessor information. We can reconstruct it any time. In the admin
> >>>> client, which are user-facing, we do reconstruct the predecessor for
> >>>> usability. On the broker-side, the main purpose is passing the data
> >>>> around, so we do not reconstruct the relation. I'll clarify this in
> >>>> the javadoc.
> >>>>
> >>>> BB2) Direct neighbours only — exactly as you described. I'll clarify
> >>>> this in the javadoc.
> >>>>
> >>>> BB3) The design intentionally pushes transient-retry policy entirely
> >>>> onto the plugin: when setTopology fails for a recoverable reason, the
> >>>> plugin is expected to keep returning true from requiresTopologyPush on
> >>>> subsequent heartbeats. I will clarify this in the Plugin
> >>>> Implementation section.
> >>>>
> >>>> MJS1) I dropped the UUID, as it introduced more complexities than it
> >>>> solved.
> >>>>
> >>>> MJS2a) I think we benefit from not modeling this too tightly. A sink
> >>>> is a sink, and the sink topic is a just a metadata annotation on top
> >>>> of it. There are corner cases, which I think would be more difficult
> >>>> to add later if we do not include NodeType. One is regular
> >>>> expressions. We do not model regular expressions here (because
> >>>> KIP-1071 does not support them yet -- and the existing client-side
> >>>> describe doesn't support them). We can still add regular expressions
> >>>> later, but then source topics will still be empty. Even clients that
> >>>> do not know the regular expressions field in the future should
> >>>> understand that it's a source node, even if maybe it does not
> >>>> understand how the source topics are determined. Similarly for sink
> >>>> nodes with topicNameExtractors, or any other nifty features people
> >>>> will cook up in the future. There is no defined sink topic for that
> >>>> node, but it's still a sink. The NodeType should define the identity,
> >>>> not the metadata attached to the node. I actually weakened the
> >>>> guarantees in the documentation around this a bit in the KIP, to make
> >>>> this clear.
> >>>>
> >>>> MJS2b) Fair — but it would make the schema a lot less uniform, because
> >>>> the corner case of global stores would require it's own record types
> >>>> with some fields fixed. Let me know if you feel strongly about it,
> >>>> otherwise, I'd vote for re-using the generic topologyRecords.
> >>>>
> >>>> MJS3) There is no Kafka RPC without a response, so we'd break with all
> >>>> conventions and I don't see why we'd even consider it. Apart from
> >>>> that, there are useful things to do in reaction to the response - e.g.
> >>>> log that the topology is too large, and throttleTimeMs.
> >>>>
> >>>> MJS4) The intent here is "this broker doesn't advertise this API in
> >>>> ApiVersions, but you sent it anyway, so I'm rejecting". This seems
> >>>> like a perfectly fine way to use UNSUPPORTED_VERSION to me.
> >>>>
> >>>> MJS5) This question comes up in every KIP I have the feeling. There is
> >>>> no strong reason to do either, but the preference is to use version
> >>>> bumps, because they are more compact on the wire and old versions
> >>>> without the field can be retired. I cleaned up some prose around this.
> >>>>
> >>>> MJS6) I think having the module `group-coordinator-api` is a decent
> >>>> place to put it. It's the place for broker-side plugin interfaces
> >>>> related to group management. Adding a new module broker-plugins for
> >>>> all broker-side plugins seems like scope creep for this KIP, and I am
> >>>> not even sure it would make things better.
> >>>>
> >>>> MJS7) Correct, this is a bit misleading. Exceptions are signaled by
> >>>> completing the future exceptionally, never by throwing synchronously
> >>>> from setTopology. I will clarify this in the KIP.
> >>>>
> >>>> MJS8) Yes, similar to ASH01 - the plugin needs to make sure that the
> >>>> futures are completed.
> >>>>
> >>>> MJS9) I agree that 3 POJOs are somewhat awkward, but creating
> >>>> dependencies, e.g. between the admin client and a broker-side plugin
> >>>> seems even worse to me. Where would we put the common data structure?
> >>>>
> >>>> MJS10) In principle we are modeling a sum type here (topology
> >>>> description | reason of absence), but Java doesn't natively model
> >>>> these, so we store two separate fields. Including a status that
> >>>> doesn't model the AVAILABLE case because it can be inferred seems more
> >>>> of a pitfall than necessary. Avoiding redundancy in APIs is a good
> >>>> pattern but not a strict rule, and I think having
> >>>> topologyDescription().isPresent() == (status == AVAILABLE) is
> >>>> perfectly fine. Also Optional would just add a layer of wrapping what
> >>>> can be modeled directly via the enum. But I agree that my distinction
> >>>> (omit the AVAILABLE in the RPC and include it in the user-facing API)
> >>>> may be unnecessarily confusing, see AJS5.
> >>>>
> >>>> MJS11) Personally, I think only the plugin knows if something went
> >>>> wrong and why, so the best place to define metrics is inside the
> >>>> plugin. Similar reasoning to why I want to keep configs, deduplication
> >>>> inside the plugin. If people strongly feel that we want to have AK
> >>>> metrics for this plugin, we can add them.
> >>>>
> >>>> MJS12) If the feature is disabled on the client but the broker plugin
> >>>> is enabled, the broker would indeed try to resolicitate on the
> >>>> client-side with back-off. But the solicitation is very light-weight
> >>>> (a single boolean, and requireTopologyId is supposed to be fast), and
> >>>> clients will just ignore it, so I don't see a problem with this.
> >>>>
> >>>> MJS13) The plugin doesn't (and shouldn't need to) know the heartbeat
> >>>> interval — it should throttle based on its own clock plus its own
> >>>> in-flight tracking. Concrete strategy: for each (groupId,
> >>>> groupCreationTimeMs, topologyEpoch) tuple the plugin tracks (i)
> >>>> whether a push has been initiated (set when requiresTopologyPush first
> >>>> returns true) and (ii) the last requiresTopologyPush=true time. While
> >>>> (i) is set and the push hasn't completed, requiresTopologyPush returns
> >>>> false. After the push completes successfully it returns false
> >>>> permanently for that tuple. On a transient failure, after a back-off
> >>>> window (independent of heartbeat cadence — say, exponential starting
> >>>> at 1s) it returns true again. On permanent failure
> >>>> (TOPOLOGY_DESCRIPTION_TOO_LARGE or plugin-semantic INVALID_REQUEST) it
> >>>> returns false permanently and logs.
> >>>>
> >>>> Hope that makes sense!
> >>>>
> >>>> On Sat, May 9, 2026 at 4:43 AM Matthias J. Sax <[email protected]> wrote:
> >>>>>
> >>>>> Thanks for the KIP Lucas. I made a first pass. Couple of
> >>>> comments/questions.
> >>>>>
> >>>>>
> >>>>> MJS1(a): This is a follow up to ASH03. I am not sure if I understand the
> >>>>> problem? The KIP says, we call `requiresTopologyPush` on every HB. So
> >>>>> after an upgrade, the plugin will be called for every existing group,
> >>>>> allowing the group to send its topology? So we can just create a UUID
> >>>>> before this call? In the end, we need to somehow cache all currently
> >>>>> in-use UUID anyway (ie, one for each active group)?
> >>>>>
> >>>>>
> >>>>> MJS1(b): Related to the above. The KIP does not say that UUIDs would be
> >>>>> stored by the GC -- so after a broker bounce, or GC fail-over to a
> >>>>> different broker, it seems we would forget all currently in-use UUIDs
> >>>>> and generate new ones? This would align to what I did ask about above,
> >>>>> and it should be fine from plugin POV to just get new UUIDs if none is
> >>>>> cached?
> >>>>>
> >>>>>
> >>>>> MJS1(c): However, I am not even sure about TopologyDescriptionId? Should
> >>>>> we use <groupId,topologyEpoch> instead, avoid this UUID all together? On
> >>>>> the other hand, with regard to ASH03, we might introduce the problem
> >>>>> Sanghyeok describe when doing this? On the other hand, on startup, the
> >>>>> GC could also check all existing groups, and just call
> >>>>> `requiresTopologyPush` pro-actively for each group?
> >>>>>
> >>>>>
> >>>>> MJS1(d): KIP-1313 proposes that clients create their UUID -- should we
> >>>>> do the same to integrate with KIP-1313 (in case we keep UUID, and not
> >>>>> move to <groupId,topologyEpoch>), to align the behavior across the
> >>>>> board? In the end, the topology can only change during a roll, which
> >>>>> aligns to a topology-epoch bump anyway?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS2(a): The KIP introduces a `NodeType` field for the topology
> >>>>> description. I am wondering if we need it? We have sources, processors,
> >>>>> sink. Only sources can have input topics, and only sinks can have output
> >>>>> topic, and processor never have any input/output topic, so it seems just
> >>>>> inspecting if input/output topic are present, tells us what node type we
> >>>>> have, and have an explicit types seems to be redundant?
> >>>>>
> >>>>>
> >>>>> MJS2(b): I am wondering why we model GlobalStores with two
> >>>>> TopologyNodes? We know that for this case, there is exactly one source
> >>>>> node, and one processor. Should we simplify this?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS3: Do we actually need to add
> >>>>> `UpdateStreamsGroupTopologyDescriptionResponse`, or could we use a "fire
> >>>>> and forget" approache? If an topology update failed, and the plugin
> >>>>> re-request the push, the next HB-response would take care of it
> >>>>> naturally it seems? Atm the only value we get is to send back some error
> >>>>> code. It this worth it?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS4: The KIP add `UNSUPPORTED_VERSION` error.
> >>>>>
> >>>>>> UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no
> >>>> topology description plugin is configured
> >>>>> Is this the right name for this error?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS5: the KIP proposed to bump the version of both
> >>>>> StreamsGroupDescribeRequest and StreamsGroupDescribeResponse. No
> >>>>> objection, but wondering why we prefer it over tagged fields?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS6: Is `org.apache.kafka.coordinator.group.api.streams` the right
> >>>>> place for the plugin interface? It seems we add a very heavy dependency
> >>>>> for people implementing the plugin. Would it make sense to add a new
> >>>>> module `broker-plugins` instead to make it more light weight? Yes,
> >>>>> KIP-714 does the same, but it might be a nice improvement for 714, too,
> >>>>> to move their plugin into such a new module?
> >>>>>
> >>>>>
> >>>>> MJS7: The KIP says that `setTopology()` may throw
> >>>>> TopologyDescriptionTooLargeException? Later in "plugin guidelines" it
> >>>> says:
> >>>>>
> >>>>>> Reject pushes that exceed it by completing the setTopology future with
> >>>> TopologyDescriptionTooLargeException
> >>>>>
> >>>>> Which one is it? We might not want to do it both ways? Also applies to
> >>>>> other exception.
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS8: It seems the broker will need to cache all non-yet-completed
> >>>>> ComparableFutures. What if the plugin has a bug, and never completes
> >>>>> it's future? Would we get some leak? -- Or is your answer the same as
> >>>>> for ASH01?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS9: Should we move `StreamsGroupTopologyDescription` to the same
> >>>>> propose `broker-plugin` module? I am also wondering about Alieh's
> >>>>> question: Do we keep it's own class in purpose, of should be unify with
> >>>>> the existing interface? And why do we get a second
> >>>>> `StreamsGroupTopologyDescription` for the admin client? -- I understand
> >>>>> your argument about "this may evolve independently", but do we need 3
> >>>>> copies of the same?
> >>>>>
> >>>>>
> >>>>> MJS10(a): Do we need to expose `StreamsGroupTopologyDescriptionStatus`
> >>>>> via `topologyDescriptionStatus()`? I am wondering if
> >>>>> `StreamsGroupTopologyDescription` could model this directly? Also not
> >>>>> sure if `topologyDescriptionStatus` should return an `Optional` or not?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS10(b): I can see argument to ignore what I said in MJS10(b), but if
> >>>>> we have an Optional, why do we need status `AVAILABLE`?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS11: The KIP says we don't add any broker side metrics. KIP-714 did.
> >>>>> Wondering why we won't need any for this KIP?
> >>>>>
> >>>>>
> >>>>>
> >>>>> MJS12: If this feature is disabled client side, can the broker learn
> >>>>> about it? Or might it keep requesting the topology over and over again,
> >>>>> and the client would just keep ignoring the request? Would we want some
> >>>>> error-code the clients sends to the broker for this case instead?
> >>>>>
> >>>>>
> >>>>> MJS13: In the "plugin guidelines", the KIP says:
> >>>>>
> >>>>>> Avoid concurrent or repetitive pushes
> >>>>>
> >>>>> Does the plugin know the HB interval to implement this in reasonable
> >>>>> way? Any better guidance we can give how to implement this?
> >>>>>
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 5/5/26 6:26 AM, Lucas Brutschy via dev wrote:
> >>>>>> Thanks Sanghyeok and Alieh!
> >>>>>>
> >>>>>> ASH01: Both risks are real. But I would argue that anyways, the
> >>>>>> plugins that Kafka currently defines need to be implemented correctly
> >>>>>> for Kafka to reliably work. This is the case here as well - just like
> >>>>>> a consumer group assignor needs to be correct and fast,
> >>>>>> requiresTopologyPush needs to be implemented correct and fast. I did
> >>>>>> give most of the responsibility to the plugin here, because it depends
> >>>>>> on the plugin in which situations it requires a new topology push from
> >>>>>> the client. I feel like adding extra logic to time out slow
> >>>>>> requireTopologyPush calls, or enforcing minimum intervals would
> >>>>>> require more configuration options and an extra level of deduplication
> >>>>>> logic - confusing users that do not use the plugin (configurations are
> >>>>>> irrelevant for them) and confusing users that do use the plugin (two
> >>>>>> potentially conflicting levels need to be configured).
> >>>>>>
> >>>>>> ASH02: Good point and this is actually something that I considered.
> >>>>>> But it's actually not that easy - in principle, there is only one
> >>>>>> current topology description, but there may be an arbitrary number of
> >>>>>> stale topologies active in the group. Would we require the plugin to
> >>>>>> store all stale topologies? I think this would be a feasible
> >>>>>> extension, but would definitely add some complexity. I would propose
> >>>>>> adding this as a follow-up item. I can add this to the future work
> >>>>>> section.
> >>>>>>
> >>>>>> ASH03: Good catch. I don't really want to have an "upgrade logic" that
> >>>>>> we need to preserve forever in the group coordinator. I think it would
> >>>>>> be okay to allow "ZERO_UUID" for any topology that exists when the
> >>>>>> broker upgraded.
> >>>>>>
> >>>>>> ASH04: I noted this in the future work section. In principle we can
> >>>>>> detect mismatches between topology descriptions on the client, but we
> >>>>>> do not include it in this KIP, since it would complicate things. The
> >>>>>> first successfully stored topology is authoritative.
> >>>>>>
> >>>>>> AS01) Both topologies are derived from the same Topology instance on
> >>>>>> the client at a given epoch, so at the source they're consistent by
> >>>>>> construction. During a topology update, the topology ID changes, and
> >>>>>> initially we will not have a new topology description passed to the
> >>>>>> plugin. In this case, we can get an intermediate NOT_STORED response
> >>>>>> when we try to describe the streams group, until a client pushes the
> >>>>>> new topology description. But the result will be consistent with the
> >>>>>> topology information used for assignments. This assumes that all
> >>>>>> clients with the same topology epoch use the same topology
> >>>>>> description. Mismatch detection is noted as future work.
> >>>>>>
> >>>>>> AS02) I think your point about retrying with too large descriptions is
> >>>>>> valid. It would actually make sense to leave topology size checking to
> >>>>>> the plugin as well - it can decide the maximum topology size and stop
> >>>>>> returning requiresTopologyPush for topologies that are confirmed to be
> >>>>>> too large. I will make this change in the next revision of the KIP.
> >>>>>>
> >>>>>> AS03) Yes, this is mostly to avoid dependencies between Kafka
> >>>>>> packages. Note that we do not necessarily need to keep the two
> >>>>>> implementations in sync. The streams-side TopologyDescription may
> >>>>>> evolve differently than the admin-side TopologyDescription. The two
> >>>>>> are only weakly linked through the RPC definition.
> >>>>>>
> >>>>>> AS04) Agreed. However, topology has the slowest-changing lifecycle of
> >>>>>> the three, so it should be less confusing than assignments and
> >>>>>> members.
> >>>>>>
> >>>>>> AS05) Correct. The plugin is free to forward, persist, fan out, mirror
> >>>>>> to multiple sinks, or anything else. The KIP intentionally doesn't
> >>>>>> constrain the storage backend.
> >>>>>>
> >>>>>> On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]>
> >>>> wrote:
> >>>>>>>
> >>>>>>> Thanks Lucas for the KIP. The KIP is already in very good shape and
> >>>> covers the edge cases. I still have a few questions and considerations
> >>>> I’d
> >>>> like to share.
> >>>>>>>
> >>>>>>> AS01: Are Assignment topology (defined in KIP-1071) and the
> >>>> Description topology (defined in KIP-1331) guaranteed to be consistent
> >>>> views of the same logical topology, or can they drift? Are we
> >>>> guaranteeing
> >>>> that every assignment we surface references only nodes/topics present in
> >>>> the current description topology, or can operators see combinations that
> >>>> don’t line up?
> >>>>>>>
> >>>>>>> AS02: I'm cusrious about the rationale or empirical data behind the
> >>>> 350 KB default (e.g., based on observed real-world topologies)? Also the
> >>>> KIP says the broker measures topology size and rejects oversized payloads
> >>>> with TOPOLOGY_DESCRIPTION_TOO_LARGE. Should the Streams client attempt a
> >>>> best-effort pre-check of the serialized size to avoid repeated failing
> >>>> pushes and log a clearer local error? Or is the intent to keep the client
> >>>> simple and rely entirely on the broker response + plugin behavior for
> >>>> this
> >>>> case?
> >>>>>>>
> >>>>>>> AS03: Why do we introduce a separate Admin-side POJO instead of
> >>>> reusing TopologyDescription from the Streams API—for dependency/semantic
> >>>> reasons? And how do we plan to keep the two representations in sync?
> >>>>>>>
> >>>>>>> AS04: Somewhat related to AS01.... In practice we’ve seen that
> >>>> because members and assignments change so dynamically, a user may see
> >>>> different assignments or members over just a few seconds, or a member
> >>>> with
> >>>> a specific memberId may disappear entirely. Having the topology visible
> >>>> might help users understand what’s going on—but it could also make things
> >>>> more confusing, depending on the situation.
> >>>>>>>
> >>>>>>> AS05: I assume that even with a single plugin, multiple downstream
> >>>> systems can still benefit from it (the plugin can of course fan out to
> >>>> multiple downstream systems). Am I right?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Alieh
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev <
> >>>> [email protected]> wrote:
> >>>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I would like to start the discussion on KIP-1331. The idea is to
> >>>>>>>> optionally make a topology description available to the broker, in
> >>>> the
> >>>>>>>> spirit of KIP-714. Looking forward to your feedback!
> >>>>>>>>
> >>>>>>>>
> >>>> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331*3A*Streams*Group*Topology*Description*Plugin__;JSsrKysr!!Ayb5sqE7!sxqGDUcjOzRpt9Gk0jE1XnVSit-FZMIihk2UsXWUI0jmdYK2nTcO1hP-9WiW5sLBMw8amIUxG2PGvhdRhok$
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Lucas
> >>>>>
> >>>>
>