Thanks for the KIP Lucas. I made a first pass. Couple of comments/questions.


MJS1(a): This is a follow up to ASH03. I am not sure if I understand the problem? The KIP says, we call `requiresTopologyPush` on every HB. So after an upgrade, the plugin will be called for every existing group, allowing the group to send its topology? So we can just create a UUID before this call? In the end, we need to somehow cache all currently in-use UUID anyway (ie, one for each active group)?


MJS1(b): Related to the above. The KIP does not say that UUIDs would be stored by the GC -- so after a broker bounce, or GC fail-over to a different broker, it seems we would forget all currently in-use UUIDs and generate new ones? This would align to what I did ask about above, and it should be fine from plugin POV to just get new UUIDs if none is cached?


MJS1(c): However, I am not even sure about TopologyDescriptionId? Should we use <groupId,topologyEpoch> instead, avoid this UUID all together? On the other hand, with regard to ASH03, we might introduce the problem Sanghyeok describe when doing this? On the other hand, on startup, the GC could also check all existing groups, and just call `requiresTopologyPush` pro-actively for each group?


MJS1(d): KIP-1313 proposes that clients create their UUID -- should we do the same to integrate with KIP-1313 (in case we keep UUID, and not move to <groupId,topologyEpoch>), to align the behavior across the board? In the end, the topology can only change during a roll, which aligns to a topology-epoch bump anyway?



MJS2(a): The KIP introduces a `NodeType` field for the topology description. I am wondering if we need it? We have sources, processors, sink. Only sources can have input topics, and only sinks can have output topic, and processor never have any input/output topic, so it seems just inspecting if input/output topic are present, tells us what node type we have, and have an explicit types seems to be redundant?


MJS2(b): I am wondering why we model GlobalStores with two TopologyNodes? We know that for this case, there is exactly one source node, and one processor. Should we simplify this?



MJS3: Do we actually need to add `UpdateStreamsGroupTopologyDescriptionResponse`, or could we use a "fire and forget" approache? If an topology update failed, and the plugin re-request the push, the next HB-response would take care of it naturally it seems? Atm the only value we get is to send back some error code. It this worth it?



MJS4: The KIP add `UNSUPPORTED_VERSION` error.

UNSUPPORTED_VERSION — the coordinator cannot serve this RPC because no topology 
description plugin is configured
Is this the right name for this error?



MJS5: the KIP proposed to bump the version of both StreamsGroupDescribeRequest and StreamsGroupDescribeResponse. No objection, but wondering why we prefer it over tagged fields?



MJS6: Is `org.apache.kafka.coordinator.group.api.streams` the right place for the plugin interface? It seems we add a very heavy dependency for people implementing the plugin. Would it make sense to add a new module `broker-plugins` instead to make it more light weight? Yes, KIP-714 does the same, but it might be a nice improvement for 714, too, to move their plugin into such a new module?


MJS7: The KIP says that `setTopology()` may throw TopologyDescriptionTooLargeException? Later in "plugin guidelines" it says:

Reject pushes that exceed it by completing the setTopology future with 
TopologyDescriptionTooLargeException

Which one is it? We might not want to do it both ways? Also applies to other exception.



MJS8: It seems the broker will need to cache all non-yet-completed ComparableFutures. What if the plugin has a bug, and never completes it's future? Would we get some leak? -- Or is your answer the same as for ASH01?



MJS9: Should we move `StreamsGroupTopologyDescription` to the same propose `broker-plugin` module? I am also wondering about Alieh's question: Do we keep it's own class in purpose, of should be unify with the existing interface? And why do we get a second `StreamsGroupTopologyDescription` for the admin client? -- I understand your argument about "this may evolve independently", but do we need 3 copies of the same?


MJS10(a): Do we need to expose `StreamsGroupTopologyDescriptionStatus` via `topologyDescriptionStatus()`? I am wondering if `StreamsGroupTopologyDescription` could model this directly? Also not sure if `topologyDescriptionStatus` should return an `Optional` or not?



MJS10(b): I can see argument to ignore what I said in MJS10(b), but if we have an Optional, why do we need status `AVAILABLE`?



MJS11: The KIP says we don't add any broker side metrics. KIP-714 did. Wondering why we won't need any for this KIP?



MJS12: If this feature is disabled client side, can the broker learn about it? Or might it keep requesting the topology over and over again, and the client would just keep ignoring the request? Would we want some error-code the clients sends to the broker for this case instead?


MJS13: In the "plugin guidelines", the KIP says:

Avoid concurrent or repetitive pushes

Does the plugin know the HB interval to implement this in reasonable way? Any better guidance we can give how to implement this?



-Matthias



On 5/5/26 6:26 AM, Lucas Brutschy via dev wrote:
Thanks Sanghyeok and Alieh!

