Thanks for the comments. I have updated the KIP, simplifying the plugin interface significantly.
The revised KIP moves the "is the plugin's topology up to date?" question out of the plugin and onto the broker. Two new tagged fields on the persisted streams-group metadata record — StoredTopologyEpoch and LastFailedTopologyEpoch — give the broker a durable answer. Heartbeats no longer call into the plugin to decide whether to ask the client for a push; the broker just compares those two persisted epochs against the group's current topology epoch and sets TopologyDescriptionRequired accordingly. As a side effect the plugin SPI shrinks. This simplifies plugin implementation and adds guardrails around that implementation. The other half of the change is a much clearer contract for what plugin authors have to do. Hope I have addressed the concerns in this thread, and looking forward to further comments. On Tue, May 12, 2026 at 10:39 AM Alieh Saeedi via dev <[email protected]> wrote: > > Thanks, Lucas. Following up on the discussion, something came to mind that > seems worth raising here. As we refine the design, I think it is important > to consider that many of the current answers rely on the plugin to handle > the responsibility. Since the user develops the plugin, this is a > particularly sensitive area. We should not make it easy for users to > accidentally shoot themselves in the foot. What happens if a buggy plugin > causes the application to break or puts excessive pressure on the GC? If > so, should the broker enforce any basic safeguards (e.g., per-group > in-flight limit) or at least document recommended limits for plugin > implementations? > > Thanks, > Alieh > > On Mon, May 11, 2026 at 3:11 PM Lucas Brutschy via dev <[email protected]> > wrote: > > > Thanks for the plentiful comments! My responses below: > > > > AJS1) Fair point, I had "plugin" in there initially but removed it > > thinking I would better follow conventions. But I see that it is > > confusing. Changed back to > > group.streams.topology.description.plugin.class. > > > > AJS2) Not sure which Java object you mean, but sink-topics are modeled > > in the "Sink" node, so I don't see this missing. Predecessors are > > omitted on the broker-side, since they are not present on the wire and > > reconstruction seems unnecessary. > > > > AJS3) Good point. Added it. > > > > AJS4) I went back-and-forth on this one as well, but if we don't have > > the error code, all we can return is UNKNOWN_SERVER_ERROR when the > > topology description update fails. I don't think we want to do that, > > since it may trigger alerts set up to detect implementation bugs in > > the Kafka broker. So the point of TOPOLOGY_DESCRIPTION_UPDATE_FAILED > > is just letting the client know that the update failed, but there is > > nothing wrong with the Kafka broker. The client would likely log that > > at INFO level, opposed to WARN or ERRROR for UNKNOWN_SERVER_ERROR. > > > > AJS5) Sure, we can do that. > > > > AJS6) It was not supposed to be a user-facing concept. And I think it > > introduced more problems than it solved, so I removed the concept > > altogether. > > > > AJS7) Good point. This, in part, prompted me to bring back a previous > > version of the design, without the topology decription ID. Now, the > > topology description is created using only the topologyEpoch and a > > group creation timestamp. Then, the plugin can trivially implement > > concurrency control: The plugin can decide to store all versions, > > which means updates for separate topology epochs are independent. Or > > it can just store the latest one, which means that a topology > > description that comes later in the (groupCreationTimestamp, > > topologyEpoch) order takes preference over any previous update. > > > > AJS8) No, there is no direct relationship between a group epoch and a > > topology description. > > > > BB1) The asymmetry is intentional. Predecessors are redundant with > > sucessor information. We can reconstruct it any time. In the admin > > client, which are user-facing, we do reconstruct the predecessor for > > usability. On the broker-side, the main purpose is passing the data > > around, so we do not reconstruct the relation. I'll clarify this in > > the javadoc. > > > > BB2) Direct neighbours only — exactly as you described. I'll clarify > > this in the javadoc. > > > > BB3) The design intentionally pushes transient-retry policy entirely > > onto the plugin: when setTopology fails for a recoverable reason, the > > plugin is expected to keep returning true from requiresTopologyPush on > > subsequent heartbeats. I will clarify this in the Plugin > > Implementation section. > > > > MJS1) I dropped the UUID, as it introduced more complexities than it > > solved. > > > > MJS2a) I think we benefit from not modeling this too tightly. A sink > > is a sink, and the sink topic is a just a metadata annotation on top > > of it. There are corner cases, which I think would be more difficult > > to add later if we do not include NodeType. One is regular > > expressions. We do not model regular expressions here (because > > KIP-1071 does not support them yet -- and the existing client-side > > describe doesn't support them). We can still add regular expressions > > later, but then source topics will still be empty. Even clients that > > do not know the regular expressions field in the future should > > understand that it's a source node, even if maybe it does not > > understand how the source topics are determined. Similarly for sink > > nodes with topicNameExtractors, or any other nifty features people > > will cook up in the future. There is no defined sink topic for that > > node, but it's still a sink. The NodeType should define the identity, > > not the metadata attached to the node. I actually weakened the > > guarantees in the documentation around this a bit in the KIP, to make > > this clear. > > > > MJS2b) Fair — but it would make the schema a lot less uniform, because > > the corner case of global stores would require it's own record types > > with some fields fixed. Let me know if you feel strongly about it, > > otherwise, I'd vote for re-using the generic topologyRecords. > > > > MJS3) There is no Kafka RPC without a response, so we'd break with all > > conventions and I don't see why we'd even consider it. Apart from > > that, there are useful things to do in reaction to the response - e.g. > > log that the topology is too large, and throttleTimeMs. > > > > MJS4) The intent here is "this broker doesn't advertise this API in > > ApiVersions, but you sent it anyway, so I'm rejecting". This seems > > like a perfectly fine way to use UNSUPPORTED_VERSION to me. > > > > MJS5) This question comes up in every KIP I have the feeling. There is > > no strong reason to do either, but the preference is to use version > > bumps, because they are more compact on the wire and old versions > > without the field can be retired. I cleaned up some prose around this. > > > > MJS6) I think having the module `group-coordinator-api` is a decent > > place to put it. It's the place for broker-side plugin interfaces > > related to group management. Adding a new module broker-plugins for > > all broker-side plugins seems like scope creep for this KIP, and I am > > not even sure it would make things better. > > > > MJS7) Correct, this is a bit misleading. Exceptions are signaled by > > completing the future exceptionally, never by throwing synchronously > > from setTopology. I will clarify this in the KIP. > > > > MJS8) Yes, similar to ASH01 - the plugin needs to make sure that the > > futures are completed. > > > > MJS9) I agree that 3 POJOs are somewhat awkward, but creating > > dependencies, e.g. between the admin client and a broker-side plugin > > seems even worse to me. Where would we put the common data structure? > > > > MJS10) In principle we are modeling a sum type here (topology > > description | reason of absence), but Java doesn't natively model > > these, so we store two separate fields. Including a status that > > doesn't model the AVAILABLE case because it can be inferred seems more > > of a pitfall than necessary. Avoiding redundancy in APIs is a good > > pattern but not a strict rule, and I think having > > topologyDescription().isPresent() == (status == AVAILABLE) is > > perfectly fine. Also Optional would just add a layer of wrapping what > > can be modeled directly via the enum. But I agree that my distinction > > (omit the AVAILABLE in the RPC and include it in the user-facing API) > > may be unnecessarily confusing, see AJS5. > > > > MJS11) Personally, I think only the plugin knows if something went > > wrong and why, so the best place to define metrics is inside the > > plugin. Similar reasoning to why I want to keep configs, deduplication > > inside the plugin. If people strongly feel that we want to have AK > > metrics for this plugin, we can add them. > > > > MJS12) If the feature is disabled on the client but the broker plugin > > is enabled, the broker would indeed try to resolicitate on the > > client-side with back-off. But the solicitation is very light-weight > > (a single boolean, and requireTopologyId is supposed to be fast), and > > clients will just ignore it, so I don't see a problem with this. > > > > MJS13) The plugin doesn't (and shouldn't need to) know the heartbeat > > interval — it should throttle based on its own clock plus its own > > in-flight tracking. Concrete strategy: for each (groupId, > > groupCreationTimeMs, topologyEpoch) tuple the plugin tracks (i) > > whether a push has been initiated (set when requiresTopologyPush first > > returns true) and (ii) the last requiresTopologyPush=true time. While > > (i) is set and the push hasn't completed, requiresTopologyPush returns > > false. After the push completes successfully it returns false > > permanently for that tuple. On a transient failure, after a back-off > > window (independent of heartbeat cadence — say, exponential starting > > at 1s) it returns true again. On permanent failure > > (TOPOLOGY_DESCRIPTION_TOO_LARGE or plugin-semantic INVALID_REQUEST) it > > returns false permanently and logs. > > > > Hope that makes sense! > > > > On Sat, May 9, 2026 at 4:43 AM Matthias J. Sax <[email protected]> wrote: > > > > > > Thanks for the KIP Lucas. I made a first pass. Couple of > > comments/questions. > > > > > > > > > MJS1(a): This is a follow up to ASH03. I am not sure if I understand the > > > problem? The KIP says, we call `requiresTopologyPush` on every HB. So > > > after an upgrade, the plugin will be called for every existing group, > > > allowing the group to send its topology? So we can just create a UUID > > > before this call? In the end, we need to somehow cache all currently > > > in-use UUID anyway (ie, one for each active group)? > > > > > > > > > MJS1(b): Related to the above. The KIP does not say that UUIDs would be > > > stored by the GC -- so after a broker bounce, or GC fail-over to a > > > different broker, it seems we would forget all currently in-use UUIDs > > > and generate new ones? This would align to what I did ask about above, > > > and it should be fine from plugin POV to just get new UUIDs if none is > > > cached? > > > > > > > > > MJS1(c): However, I am not even sure about TopologyDescriptionId? Should > > > we use <groupId,topologyEpoch> instead, avoid this UUID all together? On > > > the other hand, with regard to ASH03, we might introduce the problem > > > Sanghyeok describe when doing this? On the other hand, on startup, the > > > GC could also check all existing groups, and just call > > > `requiresTopologyPush` pro-actively for each group? > > > > > > > > > MJS1(d): KIP-1313 proposes that clients create their UUID -- should we > > > do the same to integrate with KIP-1313 (in case we keep UUID, and not > > > move to <groupId,topologyEpoch>), to align the behavior across the > > > board? In the end, the topology can only change during a roll, which > > > aligns to a topology-epoch bump anyway? > > > > > > > > > > > > MJS2(a): The KIP introduces a `NodeType` field for the topology > > > description. I am wondering if we need it? We have sources, processors, > > > sink. Only sources can have input topics, and only sinks can have output > > > topic, and processor never have any input/output topic, so it seems just > > > inspecting if input/output topic are present, tells us what node type we > > > have, and have an explicit types seems to be redundant? > > > > > > > > > MJS2(b): I am wondering why we model GlobalStores with two > > > TopologyNodes? We know that for this case, there is exactly one source > > > node, and one processor. Should we simplify this? > > > > > > > > > > > > MJS3: Do we actually need to add > > > `UpdateStreamsGroupTopologyDescriptionResponse`, or could we use a "fire > > > and forget" approache? If an topology update failed, and the plugin > > > re-request the push, the next HB-response would take care of it > > > naturally it seems? Atm the only value we get is to send back some error > > > code. It this worth it? > > > > > > > > > > > > MJS4: The KIP add `UNSUPPORTED_VERSION` error. > > > > > > > UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no > > topology description plugin is configured > > > Is this the right name for this error? > > > > > > > > > > > > MJS5: the KIP proposed to bump the version of both > > > StreamsGroupDescribeRequest and StreamsGroupDescribeResponse. No > > > objection, but wondering why we prefer it over tagged fields? > > > > > > > > > > > > MJS6: Is `org.apache.kafka.coordinator.group.api.streams` the right > > > place for the plugin interface? It seems we add a very heavy dependency > > > for people implementing the plugin. Would it make sense to add a new > > > module `broker-plugins` instead to make it more light weight? Yes, > > > KIP-714 does the same, but it might be a nice improvement for 714, too, > > > to move their plugin into such a new module? > > > > > > > > > MJS7: The KIP says that `setTopology()` may throw > > > TopologyDescriptionTooLargeException? Later in "plugin guidelines" it > > says: > > > > > > > Reject pushes that exceed it by completing the setTopology future with > > TopologyDescriptionTooLargeException > > > > > > Which one is it? We might not want to do it both ways? Also applies to > > > other exception. > > > > > > > > > > > > MJS8: It seems the broker will need to cache all non-yet-completed > > > ComparableFutures. What if the plugin has a bug, and never completes > > > it's future? Would we get some leak? -- Or is your answer the same as > > > for ASH01? > > > > > > > > > > > > MJS9: Should we move `StreamsGroupTopologyDescription` to the same > > > propose `broker-plugin` module? I am also wondering about Alieh's > > > question: Do we keep it's own class in purpose, of should be unify with > > > the existing interface? And why do we get a second > > > `StreamsGroupTopologyDescription` for the admin client? -- I understand > > > your argument about "this may evolve independently", but do we need 3 > > > copies of the same? > > > > > > > > > MJS10(a): Do we need to expose `StreamsGroupTopologyDescriptionStatus` > > > via `topologyDescriptionStatus()`? I am wondering if > > > `StreamsGroupTopologyDescription` could model this directly? Also not > > > sure if `topologyDescriptionStatus` should return an `Optional` or not? > > > > > > > > > > > > MJS10(b): I can see argument to ignore what I said in MJS10(b), but if > > > we have an Optional, why do we need status `AVAILABLE`? > > > > > > > > > > > > MJS11: The KIP says we don't add any broker side metrics. KIP-714 did. > > > Wondering why we won't need any for this KIP? > > > > > > > > > > > > MJS12: If this feature is disabled client side, can the broker learn > > > about it? Or might it keep requesting the topology over and over again, > > > and the client would just keep ignoring the request? Would we want some > > > error-code the clients sends to the broker for this case instead? > > > > > > > > > MJS13: In the "plugin guidelines", the KIP says: > > > > > > > Avoid concurrent or repetitive pushes > > > > > > Does the plugin know the HB interval to implement this in reasonable > > > way? Any better guidance we can give how to implement this? > > > > > > > > > > > > -Matthias > > > > > > > > > > > > On 5/5/26 6:26 AM, Lucas Brutschy via dev wrote: > > > > Thanks Sanghyeok and Alieh! > > > > > > > > ASH01: Both risks are real. But I would argue that anyways, the > > > > plugins that Kafka currently defines need to be implemented correctly > > > > for Kafka to reliably work. This is the case here as well - just like > > > > a consumer group assignor needs to be correct and fast, > > > > requiresTopologyPush needs to be implemented correct and fast. I did > > > > give most of the responsibility to the plugin here, because it depends > > > > on the plugin in which situations it requires a new topology push from > > > > the client. I feel like adding extra logic to time out slow > > > > requireTopologyPush calls, or enforcing minimum intervals would > > > > require more configuration options and an extra level of deduplication > > > > logic - confusing users that do not use the plugin (configurations are > > > > irrelevant for them) and confusing users that do use the plugin (two > > > > potentially conflicting levels need to be configured). > > > > > > > > ASH02: Good point and this is actually something that I considered. > > > > But it's actually not that easy - in principle, there is only one > > > > current topology description, but there may be an arbitrary number of > > > > stale topologies active in the group. Would we require the plugin to > > > > store all stale topologies? I think this would be a feasible > > > > extension, but would definitely add some complexity. I would propose > > > > adding this as a follow-up item. I can add this to the future work > > > > section. > > > > > > > > ASH03: Good catch. I don't really want to have an "upgrade logic" that > > > > we need to preserve forever in the group coordinator. I think it would > > > > be okay to allow "ZERO_UUID" for any topology that exists when the > > > > broker upgraded. > > > > > > > > ASH04: I noted this in the future work section. In principle we can > > > > detect mismatches between topology descriptions on the client, but we > > > > do not include it in this KIP, since it would complicate things. The > > > > first successfully stored topology is authoritative. > > > > > > > > AS01) Both topologies are derived from the same Topology instance on > > > > the client at a given epoch, so at the source they're consistent by > > > > construction. During a topology update, the topology ID changes, and > > > > initially we will not have a new topology description passed to the > > > > plugin. In this case, we can get an intermediate NOT_STORED response > > > > when we try to describe the streams group, until a client pushes the > > > > new topology description. But the result will be consistent with the > > > > topology information used for assignments. This assumes that all > > > > clients with the same topology epoch use the same topology > > > > description. Mismatch detection is noted as future work. > > > > > > > > AS02) I think your point about retrying with too large descriptions is > > > > valid. It would actually make sense to leave topology size checking to > > > > the plugin as well - it can decide the maximum topology size and stop > > > > returning requiresTopologyPush for topologies that are confirmed to be > > > > too large. I will make this change in the next revision of the KIP. > > > > > > > > AS03) Yes, this is mostly to avoid dependencies between Kafka > > > > packages. Note that we do not necessarily need to keep the two > > > > implementations in sync. The streams-side TopologyDescription may > > > > evolve differently than the admin-side TopologyDescription. The two > > > > are only weakly linked through the RPC definition. > > > > > > > > AS04) Agreed. However, topology has the slowest-changing lifecycle of > > > > the three, so it should be less confusing than assignments and > > > > members. > > > > > > > > AS05) Correct. The plugin is free to forward, persist, fan out, mirror > > > > to multiple sinks, or anything else. The KIP intentionally doesn't > > > > constrain the storage backend. > > > > > > > > On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]> > > wrote: > > > >> > > > >> Thanks Lucas for the KIP. The KIP is already in very good shape and > > covers the edge cases. I still have a few questions and considerations I’d > > like to share. > > > >> > > > >> AS01: Are Assignment topology (defined in KIP-1071) and the > > Description topology (defined in KIP-1331) guaranteed to be consistent > > views of the same logical topology, or can they drift? Are we guaranteeing > > that every assignment we surface references only nodes/topics present in > > the current description topology, or can operators see combinations that > > don’t line up? > > > >> > > > >> AS02: I'm cusrious about the rationale or empirical data behind the > > 350 KB default (e.g., based on observed real-world topologies)? Also the > > KIP says the broker measures topology size and rejects oversized payloads > > with TOPOLOGY_DESCRIPTION_TOO_LARGE. Should the Streams client attempt a > > best-effort pre-check of the serialized size to avoid repeated failing > > pushes and log a clearer local error? Or is the intent to keep the client > > simple and rely entirely on the broker response + plugin behavior for this > > case? > > > >> > > > >> AS03: Why do we introduce a separate Admin-side POJO instead of > > reusing TopologyDescription from the Streams API—for dependency/semantic > > reasons? And how do we plan to keep the two representations in sync? > > > >> > > > >> AS04: Somewhat related to AS01.... In practice we’ve seen that > > because members and assignments change so dynamically, a user may see > > different assignments or members over just a few seconds, or a member with > > a specific memberId may disappear entirely. Having the topology visible > > might help users understand what’s going on—but it could also make things > > more confusing, depending on the situation. > > > >> > > > >> AS05: I assume that even with a single plugin, multiple downstream > > systems can still benefit from it (the plugin can of course fan out to > > multiple downstream systems). Am I right? > > > >> > > > >> Thanks, > > > >> Alieh > > > >> > > > >> > > > >> > > > >> On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev < > > [email protected]> wrote: > > > >>> > > > >>> Hi all, > > > >>> > > > >>> I would like to start the discussion on KIP-1331. The idea is to > > > >>> optionally make a topology description available to the broker, in > > the > > > >>> spirit of KIP-714. Looking forward to your feedback! > > > >>> > > > >>> > > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331*3A*Streams*Group*Topology*Description*Plugin__;JSsrKysr!!Ayb5sqE7!sxqGDUcjOzRpt9Gk0jE1XnVSit-FZMIihk2UsXWUI0jmdYK2nTcO1hP-9WiW5sLBMw8amIUxG2PGvhdRhok$ > > > >>> > > > >>> Best, > > > >>> Lucas > > > > >
