Thanks Fede, a couple of follow-up questions ...

> The workaround would be to recreate the conflicting destination topic
> (which gets a new random UUID), then retry the mirror.

I would clarify it in the KIP.

> Describe mirror should help for tracking. Do you have any specific use
> case that would require LME?

Actually, the KIP is missing the definition of DescribeMirrorsResult, so I
am not sure what it is going to contain to track the mirror. I see the same
is true for the other ***Result classes. Are they going to be mapped 1:1
with the response messages within the protocol? For example, does the
DescribeMirrorsResult class map onto the DescribeMirrorsResponse protocol
message? Shouldn't we have the ***Result class definitions anyway?

On Fri, 13 Mar 2026 at 13:57, Federico Valeri <[email protected]> wrote:

> Hi Paolo, thanks for the review and sorry for the late reply, but I
> was a bit busy this week.
>
> >When the unclean.leader.election.enable is set to true, the broker will
> >log a warning at every configuration synchronization period.
> >Be more explicit about what the warning says.
>
> The message says that log divergence cannot be reconciled across
> clusters with ULE enabled. That said, we now have an alternative
> design to support ULE, so this will probably go away.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring
>
> >This topic ID is not used by other topics in the current cluster
> >In such a case, which should be very unlikely, what's going to happen?
> >Isn't it possible to mirror the topic?
>
> Topic IDs are UUIDs (128-bit), so the collision probability is zero in
> practice. But if it did happen, the CreateTopic request with
> MirrorInfo would fail validation since the destination cluster already
> has a different topic using that UUID. The topic is not mirrorable in
> that state. The workaround would be to recreate the conflicting
> destination topic (which gets a new random UUID), then retry the
> mirror.
>
> >To enable it, all cluster nodes (controllers and brokers) must explicitly
> >enable unstable API versions and unstable feature versions in all
> >configuration files.
> >After starting the cluster with a minimum metadata
> >version, operators can dynamically enable the mirror version feature to
> >activate Cluster Mirroring.
> >AFAIU there is going to be a dedicated feature flag for it, right? If yes
> >can we state it clearly also specifying the exact name (e.g. mirror.version
> >or something similar)?
>
> The feature flag is mentioned in the "Compatibility, Deprecation, and
> Migration Plan" section, but I added some more information.
>
> >When running the kafka-mirrors.sh tool to list the mirrors, other than
> >showing the SOURCE-BOOTSTRAP, it could be useful to have also the clusterId
> >which, as a unique identifier, could be helpful in automated systems using
> >the cluster mirroring. Of course, it would be important to have in the
> >ListMirrorsResponse as well as an additional field.
>
> That's a good idea as SOURCE-BOOTSTRAP can change (e.g., broker
> rotation, DNS updates), but clusterId is a stable, immutable
> identifier assigned at cluster creation. KIP updated.
>
> >What happens in case of Kafka downgrade from a version supporting mirroring
> >to an older one not supporting it?
> >The mirror won't be running but the topic configuration will still have
> >config parameters like mirror.name and so on, right? Are they just ignored
> >by the older Kafka version and the cluster will work without issues?
>
> As you say, no mirroring would be possible and you would have to
> manually clean up the internal topic. All mirror topics would become
> writable and that's fine. The real problem is that cluster metadata
> would not be compatible, as we are adding a new configuration resource
> type that an older controller wouldn't recognize, causing it to fail
> on loading. This is why downgrades are only supported between versions
> that share a valid metadata format.
>
> >Is there any migration path for users who want to migrate from using Mirror
> >Maker 2 to the cluster mirroring?
> >I mean, something like a tool useful to create a corresponding cluster
> >mirroring configuration starting from a MM2 one. Nothing that runs the
> >migration automatically but something that can be provided to the users as
> >output to be validated and put in place by them.
>
> Not really. I don't think it is easy to map the two systems'
> configurations, but in any case it is out of scope for this KIP.
>
> >The Admin Client is missing methods to pause and stop mirroring (but we
> >have corresponding protocol messages). Is it on purpose? Any specific
> >reasons? They would be important from an automatic operator perspective
> >use case.
>
> Now you should see them. Thanks for pointing that out.
>
> >Also a method to provide the LastMirroredOffset from the source cluster
> >could be useful for progress and tracking purposes.
>
> Describe mirror should help for tracking. Do you have any specific use
> case that would require LME?
>
> >Finally, what about a method to get the mirror states? I don't think the
> >describe method provides such information
>
> A mirror does not have state; it is just a logical grouping of "mirror
> partitions" that have state. Describe shows each partition state.
>
> >In general, I think that the Admin Client section needs to cover in more
> >details the new classes definition like CreateMirrorOptions,
> >CreateMirrorResult, ... and so on for all the defined new methods.
>
> For now, I added some javadoc to all admin methods. We don't have
> custom options at the moment. I will add more details about results.
>
> >AddTopicsToMirrorResult addTopicsToMirror(Map<String, String>
> >topicToMirrorName, AddTopicsToMirrorOptions options);
> >Isn't it missing the mirrorName (as you have in the removeTopicsFromMirror
> >counterpart)?
> >What's the topicToMirrorName parameter if it's defined as a Map?
> >The method is also plural using "topics" so comparing to the
> >removeTopicsFromMirror method, I would assume the parameter really is
> >Set<String> topics?
> >Comparing to the corresponding protocol message AddTopicsToMirrorRequest, I
> >see a list of topics but each of them has id, name and corresponding
> >mirror. So it's unclear how the addTopicsToMirror is defined.
>
> The addTopicsToMirror signature uses Map<String, String>
> topicToMirrorName because a topic can be added to a specific mirror,
> and the map allows adding multiple topics to potentially different
> mirrors in a single call. In contrast, removeTopicsFromMirror takes
> Set<String> topics because the mirror name is already known from the
> topic's existing mirror.name config, so you only need to specify which
> topics to remove.
>
> Anyway, the question of whether it's a valid use case to add different
> topics to different mirrors in a single call is worth raising. If not,
> a simpler signature like removeTopicsFromMirror with a mirrorName
> parameter and a Set<String> topics would be more consistent.
>
> AddTopicsToMirrorResult addTopicsToMirror(
>     String mirrorName,
>     Set<String> topics,
>     AddTopicsToMirrorOptions options
> );
>
> >RemoveTopicsFromMirrorResult removeTopicsFromMirror(String mirrorName,
> >Set<String> topics, RemoveTopicsFromMirrorOptions options);
> >This method gets a mirrorName but if I look at the corresponding protocol
> >message RemoveTopicsFromMirrorRequest, it says "Allows users to detach
> >topics from their associated mirror" so the mirror is actually not provided
> >and it's exactly what I see in the JSON definition (only topics list with
> >id and name).
>
> You're right, there's an inconsistency. The Admin API method has a
> mirrorName parameter, but the underlying RemoveTopicsFromMirrorRequest
> protocol message only carries topic id and name, with no mirror name
> field. Since the topic already knows its mirror via the mirror.name
> config, a mirror name in the protocol message would be redundant, so it
> is correctly omitted.
>
> >Finally, regarding the protocol change:
> >* ListMirrorsResponse I would add the clusterId in the JSON definition
> >(it's related to my comments in the previous email when using the tool).
> >* WriteMirrorStatesRequest has the following in the JSON which should not
> >be part of it "{ "name": "RemovedTopics", "type": "[]string", "versions":
> >"0+", "about": "The topic names to be removed." }"
>
> Good catch for the 2nd point, while the 1st point is now fixed.
>
> On Fri, Mar 6, 2026 at 2:42 PM Paolo Patierno <[email protected]> wrote:
> >
> > Hi Fede,
> > something more ...
> >
> > Is there any migration path for users who want to migrate from using Mirror
> > Maker 2 to the cluster mirroring?
> > I mean, something like a tool useful to create a corresponding cluster
> > mirroring configuration starting from a MM2 one. Nothing that runs the
> > migration automatically but something that can be provided to the users as
> > output to be validated and put in place by them.
> >
> > The Admin Client is missing methods to pause and stop mirroring (but we
> > have corresponding protocol messages). Is it on purpose? Any specific
> > reasons? They would be important from an automatic operator perspective
> > use case.
> > Also a method to provide the LastMirroredOffset from the source cluster
> > could be useful for progress and tracking purposes.
> > Finally, what about a method to get the mirror states? I don't think the
> > describe method provides such information.
> > In general, I think that the Admin Client section needs to cover in more
> > details the new classes definition like CreateMirrorOptions,
> > CreateMirrorResult, ... and so on for all the defined new methods.
> > > > > AddTopicsToMirrorResult addTopicsToMirror(Map<String, String> > > topicToMirrorName, AddTopicsToMirrorOptions options); > > > > Isn't it missing the mirrorName (as you have in the > removeTopicsFromMirror > > counterpart)? > > What's the topicToMirrorName parameter if it's defined as a Map? The > method > > is also plural using "topics" so comparing to the removeTopicsFromMirror > > method, I would assume the parameter really is Set<String> topics? > > Comparing to the corresponding protocol message > AddTopicsToMirrorRequest, I > > see a list of topics but each of them has id, name and corresponding > > mirror. So it's unclear how the addTopicsToMirror is defined. > > > > > RemoveTopicsFromMirrorResult removeTopicsFromMirror(String mirrorName, > > Set<String> topics, RemoveTopicsFromMirrorOptions options); > > > > This method gets a mirrorName but if I look at the corresponding protocol > > message RemoveTopicsFromMirrorRequest, it says "Allows users to detach > > topics from their associated mirror" so the mirror is actually not > provided > > and it's exactly what I see in the JSON definition (only topics list with > > id and name). > > > > Finally, regarding the protocol change: > > > > * ListMirrorsResponse I would add the clusterId in the JSON definition > > (it's related to my comments in the previous email when using the tool). > > * WriteMirrorStatesRequest has the following in the JSON which should not > > be part of it "{ "name": "RemovedTopics", "type": "[]string", "versions": > > "0+", "about": "The topic names to be removed." }" > > > > Thanks, > > Paolo. > > > > On Fri, 6 Mar 2026 at 13:08, Paolo Patierno <[email protected]> > > wrote: > > > > > Hi Fede, > > > thank you for the proposal. I had a first pass with following thoughts > and > > > questions. > > > > > > > When the unclean.leader.election.enable is set to true, the broker > will > > > log a warning at every configuration synchronization period. 
> > > Be more explicit about what the warning says. > > > > > > > This topic ID is not used by other topics in the current cluster > > > In such a case, which should be very unlikely, what's going to happen? > > > Isn't it possible to mirror the topic? > > > > > > > To enable it, all cluster nodes (controllers and brokers) must > > > explicitly enable unstable API versions and unstable feature versions > in > > > all configuration files. After starting the cluster with a minimum > metadata > > > version, operators can dynamically enable the mirror version feature to > > > activate Cluster Mirroring. > > > AFAIU there is going to be a dedicated feature flag for it, right? If > yes > > > can we state it clearly also specifying the exact name (i.e. > mirror.version > > > or something similar)? > > > > > > When running the kafka-mirrors.sh tool to list the mirrors, other than > > > showing the SOURCE-BOOTSTRAP, it could be useful to have also the > clusterId > > > which, as a unique identifier, could be helpful in automated systems > using > > > the cluster mirroring. Of course, it would be important to have in the > > > ListMirrorsResponse as well as an additional field. > > > > > > What happens in case of Kafka downgrade from a version supporting > > > mirroring to an older one not supporting it. > > > The mirror won't be running but the topic configuration will still have > > > config parameters like mirror.name and so on, right? Are they just > > > ignored by the older Kafka version and the cluster will work without > issues? > > > > > > Thanks, > > > Paolo > > > > > > On Fri, 6 Mar 2026 at 10:43, Luke Chen <[email protected]> wrote: > > > > > >> Hi Andrew and all, > > >> > > >> About AS5, yes, I've created a sub-document > > >> < > > >> > https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring > > >> >to > > >> explain the algorithm to support unclean leader election in cluster > > >> mirroring. 
> > >> Thanks for your comments, I'm inspired by that! :) > > >> > > >> About your idea, to store the owner of the leader epoch when > leadership > > >> change, I think it might not be needed because the most important > thing > > >> should be this: > > >> > you might find that both ends have declared a local epoch N, but > someone > > >> has to win. > > >> > > >> That is, as long as we have a way to declare who is the owner of > leader > > >> epoch N, then the 2 clusters can sync up successfully. > > >> And that's why I proposed to the "last mirrored leader epoch" > semantic in > > >> the sub-proposal > > >> < > > >> > https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring > > >> >, > > >> which is a solution to draw a line between these 2 clusters to declare > > >> records beyond the "last mirrored leader epoch" N, it belongs to who. > I > > >> think this should work well, as long as all replicas in the cluster > can > > >> truncate the log correctly. > > >> > > >> What do you think? > > >> > > >> Any feedback is appreciated. > > >> > > >> Thank you, > > >> Luke > > >> > > >> On Fri, Mar 6, 2026 at 6:02 PM Andrew Schofield < > [email protected]> > > >> wrote: > > >> > > >> > Hi Fede, > > >> > Thanks for your response. > > >> > > > >> > AS1: Thanks for the clarification. > > >> > > > >> > AS2: I expect you'll include a version bump of > AlterShareGroupOffsets in > > >> > this KIP, but that's a small matter compared with the rest of the > > >> protocol > > >> > changes. > > >> > > > >> > AS3: OK. > > >> > > > >> > AS4: Thanks for the details. My only comment is that it might be a > bit > > >> > laborious when you want to failover all topics. I suggest adding > > >> > `--all-topics` so you could do: > > >> > > > >> > $ bin/kafka-mirror.sh --bootstrap-server :9094 --remove --all-topics > > >> > --mirror my-mirror > > >> > > > >> > AS5: Thanks for the response. 
I understand there are good reasons > for > > >> the > > >> > way epochs are handled in the KIP. I see that there is a > sub-document > > >> for > > >> > the KIP about unclean leader election. I'll spend some time > reviewing > > >> that. > > >> > > > >> > Thanks, > > >> > Andrew > > >> > > > >> > On 2026/02/18 13:27:07 Federico Valeri wrote: > > >> > > Hi Andrew, thanks for the review. > > >> > > > > >> > > Let me try to answer your questions and then other authors can > join > > >> > > the discussion. > > >> > > > > >> > > AS1 > > >> > > ------ > > >> > > > > >> > > Destination topics are created with the same topic IDs using the > > >> > > extended CreateTopics API. Then, data is replicated starting from > > >> > > offset 0 with byte-for-byte batch copying, so destination offsets > > >> > > always match source offsets. When failing over, we record the last > > >> > > mirrored offset (LMO) in the destination cluster. When failing > back, > > >> > > the LMO is used for truncating and then start mirroring the delta, > > >> > > otherwise we start mirroring from scratch by truncating to zero. > > >> > > > > >> > > Retention: If the mirror leader attempts to fetch an offset that > is > > >> > > below the current log start offset of the source leader (e.g. > fetching > > >> > > offset 50 when log start offset is 100), the source broker > returns an > > >> > > OffsetOutOfRangeException that the mirror leader handles by > truncating > > >> > > to the source's current log start offset and resuming fetching > from > > >> > > that point. Compaction: The mirror leader replicates these > compacted > > >> > > log segments exactly as they exist in the source cluster, > maintaining > > >> > > the same offset assignments and gaps. > > >> > > > > >> > > Do you have any specific corner case in mind? > > >> > > > > >> > > AS2 > > >> > > ------ > > >> > > > > >> > > Agreed. 
> > >> > > The current AlterShareGroupOffsetsRequest (v0) only includes
> > >> > > PartitionIndex and StartOffset with no epoch field. When mirroring
> > >> > > share group offsets across clusters, the epoch is needed to ensure the
> > >> > > offset alteration targets the correct leader generation.
> > >> > >
> > >> > > AS3
> > >> > > ------
> > >> > >
> > >> > > Right, the enum is now fixed. Yes, we will parse from the right and
> > >> > > apply the same naming rules used for topic name ;)
> > >> > >
> > >> > > AS4
> > >> > > -------
> > >> > >
> > >> > > Agreed. I'll try to improve those paragraphs because they are crucial
> > >> > > from an operational point of view.
> > >> > >
> > >> > > Let me briefly explain how it is supposed to work:
> > >> > >
> > >> > > 9091 (source) -----> 9094 (destination)
> > >> > >
> > >> > > The single operation that allows an operator to switch all topics at
> > >> > > once in case of disaster is the following:
> > >> > >
> > >> > > bin/kafka-mirror.sh --bootstrap-server :9094 --remove --topic .* --mirror my-mirror
> > >> > >
> > >> > > 9091 (source) --x--> 9094 (destination)
> > >> > >
> > >> > > After that, all mirror topics become detached from the source cluster
> > >> > > and start accepting writes (the two clusters are allowed to diverge).
> > >> > >
> > >> > > When the source cluster is back, the operator can fail back by creating
> > >> > > a mirror with the same name on the source cluster (new destination):
> > >> > >
> > >> > > echo "bootstrap.servers=localhost:9094" > /tmp/my-mirror.properties
> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091 --create --mirror my-mirror --mirror-config /tmp/my-mirror.properties
> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091 --add --topic .* --mirror my-mirror
> > >> > >
> > >> > > 9091 (destination) <----- 9094 (source)
> > >> > >
> > >> > > AS5
> > >> > > -------
> > >> > >
> > >> > > This is the core of our design and we reached that empirically by
> > >> > > trying out different options. We didn't want to change local
> > >> > > replication, and this is something you need to do when preserving the
> > >> > > source leader epoch. The current design is simple and keeps the epoch
> > >> > > domains entirely separate. The destination cluster is in charge of the
> > >> > > leader epoch for its own log. The source epoch is only used during the
> > >> > > fetch protocol to validate responses and detect divergence.
> > >> > >
> > >> > > The polarity idea of tracking whether an epoch bump originated from
> > >> > > replication vs. local leadership change is interesting, but adds
> > >> > > significant complexity and coupling between source and destination
> > >> > > epochs. Could you clarify what specific scenario polarity tracking
> > >> > > would address that the current separation doesn't handle? One case we
> > >> > > don't support is unclean leader election reconciliation across
> > >> > > clusters; is that the gap you're aiming at?
> > >> > >
> > >> > > I tried to rewrite the unclean leader election paragraph in the
> > >> > > rejected alternatives to be easier to digest. Let me know if it works.
> > >> > > > > >> > > On Tue, Feb 17, 2026 at 2:57 PM Andrew Schofield > > >> > > <[email protected]> wrote: > > >> > > > > > >> > > > Hi Fede and friends, > > >> > > > Thanks for the KIP. > > >> > > > > > >> > > > It’s a comprehensive design, easy to read and has clearly taken > a > > >> lot > > >> > of work. > > >> > > > The principle of integrating mirroring into the brokers makes > total > > >> > sense to me. > > >> > > > > > >> > > > The main comment I have is that mirroring like this cannot > handle > > >> > situations > > >> > > > in which multiple topic-partitions are logically related, such > as > > >> > transactions, > > >> > > > with total fidelity. Each topic-partition is being replicated > as a > > >> > separate entity. > > >> > > > The KIP calls this out and describes the behaviour thoroughly. > > >> > > > > > >> > > > A few initial comments. > > >> > > > > > >> > > > AS1) Is it true that offsets are always preserved by this KIP? I > > >> > *think* so but > > >> > > > not totally sure that it’s true in all cases. It would > certainly be > > >> > nice. > > >> > > > > > >> > > > AS2) I think you need to add epoch information to > > >> > AlterShareGroupOffsetsRequest. > > >> > > > It really should already be there in hindsight, but I think > this KIP > > >> > requires it. > > >> > > > > > >> > > > AS3) The CoordinatorType enum for MIRROR will need to be 3 > because 2 > > >> > is SHARE. > > >> > > > I’m sure you’ll parse the keys from the right ;) > > >> > > > > > >> > > > AS4) The procedure for achieving a failover could be clearer. > Let’s > > >> > say that I am > > >> > > > using cluster mirroring to achieve DR replication. My source > cluster > > >> > is utterly lost > > >> > > > due to a disaster. What’s the single operation that I perform to > > >> > switch all of the > > >> > > > topics mirrored from the lost source cluster to become the > active > > >> > topics? > > >> > > > Similarly for failback. 
> > >> > > > > > >> > > > AS5) The only piece that I’m really unsure of is the epoch > > >> management. > > >> > I would > > >> > > > have thought that the cluster which currently has the writable > > >> > topic-partition > > >> > > > would be in charge of the leader epoch and it would not be > > >> necessary to > > >> > > > perform all of the gymnastics described in the section on epoch > > >> > rewriting. > > >> > > > I have read the Rejected Alternatives section too, but I don’t > fully > > >> > grasp > > >> > > > why it was necessary to reject it. > > >> > > > > > >> > > > I wonder if we could store the “polarity” of an epoch, > essentially > > >> > whether the > > >> > > > epoch bump was observed by replication from a source cluster, or > > >> > whether > > >> > > > it was bumped by a local leadership change when the topic is > locally > > >> > writable. > > >> > > > When a topic-partition switches from read-only to writable, we > > >> should > > >> > definitely > > >> > > > bump the epoch, and we could record the fact that it was a local > > >> epoch. > > >> > > > When connectivity is re-established, you might find that both > ends > > >> have > > >> > > > declared a local epoch N, but someone has to win. > > >> > > > > > >> > > > Thanks, > > >> > > > Andrew > > >> > > > > > >> > > > > On 14 Feb 2026, at 07:17, Federico Valeri < > [email protected]> > > >> > wrote: > > >> > > > > > > >> > > > > Hi, we would like to start a discussion thread about KIP-1279: > > >> > Cluster > > >> > > > > Mirroring. > > >> > > > > > > >> > > > > Cluster Mirroring is a new Kafka feature that enables native, > > >> > > > > broker-level topic replication across clusters. Unlike > > >> MirrorMaker 2 > > >> > > > > (which runs as an external Connect-based tool), Cluster > Mirroring > > >> is > > >> > > > > built into the broker itself, allowing tighter integration > with > > >> the > > >> > > > > controller, coordinator, and partition lifecycle. 
> > >> > > > >
> > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1279%3A+Cluster+Mirroring
> > >> > > > >
> > >> > > > > There are a few missing bits, but most of the design is there, so we
> > >> > > > > think it is the right time to involve the community and get feedback.
> > >> > > > > Please help validate our approach.
> > >> > > > >
> > >> > > > > Thanks
> > >> > > > > Fede

--
Paolo Patierno

*Senior Principal Software Engineer @ IBM*
*CNCF Ambassador*

Twitter : @ppatierno <http://twitter.com/ppatierno>
Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
GitHub : ppatierno <https://github.com/ppatierno>

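P.S. To make the addTopicsToMirror signature question from this thread a bit
more concrete, here is a minimal sketch of how the Map<String, String>
topicToMirrorName form relates to the simpler (mirrorName, Set<String> topics)
form. It only uses the JDK; the class MirrorTopicGrouping, the helper
groupByMirror and the topic/mirror names are hypothetical and are not part of
the KIP or of the Admin client. Grouping the map by mirror name yields one
(mirrorName, topics) pair per mirror, which is what the per-mirror signature
would take in separate calls.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class MirrorTopicGrouping {

    // Hypothetical helper (not in the KIP): turns the topic -> mirror map
    // into mirror -> topics, i.e. one entry per call of a signature like
    // addTopicsToMirror(String mirrorName, Set<String> topics, ...).
    static Map<String, Set<String>> groupByMirror(Map<String, String> topicToMirrorName) {
        // Sorted collections are used only to make the printed order stable.
        Map<String, Set<String>> byMirror = new TreeMap<>();
        for (Map.Entry<String, String> e : topicToMirrorName.entrySet()) {
            byMirror.computeIfAbsent(e.getValue(), k -> new TreeSet<>()).add(e.getKey());
        }
        return byMirror;
    }

    public static void main(String[] args) {
        // Illustrative names only.
        Map<String, String> topicToMirrorName = Map.of(
            "orders", "dr-a",
            "payments", "dr-a",
            "logs", "dr-b");

        // With the map form this would be a single call; with the per-mirror
        // form it becomes one call per entry printed below.
        groupByMirror(topicToMirrorName).forEach((mirror, topics) ->
            System.out.println(mirror + " <- " + topics));
    }
}
```

If adding topics to different mirrors in one call turns out not to be a real
use case, the grouping step disappears and the per-mirror signature is enough.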