Hi Lucas,

Thanks for the detailed KIP. I've made a pass.

BB1: There's a structural discrepancy between the `clients.admin` and `group.api.streams` versions of `StreamsGroupTopologyDescription`. The `Node` interface in the clients module has `predecessors` and `successors` methods, while the one in the `group.api.streams` package only defines `successors`. I'm assuming this is an oversight and not the intended design.

BB2: Do `predecessors` and `successors` contain only the directly connected nodes, and not the entire graph upstream or downstream of the current node? For example, in a topology of A -> B -> C -> D -> E, the successors of `A` would include only `B`, and the predecessors of `E` would be a set containing only `D`. Of course, if a single node had multiple direct successors or predecessors, then all of those would be included.

BB3: The TOPOLOGY_DESCRIPTION_UPDATE_FAILED error is somewhat of a black box. From what I can see, it's serving as a catch-all for any non-Invalid/non-TooLarge plugin failure. IIUC the client clears the pending ID on this error and won't retry until the next heartbeat ID change. Combined with "no client-side retry," a transient plugin failure means the topology is effectively unavailable until something forces an epoch bump. Would it make sense to distinguish transient vs. permanent failures (similar to the network-error handling) so transient plugin errors can be retried without losing the ID?

Thanks,
Bill

On Fri, May 8, 2026 at 11:43 AM Andrew Schofield <[email protected]> wrote:

> Hi Lucas,
> Thanks for the KIP. A few comments from an initial review.
>
> AJS1: The config group.streams.topology.description.class sounds like it's
> configuring the class for the actual description, not the plugin for
> descriptions.
>
> AJS2: The Java TopologyDescription doesn't match the RPC schema. For
> example, sink topics are missing.
>
> AJS3: The topic names in the UpdateStreamsGroupTopologyDescription request
> should be marked as entityType: topic.
>
> AJS4: I wonder why TOPOLOGY_DESCRIPTION_UPDATE_FAILED is an error surfaced
> to the client. What would the client do?
>
> AJS5: I suggest having a successful TopologyDescriptionStatus (probably
> AVAILABLE) to be used when the StreamsGroupDescribeResponse does contain
> the topology description, as opposed to saying to ignore the status if the
> topology is non-null.
>
> AJS6: Would you consider the TopologyDescriptionId part of the topology
> description to return in the StreamsGroupDescribeResponse? My question
> really is whether this is a user-facing concept, which I expect it is.
>
> AJS7: Might the broker call
> StreamsGroupTopologyDescriptionPlugin.setTopology() for multiple versions
> of the same group's topology description? Since it's an async call, I
> wonder what concurrency protections there are. Does the broker just have
> one outstanding topology description per streams group at a time?
>
> AJS8: Is there supposed to be a visible relationship between a group epoch
> and the topology?
>
> Thanks,
> Andrew
>
> On 2026/05/05 13:26:38 Lucas Brutschy via dev wrote:
> > Thanks Sanghyeok and Alieh!
> >
> > ASH01: Both risks are real. But I would argue that, in any case, the
> > plugins that Kafka currently defines need to be implemented correctly
> > for Kafka to work reliably. This is the case here as well - just like
> > a consumer group assignor needs to be correct and fast,
> > requiresTopologyPush needs to be implemented to be correct and fast. I
> > did give most of the responsibility to the plugin here, because it
> > depends on the plugin in which situations it requires a new topology
> > push from the client.
> > I feel like adding extra logic to time out slow
> > requiresTopologyPush calls, or enforcing minimum intervals, would
> > require more configuration options and an extra level of deduplication
> > logic - confusing users that do not use the plugin (configurations are
> > irrelevant for them) and confusing users that do use the plugin (two
> > potentially conflicting levels need to be configured).
> >
> > ASH02: Good point and this is actually something that I considered.
> > But it's actually not that easy - in principle, there is only one
> > current topology description, but there may be an arbitrary number of
> > stale topologies active in the group. Would we require the plugin to
> > store all stale topologies? I think this would be a feasible
> > extension, but would definitely add some complexity. I would propose
> > adding this as a follow-up item. I can add this to the future work
> > section.
> >
> > ASH03: Good catch. I don't really want to have an "upgrade logic" that
> > we need to preserve forever in the group coordinator. I think it would
> > be okay to allow "ZERO_UUID" for any topology that exists when the
> > broker is upgraded.
> >
> > ASH04: I noted this in the future work section. In principle we can
> > detect mismatches between topology descriptions on the client, but we
> > do not include it in this KIP, since it would complicate things. The
> > first successfully stored topology is authoritative.
> >
> > AS01) Both topologies are derived from the same Topology instance on
> > the client at a given epoch, so at the source they're consistent by
> > construction. During a topology update, the topology ID changes, and
> > initially we will not have a new topology description passed to the
> > plugin. In this case, we can get an intermediate NOT_STORED response
> > when we try to describe the streams group, until a client pushes the
> > new topology description. But the result will be consistent with the
> > topology information used for assignments.
> > This assumes that all
> > clients with the same topology epoch use the same topology
> > description. Mismatch detection is noted as future work.
> >
> > AS02) I think your point about retrying with too large descriptions is
> > valid. It would actually make sense to leave topology size checking to
> > the plugin as well - it can decide the maximum topology size and stop
> > returning requiresTopologyPush for topologies that are confirmed to be
> > too large. I will make this change in the next revision of the KIP.
> >
> > AS03) Yes, this is mostly to avoid dependencies between Kafka
> > packages. Note that we do not necessarily need to keep the two
> > implementations in sync. The streams-side TopologyDescription may
> > evolve differently than the admin-side TopologyDescription. The two
> > are only weakly linked through the RPC definition.
> >
> > AS04) Agreed. However, topology has the slowest-changing lifecycle of
> > the three, so it should be less confusing than assignments and
> > members.
> >
> > AS05) Correct. The plugin is free to forward, persist, fan out, mirror
> > to multiple sinks, or anything else. The KIP intentionally doesn't
> > constrain the storage backend.
> >
> > On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]> wrote:
> > >
> > > Thanks Lucas for the KIP. The KIP is already in very good shape and
> > > covers the edge cases. I still have a few questions and
> > > considerations I’d like to share.
> > >
> > > AS01: Are the Assignment topology (defined in KIP-1071) and the
> > > Description topology (defined in KIP-1331) guaranteed to be
> > > consistent views of the same logical topology, or can they drift?
> > > Are we guaranteeing that every assignment we surface references only
> > > nodes/topics present in the current description topology, or can
> > > operators see combinations that don’t line up?
> > >
> > > AS02: I'm curious about the rationale or empirical data behind the
> > > 350 KB default (e.g., based on observed real-world topologies)?
> > > Also the KIP says the broker measures topology size and rejects
> > > oversized payloads with TOPOLOGY_DESCRIPTION_TOO_LARGE. Should the
> > > Streams client attempt a best-effort pre-check of the serialized
> > > size to avoid repeated failing pushes and log a clearer local error?
> > > Or is the intent to keep the client simple and rely entirely on the
> > > broker response + plugin behavior for this case?
> > >
> > > AS03: Why do we introduce a separate Admin-side POJO instead of
> > > reusing TopologyDescription from the Streams API? Is it for
> > > dependency/semantic reasons? And how do we plan to keep the two
> > > representations in sync?
> > >
> > > AS04: Somewhat related to AS01... In practice we’ve seen that
> > > because members and assignments change so dynamically, a user may
> > > see different assignments or members over just a few seconds, or a
> > > member with a specific memberId may disappear entirely. Having the
> > > topology visible might help users understand what’s going on, but it
> > > could also make things more confusing, depending on the situation.
> > >
> > > AS05: I assume that even with a single plugin, multiple downstream
> > > systems can still benefit from it (the plugin can of course fan out
> > > to multiple downstream systems). Am I right?
> > >
> > > Thanks,
> > > Alieh
> > >
> > > On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev <[email protected]> wrote:
> > >>
> > >> Hi all,
> > >>
> > >> I would like to start the discussion on KIP-1331. The idea is to
> > >> optionally make a topology description available to the broker, in the
> > >> spirit of KIP-714. Looking forward to your feedback!
> > >>
> > >> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331*3A*Streams*Group*Topology*Description*Plugin__;JSsrKysr!!Ayb5sqE7!sxqGDUcjOzRpt9Gk0jE1XnVSit-FZMIihk2UsXWUI0jmdYK2nTcO1hP-9WiW5sLBMw8amIUxG2PGvhdRhok$
> > >>
> > >> Best,
> > >> Lucas
> > >
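
PS on BB2: to make the "direct neighbors only" reading concrete, here is a minimal sketch. It assumes a hypothetical adjacency-map class, `DirectNeighborsSketch`, with made-up `addEdge` and `chain` helpers. It is not the `Node` interface from the KIP, only an illustration of the semantics I'm asking about for the A -> B -> C -> D -> E example.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a topology graph; NOT the API proposed in the KIP.
class DirectNeighborsSketch {
    private final Map<String, Set<String>> successors = new LinkedHashMap<>();
    private final Map<String, Set<String>> predecessors = new LinkedHashMap<>();

    // Records a single edge; only the two endpoints learn about each other.
    void addEdge(String from, String to) {
        successors.computeIfAbsent(from, k -> new LinkedHashSet<>()).add(to);
        predecessors.computeIfAbsent(to, k -> new LinkedHashSet<>()).add(from);
    }

    // Direct successors only, no transitive closure.
    Set<String> successors(String node) {
        return successors.getOrDefault(node, Set.of());
    }

    // Direct predecessors only, no transitive closure.
    Set<String> predecessors(String node) {
        return predecessors.getOrDefault(node, Set.of());
    }

    // Builds a linear chain n0 -> n1 -> ... from the given names.
    static DirectNeighborsSketch chain(String... nodes) {
        DirectNeighborsSketch g = new DirectNeighborsSketch();
        for (int i = 0; i + 1 < nodes.length; i++) {
            g.addEdge(nodes[i], nodes[i + 1]);
        }
        return g;
    }

    public static void main(String[] args) {
        DirectNeighborsSketch g = chain("A", "B", "C", "D", "E");
        System.out.println(g.successors("A"));   // prints [B]
        System.out.println(g.predecessors("E")); // prints [D]
        g.addEdge("A", "X");                      // a second direct successor of A
        System.out.println(g.successors("A"));   // prints [B, X]
    }
}
```

If the intended semantics were instead "everything downstream", `successors("A")` in this sketch would have to return B through E, which is the distinction I'd like the KIP to spell out.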
