Thanks, Lucas. Following up on the discussion, I'd like to raise a point that came to mind. As we refine the design, I think it is important to consider that many of the current answers rely on the plugin to handle the responsibility. Since users develop the plugin themselves, this is a particularly sensitive area: we should not make it easy for them to accidentally shoot themselves in the foot. What happens if a buggy plugin breaks the application or puts excessive pressure on the GC? Should the broker enforce some basic safeguards (e.g., a per-group in-flight limit), or at least document recommended limits for plugin implementations?
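To make the per-group in-flight limit concrete, here is a minimal sketch of the kind of safeguard I have in mind. Everything here is hypothetical -- the class name, the limit, and the call sites are illustrative and not part of the KIP. The idea is that the broker (or a defensive plugin) would call tryAcquire() before soliciting another topology push and release() when the push's future completes, success or failure:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-group in-flight cap; nothing here is defined by the KIP.
public class InFlightPushGuard {

    private final int maxInFlightPerGroup;
    private final Map<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();

    public InFlightPushGuard(int maxInFlightPerGroup) {
        this.maxInFlightPerGroup = maxInFlightPerGroup;
    }

    /** Returns true if another push may be started for this group. */
    public boolean tryAcquire(String groupId) {
        AtomicInteger count = inFlight.computeIfAbsent(groupId, g -> new AtomicInteger());
        while (true) {
            int current = count.get();
            if (current >= maxInFlightPerGroup) {
                return false; // cap reached: do not solicit another push
            }
            if (count.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Must be called exactly once per successful tryAcquire(). */
    public void release(String groupId) {
        inFlight.computeIfPresent(groupId, (g, c) -> c.decrementAndGet() <= 0 ? null : c);
    }
}
```

Even a cap of 1 per group would bound how much a buggy plugin can amplify push traffic, without any new broker configuration beyond a single limit.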
Thanks, Alieh

On Mon, May 11, 2026 at 3:11 PM Lucas Brutschy via dev <[email protected]> wrote: > Thanks for the plentiful comments! My responses below: > > AJS1) Fair point, I had "plugin" in there initially but removed it > thinking I would better follow conventions. But I see that it is > confusing. Changed back to > group.streams.topology.description.plugin.class. > > AJS2) Not sure which Java object you mean, but sink-topics are modeled > in the "Sink" node, so I don't see this missing. Predecessors are > omitted on the broker-side, since they are not present on the wire and > reconstruction seems unnecessary. > > AJS3) Good point. Added it. > > AJS4) I went back-and-forth on this one as well, but if we don't have > the error code, all we can return is UNKNOWN_SERVER_ERROR when the > topology description update fails. I don't think we want to do that, > since it may trigger alerts set up to detect implementation bugs in > the Kafka broker. So the point of TOPOLOGY_DESCRIPTION_UPDATE_FAILED > is just letting the client know that the update failed, but there is > nothing wrong with the Kafka broker. The client would likely log that > at INFO level, as opposed to WARN or ERROR for UNKNOWN_SERVER_ERROR. > > AJS5) Sure, we can do that. > > AJS6) It was not supposed to be a user-facing concept. And I think it > introduced more problems than it solved, so I removed the concept > altogether. > > AJS7) Good point. This, in part, prompted me to bring back a previous > version of the design, without the topology description ID. Now, the > topology description is created using only the topologyEpoch and a > group creation timestamp. Then, the plugin can trivially implement > concurrency control: The plugin can decide to store all versions, > which means updates for separate topology epochs are independent. 
Or > it can just store the latest one, which means that a topology > description that comes later in the (groupCreationTimestamp, > topologyEpoch) order takes preference over any previous update. > > AJS8) No, there is no direct relationship between a group epoch and a > topology description. > > BB1) The asymmetry is intentional. Predecessors are redundant with > successor information. We can reconstruct them at any time. In the admin > client, which is user-facing, we do reconstruct the predecessor for > usability. On the broker-side, the main purpose is passing the data > around, so we do not reconstruct the relation. I'll clarify this in > the javadoc. > > BB2) Direct neighbours only -- exactly as you described. I'll clarify > this in the javadoc. > > BB3) The design intentionally pushes transient-retry policy entirely > onto the plugin: when setTopology fails for a recoverable reason, the > plugin is expected to keep returning true from requiresTopologyPush on > subsequent heartbeats. I will clarify this in the Plugin > Implementation section. > > MJS1) I dropped the UUID, as it introduced more complexity than it > solved. > > MJS2a) I think we benefit from not modeling this too tightly. A sink > is a sink, and the sink topic is just a metadata annotation on top > of it. There are corner cases, which I think would be more difficult > to add later if we do not include NodeType. One is regular > expressions. We do not model regular expressions here (because > KIP-1071 does not support them yet -- and the existing client-side > describe doesn't support them). We can still add regular expressions > later, but then source topics will still be empty. Even clients that > do not know the regular expressions field in the future should > understand that it's a source node, even if it may not > understand how the source topics are determined. Similarly for sink > nodes with topicNameExtractors, or any other nifty features people > will cook up in the future. 
There is no defined sink topic for that > node, but it's still a sink. The NodeType should define the identity, > not the metadata attached to the node. I actually weakened the > guarantees in the documentation around this a bit in the KIP, to make > this clear. > > MJS2b) Fair -- but it would make the schema a lot less uniform, because > the corner case of global stores would require its own record types > with some fields fixed. Let me know if you feel strongly about it, > otherwise, I'd vote for re-using the generic topologyRecords. > > MJS3) There is no Kafka RPC without a response, so we'd break with all > conventions and I don't see why we'd even consider it. Apart from > that, there are useful things to do in reaction to the response - e.g. > logging that the topology is too large, and honoring throttleTimeMs. > > MJS4) The intent here is "this broker doesn't advertise this API in > ApiVersions, but you sent it anyway, so I'm rejecting". This seems > like a perfectly fine way to use UNSUPPORTED_VERSION to me. > > MJS5) I have the feeling this question comes up in every KIP. There is > no strong reason to do either, but the preference is to use version > bumps, because they are more compact on the wire and old versions > without the field can be retired. I cleaned up some prose around this. > > MJS6) I think having the module `group-coordinator-api` is a decent > place to put it. It's the place for broker-side plugin interfaces > related to group management. Adding a new module broker-plugins for > all broker-side plugins seems like scope creep for this KIP, and I am > not even sure it would make things better. > > MJS7) Correct, this is a bit misleading. Exceptions are signaled by > completing the future exceptionally, never by throwing synchronously > from setTopology. I will clarify this in the KIP. > > MJS8) Yes, similar to ASH01 - the plugin needs to make sure that the > futures are completed. 
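To make the MJS7/MJS8 contract concrete -- errors signaled only by completing the future exceptionally, and the plugin guaranteeing that every future completes -- here is a minimal sketch. The class name and size limit are assumptions, and IllegalArgumentException stands in for the KIP's TopologyDescriptionTooLargeException so the example stays self-contained:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plugin sketch: setTopology never throws synchronously; every
// outcome, including validation failures, is signaled through the future.
public class SketchTopologyPlugin {

    private static final int MAX_DESCRIPTION_BYTES = 350 * 1024; // assumed plugin-chosen limit

    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    public CompletableFuture<Void> setTopology(String groupId, byte[] description) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            if (description.length > MAX_DESCRIPTION_BYTES) {
                // Permanent failure: completed exceptionally, never thrown.
                result.completeExceptionally(
                        new IllegalArgumentException("topology description too large"));
                return result;
            }
            store.put(groupId, description); // a real plugin might persist asynchronously
            result.complete(null);
        } catch (RuntimeException e) {
            // Guarantee completion even on unexpected bugs, so the broker
            // never caches an uncompleted future indefinitely.
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

The catch-all completion is the important part for MJS8: whatever goes wrong inside the plugin, the broker-side future is always resolved.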
> > MJS9) I agree that 3 POJOs are somewhat awkward, but creating > dependencies, e.g. between the admin client and a broker-side plugin > seems even worse to me. Where would we put the common data structure? > > MJS10) In principle we are modeling a sum type here (topology > description | reason of absence), but Java doesn't natively model > these, so we store two separate fields. Having a status that > omits the AVAILABLE case because it can be inferred seems more > of a pitfall than a simplification. Avoiding redundancy in APIs is a good > pattern but not a strict rule, and I think having > topologyDescription().isPresent() == (status == AVAILABLE) is > perfectly fine. Also Optional would just add a layer of wrapping around what > can be modeled directly via the enum. But I agree that my distinction > (omit the AVAILABLE in the RPC and include it in the user-facing API) > may be unnecessarily confusing, see AJS5. > > MJS11) Personally, I think only the plugin knows if something went > wrong and why, so the best place to define metrics is inside the > plugin. Similar reasoning to why I want to keep configs and deduplication > inside the plugin. If people strongly feel that we want to have AK > metrics for this plugin, we can add them. > > MJS12) If the feature is disabled on the client but the broker plugin > is enabled, the broker would indeed keep re-soliciting the topology from the > client with back-off. But the solicitation is very lightweight > (a single boolean, and requiresTopologyPush is supposed to be fast), and > clients will just ignore it, so I don't see a problem with this. > > MJS13) The plugin doesn't (and shouldn't need to) know the heartbeat > interval -- it should throttle based on its own clock plus its own > in-flight tracking. 
Concrete strategy: for each (groupId, > groupCreationTimeMs, topologyEpoch) tuple the plugin tracks (i) > whether a push has been initiated (set when requiresTopologyPush first > returns true) and (ii) the last requiresTopologyPush=true time. While > (i) is set and the push hasn't completed, requiresTopologyPush returns > false. After the push completes successfully it returns false > permanently for that tuple. On a transient failure, after a back-off > window (independent of heartbeat cadence -- say, exponential starting > at 1s) it returns true again. On permanent failure > (TOPOLOGY_DESCRIPTION_TOO_LARGE or plugin-semantic INVALID_REQUEST) it > returns false permanently and logs. > > Hope that makes sense! > > On Sat, May 9, 2026 at 4:43 AM Matthias J. Sax <[email protected]> wrote: > > > > Thanks for the KIP, Lucas. I made a first pass. A couple of > comments/questions. > > > > > > MJS1(a): This is a follow-up to ASH03. I am not sure if I understand the > > problem? The KIP says, we call `requiresTopologyPush` on every HB. So > > after an upgrade, the plugin will be called for every existing group, > > allowing the group to send its topology? So we can just create a UUID > > before this call? In the end, we need to somehow cache all currently > > in-use UUIDs anyway (i.e., one for each active group)? > > > > > > MJS1(b): Related to the above. The KIP does not say that UUIDs would be > > stored by the GC -- so after a broker bounce, or GC fail-over to a > > different broker, it seems we would forget all currently in-use UUIDs > > and generate new ones? This would align with what I asked above, > > and it should be fine from the plugin's POV to just get new UUIDs if none is > > cached? > > > > > > MJS1(c): However, I am not even sure about TopologyDescriptionId? Should > > we use <groupId,topologyEpoch> instead, avoiding this UUID altogether? On > > the other hand, with regard to ASH03, we might introduce the problem > > Sanghyeok described when doing this? 
On the other hand, on startup, the > > GC could also check all existing groups, and just call > > `requiresTopologyPush` proactively for each group? > > > > > > MJS1(d): KIP-1313 proposes that clients create their UUID -- should we > > do the same to integrate with KIP-1313 (in case we keep UUID, and not > > move to <groupId,topologyEpoch>), to align the behavior across the > > board? In the end, the topology can only change during a roll, which > > aligns to a topology-epoch bump anyway? > > > > > > > > MJS2(a): The KIP introduces a `NodeType` field for the topology > > description. I am wondering if we need it? We have sources, processors, and > > sinks. Only sources can have input topics, and only sinks can have output > > topics, and processors never have any input/output topics, so it seems just > > inspecting whether input/output topics are present tells us what node type we > > have, and having an explicit type seems redundant? > > > > > > MJS2(b): I am wondering why we model GlobalStores with two > > TopologyNodes? We know that for this case, there is exactly one source > > node, and one processor. Should we simplify this? > > > > > > > > MJS3: Do we actually need to add > > `UpdateStreamsGroupTopologyDescriptionResponse`, or could we use a "fire > > and forget" approach? If a topology update fails, and the plugin > > re-requests the push, the next HB-response would take care of it > > naturally, it seems? At the moment, the only value we get is to send back some error > > code. Is this worth it? > > > > > > > > MJS4: The KIP adds the `UNSUPPORTED_VERSION` error. > > > > > UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no > topology description plugin is configured > > Is this the right name for this error? > > > > > > > > MJS5: The KIP proposes to bump the version of both > > StreamsGroupDescribeRequest and StreamsGroupDescribeResponse. No > objection, but wondering why we prefer it over tagged fields? 
> > > > > > > > MJS6: Is `org.apache.kafka.coordinator.group.api.streams` the right > > place for the plugin interface? It seems we add a very heavy dependency > > for people implementing the plugin. Would it make sense to add a new > > module `broker-plugins` instead to make it more lightweight? Yes, > > KIP-714 does the same, but it might be a nice improvement for 714, too, > > to move their plugin into such a new module? > > > > > > MJS7: The KIP says that `setTopology()` may throw > > TopologyDescriptionTooLargeException? Later in "plugin guidelines" it > says: > > > > > Reject pushes that exceed it by completing the setTopology future with > TopologyDescriptionTooLargeException > > > > Which one is it? We might not want to do it both ways? Also applies to > > other exceptions. > > > > > > > > MJS8: It seems the broker will need to cache all not-yet-completed > > CompletableFutures. What if the plugin has a bug, and never completes > > its future? Would we get some leak? -- Or is your answer the same as > > for ASH01? > > > > > > > > MJS9: Should we move `StreamsGroupTopologyDescription` to the same > > proposed `broker-plugin` module? I am also wondering about Alieh's > > question: Do we keep its own class on purpose, or should we unify it with > > the existing interface? And why do we get a second > > `StreamsGroupTopologyDescription` for the admin client? -- I understand > > your argument about "this may evolve independently", but do we need 3 > > copies of the same thing? > > > > > > MJS10(a): Do we need to expose `StreamsGroupTopologyDescriptionStatus` > > via `topologyDescriptionStatus()`? I am wondering if > > `StreamsGroupTopologyDescription` could model this directly? Also not > > sure if `topologyDescriptionStatus` should return an `Optional` or not? > > > > > > > > MJS10(b): I can see arguments to ignore what I said in MJS10(a), but if > > we have an Optional, why do we need status `AVAILABLE`? 
> > > > > > > > MJS11: The KIP says we don't add any broker-side metrics. KIP-714 did. > > Wondering why we wouldn't need any for this KIP? > > > > > > > > MJS12: If this feature is disabled client-side, can the broker learn > > about it? Or might it keep requesting the topology over and over again, > > and the client would just keep ignoring the request? Would we want some > > error code the client sends to the broker for this case instead? > > > > > > MJS13: In the "plugin guidelines", the KIP says: > > > > > Avoid concurrent or repetitive pushes > > > > Does the plugin know the HB interval to implement this in a reasonable > > way? Any better guidance we can give on how to implement this? > > > > > > > > -Matthias > > > > > > > > On 5/5/26 6:26 AM, Lucas Brutschy via dev wrote: > > > Thanks Sanghyeok and Alieh! > > > > > > ASH01: Both risks are real. But I would argue that anyway, the > > > plugins that Kafka currently defines need to be implemented correctly > > > for Kafka to reliably work. This is the case here as well - just like > > > a consumer group assignor needs to be correct and fast, > > > requiresTopologyPush needs to be correct and fast. I did > > > give most of the responsibility to the plugin here, because which > > > situations require a new topology push from the client depends on > > > the plugin. I feel like adding extra logic to time out slow > > > requiresTopologyPush calls, or enforcing minimum intervals would > > > require more configuration options and an extra level of deduplication > > > logic - confusing users that do not use the plugin (configurations are > > > irrelevant for them) and confusing users that do use the plugin (two > > > potentially conflicting levels need to be configured). > > > > > > ASH02: Good point and this is actually something that I considered. 
> > > But it's actually not that easy - in principle, there is only one > > > current topology description, but there may be an arbitrary number of > > > stale topologies active in the group. Would we require the plugin to > > > store all stale topologies? I think this would be a feasible > > > extension, but would definitely add some complexity. I would propose > > > adding this as a follow-up item. I can add this to the future work > > > section. > > > > > > ASH03: Good catch. I don't really want to have an "upgrade logic" that > > > we need to preserve forever in the group coordinator. I think it would > > > be okay to allow "ZERO_UUID" for any topology that existed when the > > > broker was upgraded. > > > > > > ASH04: I noted this in the future work section. In principle we can > > > detect mismatches between topology descriptions on the client, but we > > > do not include it in this KIP, since it would complicate things. The > > > first successfully stored topology is authoritative. > > > > > > AS01) Both topologies are derived from the same Topology instance on > > > the client at a given epoch, so at the source they're consistent by > > > construction. During a topology update, the topology ID changes, and > > > initially we will not have a new topology description passed to the > > > plugin. In this case, we can get an intermediate NOT_STORED response > > > when we try to describe the streams group, until a client pushes the > > > new topology description. But the result will be consistent with the > > > topology information used for assignments. This assumes that all > > > clients with the same topology epoch use the same topology > > > description. Mismatch detection is noted as future work. > > > > > > AS02) I think your point about retrying with too large descriptions is > > > valid. 
It would actually make sense to leave topology size checking to > > > the plugin as well - it can decide the maximum topology size and stop > > > returning true from requiresTopologyPush for topologies that are confirmed to be > > > too large. I will make this change in the next revision of the KIP. > > > > > > AS03) Yes, this is mostly to avoid dependencies between Kafka > > > packages. Note that we do not necessarily need to keep the two > > > implementations in sync. The streams-side TopologyDescription may > > > evolve differently from the admin-side TopologyDescription. The two > > > are only weakly linked through the RPC definition. > > > > > > AS04) Agreed. However, topology has the slowest-changing lifecycle of > > > the three, so it should be less confusing than assignments and > > > members. > > > > > > AS05) Correct. The plugin is free to forward, persist, fan out, mirror > > > to multiple sinks, or anything else. The KIP intentionally doesn't > > > constrain the storage backend. > > > > > > On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]> > wrote: > > >> > > >> Thanks Lucas for the KIP. The KIP is already in very good shape and > covers the edge cases. I still have a few questions and considerations I’d > like to share. > > >> > > >> AS01: Are the Assignment topology (defined in KIP-1071) and the > Description topology (defined in KIP-1331) guaranteed to be consistent > views of the same logical topology, or can they drift? Are we guaranteeing > that every assignment we surface references only nodes/topics present in > the current description topology, or can operators see combinations that > don’t line up? > > >> > > >> AS02: I'm curious about the rationale or empirical data behind the > 350 KB default (e.g., based on observed real-world topologies)? Also the > KIP says the broker measures topology size and rejects oversized payloads > with TOPOLOGY_DESCRIPTION_TOO_LARGE. 
Should the Streams client attempt a > best-effort pre-check of the serialized size to avoid repeated failing > pushes and log a clearer local error? Or is the intent to keep the client > simple and rely entirely on the broker response + plugin behavior for this > case? > > >> > > >> AS03: Why do we introduce a separate Admin-side POJO instead of > reusing TopologyDescription from the Streams API -- for dependency/semantic > reasons? And how do we plan to keep the two representations in sync? > > >> > > >> AS04: Somewhat related to AS01.... In practice we’ve seen that > because members and assignments change so dynamically, a user may see > different assignments or members over just a few seconds, or a member with > a specific memberId may disappear entirely. Having the topology visible > might help users understand what’s going on -- but it could also make things > more confusing, depending on the situation. > > >> > > >> AS05: I assume that even with a single plugin, multiple downstream > systems can still benefit from it (the plugin can of course fan out to > multiple downstream systems). Am I right? > > >> > > >> Thanks, > > >> Alieh > > >> > > >> > > >> > > >> On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev < > [email protected]> wrote: > > >>> > > >>> Hi all, > > >>> > > >>> I would like to start the discussion on KIP-1331. The idea is to > > >>> optionally make a topology description available to the broker, in > the > > >>> spirit of KIP-714. Looking forward to your feedback! > > >>> > > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331%3A+Streams+Group+Topology+Description+Plugin > > >>> > > >>> Best, > > >>> Lucas > > >