ASH01: Both risks are real. But I would argue that anyways, the
plugins that Kafka currently defines need to be implemented correctly
for Kafka to reliably work. This is the case here as well - just like
a consumer group assignor needs to be correct and fast,
requiresTopologyPush needs to be implemented correct and fast. I did
give most of the responsibility to the plugin here, because it depends
on the plugin in which situations it requires a new topology push from
the client. I feel like adding extra logic to time out slow
requireTopologyPush calls, or enforcing minimum intervals would
require more configuration options and an extra level of deduplication
logic - confusing users that do not use the plugin (configurations are
irrelevant for them) and confusing users that do use the plugin (two
potentially conflicting levels need to be configured).

ASH02: Good point and this is actually something that I considered.
But it's actually not that easy - in principle, there is only one
current topology description, but there may be an arbitrary number of
stale topologies active in the group. Would we require the plugin to
store all stale topologies? I think this would be a feasible
extension, but would definitely add some complexity. I would propose
adding this as a follow-up item. I can add this to the future work
section.

ASH03: Good catch. I don't really want to have an "upgrade logic" that
we need to preserve forever in the group coordinator. I think it would
be okay to allow "ZERO_UUID" for any topology that exists when the
broker upgraded.

ASH04: I noted this in the future work section. In principle we can
detect mismatches between topology descriptions on the client, but we
do not include it in this KIP, since it would complicate things. The
first successfully stored topology is authoritative.

AS01) Both topologies are derived from the same Topology instance on
the client at a given epoch, so at the source they're consistent by
construction. During a topology update, the topology ID changes, and
initially we will not have a new topology description passed to the
plugin. In this case, we can get an intermediate NOT_STORED response
when we try to describe the streams group, until a client pushes the
new topology description. But the result will be consistent with the
topology information used for assignments. This assumes that all
clients with the same topology epoch use the same topology
description. Mismatch detection is noted as future work.

AS02) I think your point about retrying with too large descriptions is
valid. It would actually make sense to leave topology size checking to
the plugin as well - it can decide the maximum topology size and stop
returning requiresTopologyPush for topologies that are confirmed to be
too large. I will make this change in the next revision of the KIP.

AS03) Yes, this is mostly to avoid dependencies between Kafka
packages. Note that we do not necessarily need to keep the two
implementations in sync. The streams-side TopologyDescription may
evolve differently than the admin-side TopologyDescription. The two
are only weakly linked through the RPC definition.

AS04) Agreed. However, topology has the slowest-changing lifecycle of
the three, so it should be less confusing than assignments and
members.

AS05) Correct. The plugin is free to forward, persist, fan out, mirror
to multiple sinks, or anything else. The KIP intentionally doesn't
constrain the storage backend.

On Tue, May 5, 2026 at 1:19 PM Alieh Saeedi <[email protected]> wrote:

Thanks Lucas for the KIP. The KIP is already in very good shape and covers the 
edge cases. I still have a few questions and considerations I’d like to share.

AS01:  Are Assignment topology (defined in KIP-1071) and the Description 
topology (defined in KIP-1331) guaranteed to be consistent views of the same 
logical topology, or can they drift? Are we guaranteeing that every assignment 
we surface references only nodes/topics present in the current description 
topology, or can operators see combinations that don’t line up?

AS02: I'm cusrious about the rationale or empirical data behind the 350 KB 
default (e.g., based on observed real-world topologies)? Also the KIP says the 
broker measures topology size and rejects oversized payloads with 
TOPOLOGY_DESCRIPTION_TOO_LARGE. Should the Streams client attempt a best-effort 
pre-check of the serialized size to avoid repeated failing pushes and log a 
clearer local error? Or is the intent to keep the client simple and rely 
entirely on the broker response + plugin behavior for this case?

AS03: Why do we introduce a separate Admin-side POJO instead of reusing 
TopologyDescription from the Streams API—for dependency/semantic reasons? And 
how do we plan to keep the two representations in sync?

AS04: Somewhat related to AS01.... In practice we’ve seen that because members 
and assignments change so dynamically, a user may see different assignments or 
members over just a few seconds, or a member with a specific memberId may 
disappear entirely. Having the topology visible might help users understand 
what’s going on—but it could also make things more confusing, depending on the 
situation.

AS05: I assume that even with a single plugin, multiple downstream systems can 
still benefit from it (the plugin can of course fan out to multiple downstream 
systems). Am I right?

Thanks,
Alieh



On Mon, May 4, 2026 at 11:39 AM Lucas Brutschy via dev <[email protected]> 
wrote:

Hi all,

I would like to start the discussion on KIP-1331. The idea is to
optionally make a topology description available to the broker, in the
spirit of KIP-714. Looking forward to your feedback!

https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-1331*3A*Streams*Group*Topology*Description*Plugin__;JSsrKysr!!Ayb5sqE7!sxqGDUcjOzRpt9Gk0jE1XnVSit-FZMIihk2UsXWUI0jmdYK2nTcO1hP-9WiW5sLBMw8amIUxG2PGvhdRhok$

Best,
Lucas

Reply via email to