Hi Lucas,

Thanks for the detailed KIP. I've made a pass.

BB1: There's a structural discrepancy between the `clients.admin` and
`group.api.streams` versions of `StreamsGroupTopologyDescription`. The
`Node` interface in the clients module defines both `predecessors` and
`successors` methods, while the one in the `group.api.streams` package
only defines `successors`. I'm assuming this is an oversight and not the
intended design.

BB2: Do `predecessors` and `successors` contain only the direct
neighbors, and not the entire graph upstream or downstream of the current
node? For example, for a topology A -> B -> C -> D -> E, the successors of
`A` would be just `B`, and the predecessors of `E` would be a set
containing only `D`. Of course, if a single node had multiple direct
successors or predecessors, all of them would be included.
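
To make the question concrete, here is a minimal Java sketch of the
"direct neighbors only" interpretation I'm assuming. `DirectNeighbors` and
`SimpleNode` are hypothetical names for illustration and are not the KIP's
`Node` interface. Each node records only one-hop edges, so the successors
of `A` are `[B]` rather than `[B, C, D, E]`.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DirectNeighbors {

    // Hypothetical node type, NOT the KIP's Node interface. It stores only
    // direct (one-hop) edges in each direction.
    static final class SimpleNode {
        private final String name;
        private final Set<SimpleNode> successors = new LinkedHashSet<>();
        private final Set<SimpleNode> predecessors = new LinkedHashSet<>();

        SimpleNode(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }

        // Records the edge this -> child on both ends, so predecessors()
        // and successors() always agree with each other.
        void addSuccessor(SimpleNode child) {
            successors.add(child);
            child.predecessors.add(this);
        }

        Set<SimpleNode> successors() {
            return Collections.unmodifiableSet(successors);
        }

        Set<SimpleNode> predecessors() {
            return Collections.unmodifiableSet(predecessors);
        }
    }

    // Builds the linear topology A -> B -> C -> D -> E, nodes in order.
    static List<SimpleNode> chain() {
        SimpleNode a = new SimpleNode("A");
        SimpleNode b = new SimpleNode("B");
        SimpleNode c = new SimpleNode("C");
        SimpleNode d = new SimpleNode("D");
        SimpleNode e = new SimpleNode("E");
        a.addSuccessor(b);
        b.addSuccessor(c);
        c.addSuccessor(d);
        d.addSuccessor(e);
        return List.of(a, b, c, d, e);
    }

    // Maps a set of nodes to their names, preserving insertion order.
    static Set<String> names(Set<SimpleNode> nodes) {
        Set<String> result = new LinkedHashSet<>();
        for (SimpleNode node : nodes) {
            result.add(node.name());
        }
        return result;
    }

    public static void main(String[] args) {
        List<SimpleNode> nodes = chain();
        System.out.println("successors(A) = " + names(nodes.get(0).successors()));
        System.out.println("predecessors(E) = " + names(nodes.get(4).predecessors()));

        // Fan-out: a node with two direct children reports both of them.
        SimpleNode source = new SimpleNode("S");
        source.addSuccessor(new SimpleNode("X"));
        source.addSuccessor(new SimpleNode("Y"));
        System.out.println("successors(S) = " + names(source.successors()));
    }
}
```

If the KIP intends the transitive closure instead, it would be worth
stating that explicitly in the Javadoc of both `Node` interfaces.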

BB3: TOPOLOGY_DESCRIPTION_UPDATE_FAILED is somewhat of a black box.
From what I can see, it serves as a catch-all for any plugin failure that
is neither Invalid nor TooLarge.
IIUC the client clears the pending ID on this error and won't retry until
the next heartbeat ID change. Combined with "no client-side retry," a
transient plugin failure means the topology is effectively unavailable
until something forces an epoch bump. Would it make sense to distinguish
transient from permanent failures (similar to the network-error handling)
so transient plugin errors can be retried without losing the ID?
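
As a rough illustration of what I mean, here is a hypothetical Java sketch
of the client-side state. None of these names come from the KIP. The
`Outcome` enum assumes the catch-all error were split into
`TRANSIENT_FAILURE` and `PERMANENT_FAILURE`, and the backoff values are
arbitrary. On a transient failure the pending ID is kept and retried with
capped exponential backoff. Every other outcome clears it, which is what I
understand happens today for all UPDATE_FAILED cases.

```java
import java.util.Optional;

// Hypothetical sketch: names and backoff policy are assumptions, not KIP API.
public class PendingTopologyPush {

    // Hypothetical split of today's catch-all error into two outcomes.
    enum Outcome { SUCCESS, INVALID, TOO_LARGE, TRANSIENT_FAILURE, PERMANENT_FAILURE }

    private final long baseBackoffMs;
    private final long maxBackoffMs;
    private String pendingId; // ID still to be pushed; null when nothing is pending
    private int consecutiveTransientFailures;

    PendingTopologyPush(long baseBackoffMs, long maxBackoffMs) {
        this.baseBackoffMs = baseBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Called when the heartbeat requests a (new) topology description ID.
    void onRequestedId(String id) {
        pendingId = id;
        consecutiveTransientFailures = 0;
    }

    void onPushOutcome(Outcome outcome) {
        switch (outcome) {
            // Keep pendingId so the same ID can be retried later.
            case TRANSIENT_FAILURE -> consecutiveTransientFailures++;
            case SUCCESS, INVALID, TOO_LARGE, PERMANENT_FAILURE -> {
                pendingId = null; // nothing left to retry for this ID
                consecutiveTransientFailures = 0;
            }
        }
    }

    Optional<String> pendingId() {
        return Optional.ofNullable(pendingId);
    }

    // Doubles the delay per consecutive transient failure, capped at maxBackoffMs.
    long nextBackoffMs() {
        int shift = Math.min(consecutiveTransientFailures, 20);
        return Math.min(maxBackoffMs, baseBackoffMs << shift);
    }

    public static void main(String[] args) {
        PendingTopologyPush push = new PendingTopologyPush(100, 1_000);
        push.onRequestedId("topology-id-1");
        push.onPushOutcome(Outcome.TRANSIENT_FAILURE);
        System.out.println(push.pendingId() + " retry in " + push.nextBackoffMs() + " ms");
        push.onPushOutcome(Outcome.PERMANENT_FAILURE);
        System.out.println(push.pendingId());
    }
}
```

Only errors the broker marks as transient keep the ID, so a permanently
broken plugin still would not cause an endless retry loop.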

Thanks,
Bill

On Fri, May 8, 2026 at 11:43 AM Andrew Schofield <[email protected]>
wrote:

> Hi Lucas,
> Thanks for the KIP. A few comments from an initial review.
>
> AJS1: The config group.streams.topology.description.class sounds like it's
> configuring the class for the actual description, not the plugin for
> descriptions.
>
> AJS2: The Java TopologyDescription doesn't match the RPC schema. For
> example, sink topics are missing.
>
> AJS3: The topic names in the UpdateStreamsGroupTopologyDescription request
> should be marked as entityType: topic.
>
> AJS4: I wonder why TOPOLOGY_DESCRIPTION_UPDATE_FAILED is an error surfaced
> to the client. What would the client do?
>
> AJS5: I suggest having a successful TopologyDescriptionStatus (probably
> AVAILABLE) to be used when the StreamsGroupDescribeResponse does contain
> the topology description, as opposed to saying to ignore the status if the
> topology is non-null.
>
> AJS6: Would you consider returning the TopologyDescriptionId as part of
> the topology description in the StreamsGroupDescribeResponse? My question
> really is whether this is a user-facing concept, which I expect it is.
>
> AJS7: Might the broker call
> StreamsGroupTopologyDescriptionPlugin.setTopology() for multiple versions
> of the same group's topology description? Since it's an async call, I
> wonder what concurrency protections there are. Does the broker just have
> one outstanding topology description per streams group at a time?
>
> AJS8: Is there supposed to be a visible relationship between a group epoch
> and the topology?
>
> Thanks,
> Andrew
>
> On 2026/05/05 13:26:38 Lucas Brutschy via dev wrote:
> > Thanks Sanghyeok and Alieh!
> >
> > ASH01: Both risks are real. But I would argue that, in any case, the
> > plugins that Kafka currently defines need to be implemented correctly
> > for Kafka to work reliably. This is the case here as well: just like
> > a consumer group assignor needs to be correct and fast,
> > requiresTopologyPush needs to be implemented correctly and be fast. I
> > did give most of the responsibility to the plugin here, because it
> > depends on the plugin in which situations it requires a new topology
> > push from the client. I feel like adding extra logic to time out slow
> > requiresTopologyPush calls, or enforcing minimum intervals, would
> > require more configuration options and an extra level of
> > deduplication logic. That would confuse users who do not use the
> > plugin (the configurations are irrelevant for them) as well as users
> > who do use the plugin (two potentially conflicting levels need to be
> > configured).
> >
> > ASH02: Good point, and this is actually something that I considered.
> > But it's not that easy: in principle, there is only one current
> > topology description, but there may be an arbitrary number of stale
> > topologies active in the group. Would we require the plugin to store
> > all stale topologies? I think this would be a feasible extension, but
> > it would definitely add some complexity. I would propose adding this
> > as a follow-up item. I can add this to the future work section.
> >
> > ASH03: Good catch. I don't really want to have upgrade logic that we
> > need to preserve forever in the group coordinator. I think it would be
> > okay to allow "ZERO_UUID" for any topology that exists when the broker
> > is upgraded.
> >
> > ASH04: I noted this in the future work section. In principle we can
> > detect mismatches between topology descriptions on the client, but we
> > do not include it in this KIP, since it would complicate things. The
> > first successfully stored topology is authoritative.
> >
> > AS01) Both topologies are derived from the same Topology instance on
> > the client at a given epoch, so at the source they're consistent by
> > construction. During a topology update, the topology ID changes, and
> > initially we will not have a new topology description passed to the
> > plugin. In this case, we can get an intermediate NOT_STORED response
> > when we try to describe the streams group, until a client pushes the
> > new topology description. But the result will be consistent with the
> > topology information used for assignments. This assumes that all
> > clients with the same topology epoch use the same topology
> > description. Mismatch detection is noted as future work.
> >
> > AS02) I think your point about retrying with too large descriptions is
> > valid. It would actually make sense to leave topology size checking to
> > the plugin as well - it can decide the maximum topology size and stop
> > returning requiresTopologyPush for topologies that are confirmed to be
> > too large. I will make this change in the next revision of the KIP.
> >
> > AS03) Yes, this is mostly to avoid dependencies between Kafka
> > packages. Note that we do not necessarily need to keep the two
> > implementations in sync. The streams-side TopologyDescription may
> > evolve differently than the admin-side TopologyDescription. The two
> > are only weakly linked through the RPC definition.
> >
> > AS04) Agreed. However, topology has the slowest-changing lifecycle of
> > the three, so it should be less confusing than assignments and
> > members.
> >
> > AS05) Correct. The plugin is free to forward, persist, fan out, mirror
> > to multiple sinks, or anything else. The KIP intentionally doesn't
> > constrain the storage backend.
> >
> > On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]>
> wrote:
> > >
> > > Thanks Lucas for the KIP. The KIP is already in very good shape and
> > > covers the edge cases. I still have a few questions and
> > > considerations I’d like to share.
> > >
> > > AS01: Are the assignment topology (defined in KIP-1071) and the
> > > description topology (defined in KIP-1331) guaranteed to be
> > > consistent views of the same logical topology, or can they drift? Do
> > > we guarantee that every assignment we surface references only
> > > nodes/topics present in the current description topology, or can
> > > operators see combinations that don’t line up?
> > >
> > > AS02: I'm curious about the rationale or empirical data behind the
> > > 350 KB default. Is it based on observed real-world topologies? Also,
> > > the KIP says the broker measures topology size and rejects oversized
> > > payloads with TOPOLOGY_DESCRIPTION_TOO_LARGE. Should the Streams
> > > client attempt a best-effort pre-check of the serialized size to
> > > avoid repeatedly failing pushes and to log a clearer local error? Or
> > > is the intent to keep the client simple and rely entirely on the
> > > broker response and plugin behavior for this case?
> > >
> > > AS03: Why do we introduce a separate Admin-side POJO instead of
> > > reusing TopologyDescription from the Streams API? Is it for
> > > dependency or semantic reasons? And how do we plan to keep the two
> > > representations in sync?
> > >
> > > AS04: Somewhat related to AS01... In practice we’ve seen that
> > > because members and assignments change so dynamically, a user may
> > > see different assignments or members within just a few seconds, or a
> > > member with a specific memberId may disappear entirely. Having the
> > > topology visible might help users understand what’s going on, but it
> > > could also make things more confusing, depending on the situation.
> > >
> > > AS05: I assume that even with a single plugin, multiple downstream
> > > systems can still benefit from it (the plugin can of course fan out
> > > to multiple downstream systems). Am I right?
> > >
> > > Thanks,
> > > Alieh
> > >
> > >
> > >
> > > On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev <
> [email protected]> wrote:
> > >>
> > >> Hi all,
> > >>
> > >> I would like to start the discussion on KIP-1331. The idea is to
> > >> optionally make a topology description available to the broker, in the
> > >> spirit of KIP-714. Looking forward to your feedback!
> > >>
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331%3A+Streams+Group+Topology+Description+Plugin
> > >>
> > >> Best,
> > >> Lucas
> >
>
