Hi Andrew, thanks for your careful review.

AS28: We reworked that section and added some diagrams to illustrate the problem and the solutions. Let us know if it is better.

AS30: We updated that table, but we are not sure about the problem you mention. Can you clarify and send us the sequence of actions that would lead to that problem?

AS32: You are right. Replaced StartMirrorTopicsRequestData.TopicData with a Controller.MirrorTopicMetadata record. Also renamed TopicData to TopicMetadata in all schemas.

AS33: Good idea. Added per-topic error messages in the RPC responses you mentioned.

AS29, AS31, AS34 (both of them), AS35, AS36: These small issues are all fixed now.

On Tue, Jun 9, 2026 at 9:58 PM Andrew Schofield wrote:
>
> Hi Fede and friends,
> This KIP is shaping up nicely. The change to the way that state management for the mirrors is done is much more elegant. The removal of the PID mapping is also a nice improvement.
>
> Inevitably, I have some more comments.
>
> AS28: I understand KAFKA-18723 and how leader epochs work in that case, but the enhancements that you made to the Fetch RPC and how they work escape me. Please could we have some diagrams to illustrate the various epochs?
>
> AS29: I like the addition of share group offset syncing. You need AlterShareGroupOffsets and DescribeShareGroupOffsets in the tables of RPCs and permissions.
>
> AS30: It is certainly true that EOS is not supported. Using cluster mirroring on transactional data does risk breaking transactional atomicity for transactions in flight at the time that mirroring was stopped. That's a known limitation, and it's an effect of per-topic async replication. Writing ABORT markers when stop mirroring is triggered is interesting. I just wonder whether it's truly safe.
>
> Could you enhance the tables of records in the Exactly-Once Semantics section to include leader epochs? I think that the leader epoch for the ABORT markers will have been bumped as the mirroring stopped and the partitions became writable. However, I worry that someone might switch the mirroring back in the other direction before the transaction which was rudely partially aborted has completed on the original source cluster, and that the transaction coordinator then completes the transaction, potentially writing duplicate transaction markers after the ABORT markers which will have been replicated back again.
>
> This is probably only a realistic possibility when KIP-939 is present, but it is accepted and I'm sure it will land soon enough. I would say this KIP should be prepared for the existence of long-running transactions so we don't dig a hole to fall into later.
>
> AS31: In the section on the Admin Client, the description for Admin.createClusterMirror(String, Map, CreateClusterMirrorOptions) is duplicated.
>
> AS32: The use of StartMirrorTopicsRequestData.TopicData in an external interface looks odd.
>
> AS33: I suggest per-topic error messages in the RPC responses such as PauseMirrorTopics so you can pass back more context, such as the state if the request failed due to a state mismatch.
>
> AS34: There is an unnamed field in DescribeClusterMirrorsResponse. The same applies to LastMirrorEpochsValue.
>
> AS34: Some of the fields in BumpLeaderEpochsRequest start with lower-case letters. Also, there are quite a few fields in the RPCs in general that are version "0" but should be "0+".
>
> AS35: Sometimes the partition states in the RPCs are strings, but sometimes int8.
>
> AS36: The base level for share group support is probably 4.1. All of the RPCs were present in 4.1, although they were only enabled by default in 4.2.
>
> Thanks,
> Andrew
>
> On 2026/06/04 11:01:36 Federico Valeri wrote:
> > Hello, additional updates and replies:
> >
> > VK12: Consumer data loss risk is a good point. We updated the "Group Offsets" paragraph.
> >
> > Some of you raised the point that using mirror.name suffixes for pause and stop operations is a bit inelegant, and we agreed. Now we propose a better approach:
> >
> > We replaced the topic-config-based approach (mirror.name and .stopped/.paused suffix conventions) with a first-class metadata record for tracking mirror topic state changes. A new MirrorTopicStateChangeRecord is added to the metadata log with three fields: TopicId (uuid), MirrorName (string), and DesiredState (int8, where 0 = start mirroring, 1 = stop, 2 = pause). When a user calls startMirrorTopics, stopMirrorTopics, or pauseMirrorTopics, the controller writes this record to the metadata log instead of manipulating config suffixes. Brokers receive the record via metadata updates and react accordingly, triggering partition-level state transitions through the existing MirrorPartitionState state machine. More details in the updated KIP.
> >
> > Let us know what you think.
> >
> > Thanks
> > Fede
> >
> >
> > On Tue, Jun 2, 2026 at 8:50 AM Luke Chen wrote:
> > >
> > > Hi Viquar,
> > >
> > > Thanks for the comment.
> > >
> > > Regarding VK12, thanks for raising the issue.
> > > We didn't think about that.
> > > I agree that, compared with data re-processing, it's worse to have data loss.
> > > The proposed formula makes sense to me.
> > > *syncedOffset = max(destinationLogStartOffset, min(sourceCommitted, destinationLEO))*
> > >
> > > Let us have some discussion internally and then update the KIP and reply to the thread.
> > >
> > > Thanks,
> > > Luke
> > >
> > > On Wed, May 20, 2026 at 2:39 AM Federico Valeri wrote:
> > > >
> > > > Hi Rajini,
> > > >
> > > > RS14: Done. Missed that, sorry.
> > > >
> > > > Thanks again.
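The record-based approach described in the June 4 update can be pictured with a small toy model. This is a sketch under stated assumptions, not the KIP's code: the three record fields and the 0/1/2 DesiredState values come from the thread, while the `controller_handle` and `broker_replay` helpers, the in-memory list standing in for the metadata log, and the "latest record per topic wins" replay rule are hypothetical.

```python
import uuid
from dataclasses import dataclass
from enum import IntEnum


class DesiredState(IntEnum):
    """int8 DesiredState values as described in the thread."""
    START = 0  # start mirroring
    STOP = 1   # stop mirroring
    PAUSE = 2  # pause mirroring


@dataclass(frozen=True)
class MirrorTopicStateChangeRecord:
    topic_id: uuid.UUID          # TopicId (uuid)
    mirror_name: str             # MirrorName (string)
    desired_state: DesiredState  # DesiredState (int8)


# Hypothetical mapping from Admin calls to the state each one requests.
ADMIN_CALL_TO_STATE = {
    "startMirrorTopics": DesiredState.START,
    "stopMirrorTopics": DesiredState.STOP,
    "pauseMirrorTopics": DesiredState.PAUSE,
}


def controller_handle(call, mirror_name, topic_ids, metadata_log):
    """Controller side: append one record per topic instead of editing topic configs."""
    state = ADMIN_CALL_TO_STATE[call]
    for topic_id in topic_ids:
        metadata_log.append(MirrorTopicStateChangeRecord(topic_id, mirror_name, state))


def broker_replay(metadata_log):
    """Broker side (toy model): replaying the log in order, the latest record per topic wins.

    A real broker would then drive MirrorPartitionState transitions for each
    partition of the topic; that state machine is not modeled here.
    """
    desired = {}
    for record in metadata_log:
        desired[(record.mirror_name, record.topic_id)] = record.desired_state
    return desired
```

Calling `controller_handle("startMirrorTopics", ...)` and then `controller_handle("pauseMirrorTopics", ...)` for the same topic leaves two records in the log, and replaying them yields `DesiredState.PAUSE`. The point of the design is visible here: state lives in typed metadata records rather than in naming conventions on topic configs.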
> > > >
> > > > On Tue, May 19, 2026 at 7:28 PM Rajini Sivaram wrote:
> > > > >
> > > > > Hi Federico,
> > > > >
> > > > > Thanks for the updates. Just one minor point; apart from that, it looks good.
> > > > >
> > > > > RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094 --entity-type mirrors". It will be good to change that to `--entity-type cluster-mirrors`.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Rajini
> > > > >
> > > > >
> > > > > On Tue, May 19, 2026 at 5:28 PM vaquar khan wrote:
> > > > > >
> > > > > > Hi Federico and team,
> > > > > >
> > > > > > Thank you for your detailed response on May 11, 2026. I greatly appreciate the collaborative effort over the past few months to harden the KIP-1279 architecture.
> > > > > >
> > > > > > *1. Resolved Architectural & Stability Vulnerabilities*
> > > > > >
> > > > > > I am pleased to confirm that the KIP has successfully integrated fixes for the critical vulnerabilities I flagged, thereby protecting the cluster's state machine, memory bounds, and control plane. Consider the following items fully resolved on my end:
> > > > > >
> > > > > > - *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I previously identified that the proposed -(sourceProducerId + 2) PID rewriting formula would fundamentally break hasProducerId() in AbstractRecordBatch.java, causing the broker to bypass ProducerStateManager.update() and default the Last Stable Offset (LSO) to the High Watermark. I am glad to see the KIP authors abandoned PID mapping entirely and adopted the deterministic MIRROR_PID_RESET control record barrier. This perfectly protects Kafka's exactly-once semantics.
> > > > > >
> > > > > > - *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for officially confirming that the TransactionIndex is strictly rebuilt locally during log appends rather than copied byte-for-byte. This resolves my concern regarding PID mismatches inducing consumer read anomalies.
> > > > > >
> > > > > > - *VK1 (Thundering Herd / OOM Heap Allocation):* Your clarification that fetcher threads multiplex partitions, meaning the memory footprint is strictly bounded by num_fetcher_threads * response_max_bytes rather than a concurrent 1MB buffer per partition, fully resolves my concern regarding 50GB broker-wide OOM spikes during mass partition wake-ups.
> > > > > >
> > > > > > - *VK3 (Control Plane Hotspots):* I had flagged the severe risk of metadata saturation on a single broker during a "link flap" event. Your confirmation that the __mirror_state topic utilizes a compound hash of (mirrorName, topicId, partition number) mathematically ensures that the 50,000+ state transitions will be safely distributed across the cluster, neutralizing the single-node hotspot risk.
> > > > > >
> > > > > > 2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)
> > > > > >
> > > > > > Regarding VK12, there is a fundamental misunderstanding in your previous reply. You stated: *"The scenario described can only occur if offsets are force-written to an active group, which the design prevents."*
> > > > > >
> > > > > > My concern has absolutely nothing to do with overwriting active groups.
> > > > > > My concern applies strictly to the normal synchronization of inactive/dead groups, and is based directly on the race condition currently documented in the official KIP-1279 text:
> > > > > >
> > > > > > *"During offset synchronization, the committed offset in the destination cluster may temporarily exceed the current log end offset (LEO) of the mirror topic... consumers attempting to resume from offset 100 will receive an OffsetOutOfRangeException. To handle this gracefully, consumers should configure auto.offset.reset=latest..."*
> > > > > >
> > > > > > If a failover happens during this documented divergence window, a reconnecting consumer will hit the OffsetOutOfRangeException. If the downstream consumer follows the KIP's official advice and relies on auto.offset.reset=latest, the consumer will jump to the absolute newest offset on the partition. *This completely skips any newly produced records that arrived between the failover and the consumer reconnecting, resulting in silent, unrecoverable data loss for the downstream application.*
> > > > > >
> > > > > > *Proposed Resolution: Double-Clamped Offset Safety Invariant*
> > > > > >
> > > > > > Instead of requiring consumers to use auto.offset.reset=latest and endorsing a known data loss vector, I propose that the ClusterMirrorCoordinator enforce a pre-persist offset validation invariant during offset synchronization.
> > > > > >
> > > > > > Before any translated offset is committed to the destination cluster, the coordinator must apply a double-clamp:
> > > > > >
> > > > > > *syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*
> > > > > >
> > > > > > Where destinationLSO is the Log Start Offset (earliest readable position post-retention) and destinationLEO is the Log End Offset (latest replicated position) of the destination partition.
> > > > > >
> > > > > > This guarantees that every persisted offset falls within the physically valid range, eliminating both the OffsetOutOfRangeException caused by exceeding the LEO during replication lag, and the expired-offset rewind caused by falling below the LSO due to destination retention policies.
> > > > > >
> > > > > > The worst-case trade-off under this invariant is bounded re-processing proportional to the replication lag at failover time, not total partition depth, which is perfectly consistent with Kafka's documented at-least-once delivery guarantees. Broad enterprise production evidence from large-scale cross-cluster failovers confirms that state checks alone (preventing active group overwrites) are insufficient; strict offset bounds clamping is required to achieve enterprise-grade data integrity.
> > > > > >
> > > > > > I look forward to your thoughts on implementing this final offset capping logic. Once VK12 is patched, I believe this architecture will be exceptionally robust and ready for enterprise deployment.
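The double-clamp proposed for VK12 (and restated by Luke with destinationLogStartOffset as the lower bound) is small enough to write out directly. This is a minimal sketch of the formula only; the function and parameter names are illustrative, and "LSO" here means log start offset as Viquar defines it, not last stable offset.

```python
def synced_offset(source_committed: int,
                  dest_log_start_offset: int,
                  dest_log_end_offset: int) -> int:
    """max(destinationLogStartOffset, min(sourceCommitted, destinationLEO)).

    The inner min() caps the offset at the destination LEO (replication lag);
    the outer max() lifts it to the destination log start offset (retention).
    """
    if dest_log_start_offset > dest_log_end_offset:
        raise ValueError("log start offset cannot exceed log end offset")
    return max(dest_log_start_offset, min(source_committed, dest_log_end_offset))
```

For example, with sourceCommitted = 100, destinationLEO = 80 and a log start offset of 0, the synced offset is 80. A consumer resuming there starts at the destination's current end of log, which is a valid fetch position, instead of hitting OffsetOutOfRangeException and falling back to auto.offset.reset. If retention has already deleted everything below offset 50, a source commit of 10 is lifted to 50.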
> > > > > >
> > > > > > Best Regards,
> > > > > >
> > > > > > Viquar Khan
> > > > > >
> > > > > >
> > > > > > On Mon, 18 May 2026 at 14:23, Rajini Sivaram wrote:
> > > > > > >
> > > > > > > Hi Federico,
> > > > > > >
> > > > > > > Thanks for the updates! The KIP is looking good. A few more small comments.
> > > > > > >
> > > > > > >
> > > > > > > RS13: A couple of places still refer to `kafka-mirror.sh` like under `Failover Process`. Can we change them to `*kafka-cluster-mirrors.sh*`?
> > > > > > >
> > > > > > > RS14: Should we change `--entity-type mirrors` for kafka-configs to be `--entity-type *cluster-mirrors*` to be consistent? Also, CLUSTER_MIRROR((byte) 64, "mirror"); could be `*cluster-mirror*`?
> > > > > > >
> > > > > > > RS15: It may be useful to rename `mirror.topic.num.partitions` and `mirror.topic.replication.factor` since they are very similar to `mirror.topic.properties.exclude`, but the `mirror.topic` prefix refers to different topics (the internal topic for the first two and actual mirror topics for the other one).
> > > > > > >
> > > > > > > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination but not in source using DeleteAcls request."
> > > > > > > What happens if someone creates an ACL on the destination to deny User:Alice access to all topics?
> > > > > > >
> > > > > > > 1. If that ACL also existed on the source cluster and then it was removed, will it get removed from the destination?
> > > > > > > 2. If that ACL never existed on the source cluster, will it get removed from the destination?
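Rajini's RS16 question can be made concrete by encoding the quoted sentence literally as a set difference. This is only a sketch of that literal reading, not how the KIP resolves the question: the `Acl` tuple is a simplified, hypothetical stand-in (real Kafka ACL bindings also carry host and resource pattern type), and only the deletion rule quoted in RS16 is modeled.

```python
from typing import NamedTuple, Set


class Acl(NamedTuple):
    """Simplified ACL entry used only for this illustration."""
    principal: str
    resource_type: str
    resource_name: str
    operation: str
    permission: str  # "ALLOW" or "DENY"


def acls_to_delete(source: Set[Acl], destination: Set[Acl]) -> Set[Acl]:
    """Literal reading of the quoted rule: delete ACLs that exist in the
    destination but not in the source."""
    return destination - source
```

Under this literal reading, a destination-only `Acl("User:Alice", "TOPIC", "*", "ALL", "DENY")` is in the difference and would be deleted, which covers case 2 of RS16. Case 1 gives the same answer, because the set difference cannot tell whether the entry once existed on the source. That is why the question needs an explicit answer in the KIP.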
> > > > > > >
> > > > > > > RS17: The table in the Source ACLs section says: "DescribeClusterMirrors MC Read Cluster Log truncation"
> > > > > > > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Rajini
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, May 15, 2026 at 8:41 AM Federico Valeri wrote:
> > > > > > > >
> > > > > > > > Hi Rajini, we finally addressed the API and tool naming refactoring as you suggested in RS6. Please take a look when you have time. Thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri wrote:
> > > > > > > > >
> > > > > > > > > Hello all, I want to highlight a couple of new paragraphs:
> > > > > > > > >
> > > > > > > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the invariant that the destination leader epoch must always be greater than or equal to the source leader epoch (DLE>=SLE). Without this, consumers on the destination cluster can get stuck in an infinite metadata refresh loop when they encounter committed offsets carrying source epochs higher than the local epoch. The invariant is maintained through three mechanisms: reactive bumping (epoch fencing triggered when SLE > DLE during fetch), proactive bumping (scheduled when SLE approaches DLE within a threshold), and periodic bumping (checked during coordinator metadata sync).
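The three bumping mechanisms in the Leader Epoch Invariant paragraph reduce to a simple classification per partition. The sketch below assumes a hypothetical `threshold` for "approaches DLE" and a hypothetical `headroom` for how far the epoch is bumped; the thread states neither value, nor how the bump target is chosen.

```python
from dataclasses import dataclass


@dataclass
class PartitionEpochs:
    source_leader_epoch: int  # SLE
    dest_leader_epoch: int    # DLE


def invariant_holds(p: PartitionEpochs) -> bool:
    """DLE >= SLE, the invariant stated in the thread."""
    return p.dest_leader_epoch >= p.source_leader_epoch


def bump_decision(p: PartitionEpochs, threshold: int):
    """'reactive' if the invariant is already broken (SLE > DLE seen on fetch),
    'proactive' if SLE is within `threshold` of DLE, otherwise None."""
    if not invariant_holds(p):
        return "reactive"
    if p.dest_leader_epoch - p.source_leader_epoch <= threshold:
        return "proactive"
    return None


def periodic_check(partitions, threshold: int):
    """Coordinator metadata sync: indexes of partitions that need a bump."""
    return [i for i, p in enumerate(partitions) if bump_decision(p, threshold) is not None]


def target_dest_epoch(p: PartitionEpochs, headroom: int) -> int:
    """Hypothetical bump target: `headroom` above SLE, never below the current DLE."""
    return max(p.dest_leader_epoch, p.source_leader_epoch + headroom)
```

With threshold 2, a partition at SLE 7 / DLE 5 is classified as reactive, SLE 4 / DLE 5 as proactive, and SLE 1 / DLE 10 needs no action. Keeping a gap above SLE is what stops a committed offset's epoch from ever exceeding the destination's local epoch.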
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > > > > > > >
> > > > > > > > > 2. Group Offsets: The coordinator periodically syncs consumer and share group offsets from the source cluster to the destination for all mirrored topics. Groups are filtered by configurable include/exclude patterns, and offsets are only synced for groups that are not currently active on the destination cluster, preventing overwrites of local consumer progress. Because source and destination share the same topic offsets (no offset translation), synced offsets can be used directly without mapping.
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > > > > > > >
> > > > > > > > > These new paragraphs directly address some of your questions, but let me list them here:
> > > > > > > > >
> > > > > > > > > JR2: Yes, we removed the incorrect phrase and added more details to the paragraph.
> > > > > > > > >
> > > > > > > > > JR4: When the source cluster topic has tiered storage enabled, CM works by mirroring both the remote and local log into the destination cluster. When the destination cluster topic has tiered storage enabled, CM fails in the PREPARING state because the LME may be in remote storage, but works fine if already MIRRORING because no truncation is needed.
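The group selection described in the Group Offsets paragraph can be sketched as a filter. Treating the include/exclude patterns as full-match regular expressions is an assumption; the thread does not name the configuration keys or define the pattern syntax.

```python
import re


def groups_to_sync(source_groups, active_dest_groups, include_patterns, exclude_patterns):
    """Return, in sorted order, the source groups whose offsets would be copied.

    A group is selected when it matches at least one include pattern, matches
    no exclude pattern, and is not currently active on the destination cluster.
    """
    selected = []
    for group in sorted(source_groups):
        included = any(re.fullmatch(p, group) for p in include_patterns)
        excluded = any(re.fullmatch(p, group) for p in exclude_patterns)
        if included and not excluded and group not in active_dest_groups:
            selected.append(group)
    return selected
```

For example, with source groups `app-a`, `app-b` and `tmp-x`, include patterns `app-.*` and `tmp-.*`, exclude pattern `tmp-.*`, and `app-b` active on the destination, only `app-a` is synced. Because mirror topics keep the source offsets, the selected groups' committed offsets would be written as-is, with no translation step.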
> > > > > > > > >
> > > > > > > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > > > > > > >
> > > > > > > > > JR13: We can't support stateful Streams applications because asynchronous replication cannot preserve the transactional boundaries between input offset commits, state store mutations written to changelog topics, and intermediate records written to repartition topics. The synchronous extension of this design will be able to support them. We updated the "Existing Features Integration" paragraph.
> > > > > > > > >
> > > > > > > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > > > > > > >
> > > > > > > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Fede
> > > > > > > > >
> > > > > > > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Vaquar,
> > > > > > > > > >
> > > > > > > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was updated some time ago with the new approach based on the new PID reset control record.
> > > > > > > > > >
> > > > > > > > > > VK11: The transaction index is always built locally during log append, never copied.
> > > > > > > > > >
> > > > > > > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the fetch model. Fetcher threads don't allocate one buffer per partition. Actual peak memory is roughly num_fetcher_threads * response_max_bytes, not num_partitions * partition_max_bytes. With 1 fetcher thread and the default response max, the memory footprint is modest regardless of partition count. We are leveraging the same proven pattern used by internal replication.
> > > > > > > > > >
> > > > > > > > > > VK3: The __mirror_state topic uses hash-based partitioning based on mirrorName, topicId and partition number. With the production default of 50 partitions, 50,000 partition transitions distribute across ~50 partition leaders on different brokers, not a single broker. This is the same proven pattern as __consumer_offsets, which handles millions of commits.
> > > > > > > > > >
> > > > > > > > > > VK12: The scenario described can only occur if offsets are force-written to an active group, which the design prevents.
> > > > > > > > > >
> > > > > > > > > > Cheers
> > > > > > > > > > Fede
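The back-of-envelope arguments in the VK1 and VK3 replies can be checked with a few lines of arithmetic. Assumptions: the 10 MiB response cap is the default of the broker's `replica.fetch.response.max.bytes`, and whether mirror fetchers reuse that setting is not stated in the thread. The key encoding and SHA-256 hash used for `__mirror_state` are hypothetical; the thread only says the key combines mirrorName, topicId and partition number.

```python
import hashlib
from collections import Counter

MIB = 1024 * 1024


def per_partition_estimate(num_partitions, partition_max_bytes):
    """VK1's model: one full-size buffer per partition, all held at once."""
    return num_partitions * partition_max_bytes


def per_thread_estimate(num_fetcher_threads, response_max_bytes):
    """Fede's bound: each fetcher thread holds at most one capped response."""
    return num_fetcher_threads * response_max_bytes


def mirror_state_partition(mirror_name, topic_id, partition, num_partitions=50):
    """Hypothetical key -> __mirror_state partition mapping (encoding and hash assumed)."""
    key = f"{mirror_name}|{topic_id}|{partition}".encode("utf-8")
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions


def spread(mirror_name, topics, partitions_per_topic, num_partitions=50):
    """Count how many mirrored partitions map to each __mirror_state partition."""
    return Counter(
        mirror_state_partition(mirror_name, t, p, num_partitions)
        for t in topics
        for p in range(partitions_per_topic)
    )
```

With 50,000 partitions at 1 MiB each, the per-partition model gives 50,000 MiB, while one fetcher thread with a 10 MiB response cap is bounded at 10 MiB, a factor of 5,000 smaller. Spreading 1,000 topics of 50 partitions each over 50 `__mirror_state` partitions lands roughly 1,000 keys on each one, rather than all 50,000 on a single leader.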
