Hi Andrew, thanks for your careful review.

AS28: We reworked that section and added some diagrams to illustrate
the problem and the solutions. Let us know if it is clearer now.

AS30: We updated that table, but we are not sure about the problem you
mention. Can you clarify and send us the sequence of actions that
would lead to that problem?

AS32: You are right. Replaced StartMirrorTopicsRequestData.TopicData
with a Controller.MirrorTopicMetadata record. Also renamed TopicData
to TopicMetadata in all schemas.

AS33: Good idea. Added per-topic error messages in the RPC responses
you mentioned.

AS29, AS31, AS34 (both of them), AS35, AS36: These small issues are
all fixed now.




On Tue, Jun 9, 2026 at 9:58 PM Andrew Schofield <[email protected]> wrote:
>
> Hi Fede and friends,
> This KIP is shaping up nicely. The change to the way that state management 
> for the mirrors is done is much more elegant. The removal of the PID mapping 
> is also a nice improvement.
>
> Inevitably, I have some more comments.
>
> AS28: I understand KAFKA-18723 and how leader epochs work in that case, but 
> the enhancements that you made to the Fetch RPC and how they work escape me.
> Please could we have some diagrams to illustrate the various epochs?
>
> AS29: I like the addition of share group offset syncing. You need 
> AlterShareGroupOffsets and DescribeShareGroupOffsets in the tables of RPCs 
> and permissions.
>
> AS30: It is certainly true that EOS is not supported. Using cluster mirroring 
> on transactional data does risk breaking transactional atomicity for 
> transactions in flight at the time that mirroring was stopped. That's a known 
> limitation, and it's an effect of per-topic async replication. Writing ABORT 
> markers when stop mirroring is triggered is interesting. I just wonder 
> whether it's truly safe.
>
> Could you enhance the tables of records in the Exactly-Once Semantics section 
> to include leader epochs? I think that the leader epoch for the ABORT markers 
> will have been bumped as the mirroring stopped and the partitions became 
> writable. However, I worry that someone might switch the mirroring back in
> the other direction before the transaction which was rudely partially aborted
> has completed on the original source cluster, and that the transaction
> coordinator, on completing the transaction, might then write duplicate
> transaction markers after the ABORT markers, which will have been
> replicated back again.
>
> This is probably only a realistic possibility when KIP-939 is present, but it 
> is accepted and I'm sure it will land soon enough. I would say this KIP 
> should be prepared for the existence of long-running transactions so we don't 
> dig a hole to fall into later.
>
> AS31: In the section on the Admin Client, the description for 
> Admin.createClusterMirror(String, Map, CreateClusterMirrorOptions) is 
> duplicated.
>
> AS32: The use of StartMirrorTopicsRequestData.TopicData in an external 
> interface looks odd.
>
> AS33: I suggest per-topic error messages in the RPC responses such as 
> PauseMirrorTopics so you can pass back more context, such as the state if the 
> request failed due to a state mismatch.
>
> AS34: There is an unnamed field in DescribeClusterMirrorsResponse. The same 
> in LastMirrorEpochsValue.
>
> AS34: Some of the fields in BumpLeaderEpochsRequest start with lower-case 
> letters. Also, there are quite a few fields in the RPCs in general that 
> should be version "0+" which are "0".
>
> AS35: Sometimes the partition states in the RPCs are string, but sometimes 
> int8.
>
> AS36: The base level for share group support is probably 4.1. All of the RPCs 
> were present in 4.1, although they were only enabled by default in 4.2.
>
> Thanks,
> Andrew
>
> On 2026/06/04 11:01:36 Federico Valeri wrote:
> > Hello, additional updates and replies:
> >
> > VK12: Consumer data loss risk is a good point. We updated the "Group
> > Offsets" paragraph.
> >
> > Some of you raised the point that using mirror.name suffixes for pause
> > and stop operations is a bit inelegant and we agreed. Now we propose a
> > better approach:
> >
> > We replaced the topic config based approach (mirror.name and
> > .stopped/.paused suffix conventions) with a first class metadata
> > record for tracking mirror topic state changes. A new
> > MirrorTopicStateChangeRecord is added to the metadata log with three
> > fields: TopicId (uuid), MirrorName (string), and DesiredState (int8,
> > where 0 = start mirroring, 1 = stop, 2 = pause). When a user calls
> > startMirrorTopics, stopMirrorTopics, or pauseMirrorTopics, the
> > controller writes this record to the metadata log instead of
> > manipulating config suffixes. Brokers receive the record via metadata
> > updates and react accordingly, triggering partition level state
> > transitions through the existing MirrorPartitionState state machine.
> > More details in the updated KIP.
> >
> > Let us know what you think.
> >
> > Thanks
> > Fede
> >
> >
> > On Tue, Jun 2, 2026 at 8:50 AM Luke Chen <[email protected]> wrote:
> > >
> > > Hi Viquar,
> > >
> > > Thanks for the comment.
> > >
> > > Regarding VK12, thanks for raising the issue.
> > > We didn't think about that.
> > > > I agree that, compared with data re-processing, it's worse to have
> > > > data loss.
> > > > The proposed formula makes sense to me.
> > > > *syncedOffset = max(destinationLogStartOffset, min(sourceCommitted,
> > > > destinationLEO))*
> > > >
> > > > Let us have some discussion internally and then update the KIP and
> > > > reply to the thread.
> > >
> > > Thanks,
> > > Luke
> > >
> > > On Wed, May 20, 2026 at 2:39 AM Federico Valeri <[email protected]>
> > > wrote:
> > >
> > > > Hi Rajini,
> > > >
> > > > RS14: Done. Missed that, sorry.
> > > >
> > > > Thanks again.
> > > >
> > > > On Tue, May 19, 2026 at 7:28 PM Rajini Sivaram <[email protected]>
> > > > wrote:
> > > > >
> > > > > Hi Federico,
> > > > >
> > > > > > Thanks for the updates. Just one minor point, apart from that,
> > > > > > looks good.
> > > > >
> > > > > RS14: KIP still shows "bin/kafka-configs.sh --bootstrap-server :9094
> > > > > --entity-type mirrors".
> > > > > Will be good to change that to `--entity-type cluster-mirrors`.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Rajini
> > > > >
> > > > >
> > > > > On Tue, May 19, 2026 at 5:28 PM vaquar khan <[email protected]>
> > > > wrote:
> > > > >
> > > > > > Hi Federico and team,
> > > > > >
> > > > > > > Thank you for your detailed response on May 11, 2026. I greatly
> > > > > > > appreciate the collaborative effort over the past few months to
> > > > > > > harden the KIP-1279 architecture.
> > > > > >
> > > > > > > *1. Resolved Architectural & Stability Vulnerabilities*
> > > > > > >
> > > > > > > I am pleased to confirm that the KIP has successfully integrated
> > > > > > > fixes for the critical vulnerabilities I flagged, thereby
> > > > > > > protecting the cluster's state machine, memory bounds, and control
> > > > > > > plane. Consider the following items fully resolved on my end:
> > > > > > >
> > > > > > >    - *VK4 / VK8 (Negative PID Bug & State Machine Failure):* I
> > > > > > >      previously identified that the proposed
> > > > > > >      -(sourceProducerId + 2) PID rewriting formula would
> > > > > > >      fundamentally break hasProducerId() in
> > > > > > >      AbstractRecordBatch.java, causing the broker to bypass
> > > > > > >      ProducerStateManager.update() and default the Last Stable
> > > > > > >      Offset (LSO) to the High Watermark. I am glad to see the KIP
> > > > > > >      authors abandoned PID mapping entirely and adopted the
> > > > > > >      deterministic MIRROR_PID_RESET control record barrier. This
> > > > > > >      perfectly protects Kafka's exactly-once semantics.
> > > > > > >
> > > > > > >    - *VK11 (TransactionIndex Rebuild Ambiguity):* Thank you for
> > > > > > >      officially confirming that the TransactionIndex is strictly
> > > > > > >      rebuilt locally during log appends rather than copied
> > > > > > >      byte-for-byte. This resolves my concern regarding PID
> > > > > > >      mismatches inducing consumer read anomalies.
> > > > > > >
> > > > > > >    - *VK1 (Thundering Herd / OOM Heap Allocation):* Your
> > > > > > >      clarification that fetcher threads multiplex partitions,
> > > > > > >      meaning the memory footprint is strictly bounded by
> > > > > > >      num_fetcher_threads * response_max_bytes rather than a
> > > > > > >      concurrent 1MB buffer per partition, fully resolves my
> > > > > > >      concern regarding 50GB broker-wide OOM spikes during mass
> > > > > > >      partition wake-ups.
> > > > > > >
> > > > > > >    - *VK3 (Control Plane Hotspots):* I had flagged the severe risk
> > > > > > >      of metadata saturation on a single broker during a "link
> > > > > > >      flap" event. Your confirmation that the __mirror_state topic
> > > > > > >      utilizes a compound hash of (mirrorName, topicId, partition
> > > > > > >      number) mathematically ensures that the 50,000+ state
> > > > > > >      transitions will be safely distributed across the cluster,
> > > > > > >      neutralizing the single-node hotspot risk.
> > > > > >
> > > > > > 2. Outstanding Critical Blocker: VK12 (Offset Sync Data Loss)
> > > > > >
> > > > > > > Regarding VK12, there is a fundamental misunderstanding in your
> > > > > > > previous reply. You stated: *"The scenario described can only
> > > > > > > occur if offsets are force-written to an active group, which the
> > > > > > > design prevents."*
> > > > > > >
> > > > > > > My concern has absolutely nothing to do with overwriting active
> > > > > > > groups. My concern applies strictly to the normal synchronization
> > > > > > > of inactive/dead groups, and is based directly on the race
> > > > > > > condition currently documented in the official KIP-1279 text:
> > > > > > >
> > > > > > > *"During offset synchronization, the committed offset in the
> > > > > > > destination cluster may temporarily exceed the current log end
> > > > > > > offset (LEO) of the mirror topic... consumers attempting to resume
> > > > > > > from offset 100 will receive an OffsetOutOfRangeException. To
> > > > > > > handle this gracefully, consumers should configure
> > > > > > > auto.offset.reset=latest..."*
> > > > > > >
> > > > > > > If a failover happens during this documented divergence window, a
> > > > > > > reconnecting consumer will hit the OffsetOutOfRangeException. If the
> > > > > > > downstream consumer follows the KIP's official advice and relies on
> > > > > > > auto.offset.reset=latest, the consumer will jump to the absolute
> > > > > > > newest offset on the partition. *This completely skips any newly
> > > > > > > produced records that arrived between the failover and the consumer
> > > > > > > reconnecting, resulting in silent, unrecoverable data loss for the
> > > > > > > downstream application.*
> > > > > > >
> > > > > > > *Proposed Resolution: Double-Clamped Offset Safety Invariant*
> > > > > > >
> > > > > > > Instead of requiring consumers to use auto.offset.reset=latest and
> > > > > > > endorsing a known data loss vector, I propose that the
> > > > > > > ClusterMirrorCoordinator enforce a pre-persist offset validation
> > > > > > > invariant during offset synchronization. Before any translated
> > > > > > > offset is committed to the destination cluster, the coordinator
> > > > > > > must apply a double-clamp:
> > > > > > >
> > > > > > > *syncedOffset = max(destinationLSO, min(sourceCommitted, destinationLEO))*
> > > > > >
> > > > > > > Where destinationLSO is the Log Start Offset (earliest readable
> > > > > > > position post-retention) and destinationLEO is the Log End Offset
> > > > > > > (latest replicated position) of the destination partition.
> > > > > >
> > > > > > > This guarantees that every persisted offset falls within the
> > > > > > > physically valid range, eliminating both the
> > > > > > > OffsetOutOfRangeException caused by exceeding the LEO during
> > > > > > > replication lag and the expired-offset rewind caused by falling
> > > > > > > below the LSO due to destination retention policies.
> > > > > > >
> > > > > > > The worst-case trade-off under this invariant is bounded
> > > > > > > re-processing proportional to the replication lag at failover time,
> > > > > > > not total partition depth, which is perfectly consistent with
> > > > > > > Kafka's documented at-least-once delivery guarantees. Broad
> > > > > > > enterprise production evidence from large-scale cross-cluster
> > > > > > > failovers confirms that state checks alone (preventing active group
> > > > > > > overwrites) are insufficient; strict offset bounds clamping is
> > > > > > > required to achieve enterprise-grade data integrity.
> > > > > >
> > > > > > > I look forward to your thoughts on implementing this final offset
> > > > > > > capping logic. Once VK12 is patched, I believe this architecture
> > > > > > > will be exceptionally robust and ready for enterprise deployment.
> > > > > >
> > > > > > Best Regards,
> > > > > >
> > > > > > Viquar Khan
> > > > > >
> > > > > >
> > > > > > On Mon, 18 May 2026 at 14:23, Rajini Sivaram 
> > > > > > <[email protected]>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Federico,
> > > > > > >
> > > > > > > > Thanks for the updates! The KIP is looking good. A few more small
> > > > > > > > comments.
> > > > > > >
> > > > > > >
> > > > > > > > RS13: A couple of places still refer to `kafka-mirror.sh` like
> > > > > > > > under `Failover Process`. Can we change them to
> > > > > > > > `*kafka-cluster-mirrors.sh*`?
> > > > > > > >
> > > > > > > > RS14: Should we change `--entity-type mirrors` for kafka-configs
> > > > > > > > to be `--entity-type *cluster-mirrors*` to be consistent? Also,
> > > > > > > > CLUSTER_MIRROR((byte) 64, "mirror"); could be `*cluster-mirror*`?
> > > > > > > >
> > > > > > > > RS15: It may be useful to rename `mirror.topic.num.partitions`
> > > > > > > > and `mirror.topic.replication.factor` since they are very similar
> > > > > > > > to `mirror.topic.properties.exclude`, but the `mirror.topic`
> > > > > > > > prefix refers to different topics (the internal topic for the
> > > > > > > > first two and actual mirror topics for the other one).
> > > > > > > >
> > > > > > > > RS16: ACL Sync: KIP says "Deletes ACLs that exist in destination
> > > > > > > > but not in source using DeleteAcls request."
> > > > > > > > What happens if someone creates an ACL on the destination to deny
> > > > > > > > User:Alice access to all topics?
> > > > > > > >
> > > > > > > >    1. If that ACL also existed on the source cluster and then it
> > > > > > > >    was removed, will it get removed from the destination?
> > > > > > > >    2. If that ACL never existed on the source cluster, will it get
> > > > > > > >    removed from the destination?
> > > > > > > >
> > > > > > > > RS17: The table in the Source ACLs section says:
> > > > > > > > "DescribeClusterMirrors    MC      Read    Cluster     Log truncation"
> > > > > > > > Should that be `ClusterMirror:Read` instead of `Cluster:Read`?
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Rajini
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > > On Fri, May 15, 2026 at 8:41 AM Federico Valeri <[email protected]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Rajini, we finally addressed the API and tool naming
> > > > > > > > > refactoring as you suggested in RS6. Please take a look when
> > > > > > > > > you have time. Thanks.
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Mon, May 11, 2026 at 6:17 PM Federico Valeri <[email protected]>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hello all, I want to highlight a couple of new paragraphs:
> > > > > > > > >
> > > > > > > > > > 1. Leader Epoch Invariant: Cluster mirroring enforces the
> > > > > > > > > > invariant that the destination leader epoch must always be
> > > > > > > > > > greater than or equal to the source leader epoch (DLE>=SLE).
> > > > > > > > > > Without this, consumers on the destination cluster can get
> > > > > > > > > > stuck in an infinite metadata refresh loop when they
> > > > > > > > > > encounter committed offsets carrying source epochs higher
> > > > > > > > > > than the local epoch. The invariant is maintained through
> > > > > > > > > > three mechanisms: reactive bumping (epoch fencing triggered
> > > > > > > > > > when SLE > DLE during fetch), proactive bumping (scheduled
> > > > > > > > > > when SLE approaches DLE within a threshold), and periodic
> > > > > > > > > > bumping (checked during coordinator metadata sync).
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-LeaderEpochInvariant
> > > > > > > > >
> > > > > > > > > > 2. Group Offsets: The coordinator periodically syncs consumer
> > > > > > > > > > and share group offsets from the source cluster to the
> > > > > > > > > > destination for all mirrored topics. Groups are filtered by
> > > > > > > > > > configurable include/exclude patterns, and offsets are only
> > > > > > > > > > synced for groups that are not currently active on the
> > > > > > > > > > destination cluster, preventing overwrites of local consumer
> > > > > > > > > > progress. Because source and destination share the same
> > > > > > > > > > topic offsets (no offset translation), synced offsets can be
> > > > > > > > > > used directly without mapping.
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620973#KIP1279:ClusterMirroring-GroupOffsets
> > > > > > > > >
> > > > > > > > > > These new paragraphs directly address some of your questions,
> > > > > > > > > > but let me list them here:
> > > > > > > > > >
> > > > > > > > > > JR2: Yes, we removed the incorrect phrase and added more
> > > > > > > > > > details to the paragraph.
> > > > > > > > > >
> > > > > > > > > > JR4: When the source cluster topic has tiered storage
> > > > > > > > > > enabled, CM works by mirroring both the remote and the local
> > > > > > > > > > log into the destination cluster. When the destination
> > > > > > > > > > cluster topic has tiered storage enabled, CM fails in
> > > > > > > > > > PREPARING state because the LME may be in remote storage,
> > > > > > > > > > but works fine if already MIRRORING because no truncation is
> > > > > > > > > > needed.
> > > > > > > > > >
> > > > > > > > > > JR11: See "Leader Epoch Invariant" paragraph mentioned above.
> > > > > > > > > >
> > > > > > > > > > JR13: We can't support stateful Streams applications because
> > > > > > > > > > asynchronous replication cannot preserve the transactional
> > > > > > > > > > boundaries between input offset commits, state store
> > > > > > > > > > mutations written to changelog topics, and intermediate
> > > > > > > > > > records written to repartition topics. The synchronous
> > > > > > > > > > extension of this design will be able to support them. The
> > > > > > > > > > "Existing Features Integration" paragraph has been updated.
> > > > > > > > >
> > > > > > > > > JR18: See "Group Offsets" paragraph mentioned above.
> > > > > > > > >
> > > > > > > > > IY1: See "Group Offsets" paragraph mentioned above.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Fede
> > > > > > > > >
> > > > > > > > > > On Mon, May 11, 2026 at 6:08 PM Federico Valeri <[email protected]>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Vaquar,
> > > > > > > > > >
> > > > > > > > > > > VK4/VK8: We don't do PID mapping anymore. The KIP was
> > > > > > > > > > > updated some time ago with the new approach based on the
> > > > > > > > > > > new PID reset control record.
> > > > > > > > > > >
> > > > > > > > > > > VK11: The transaction index is always built locally during
> > > > > > > > > > > log append, never copied.
> > > > > > > > > > >
> > > > > > > > > > > VK1: The 50,000 * 1MB = 50GB calculation misunderstands the
> > > > > > > > > > > fetch model. Fetcher threads don't allocate one buffer per
> > > > > > > > > > > partition. Actual peak memory is roughly
> > > > > > > > > > > num_fetcher_threads * response_max_bytes, not
> > > > > > > > > > > num_partitions * partition_max_bytes. With 1 fetcher thread
> > > > > > > > > > > and the default response max, the memory footprint is
> > > > > > > > > > > modest regardless of partition count. We are leveraging the
> > > > > > > > > > > same proven pattern used by the internal replication.
> > > > > > > > > > >
> > > > > > > > > > > VK3: The __mirror_state topic uses hash-based partitioning
> > > > > > > > > > > based on mirrorName, topicId and partition number. With the
> > > > > > > > > > > production default of 50 partitions, 50,000 partition
> > > > > > > > > > > transitions distribute across ~50 partition leaders on
> > > > > > > > > > > different brokers, not a single broker. This is the same
> > > > > > > > > > > proven pattern as __consumer_offsets, which handles
> > > > > > > > > > > millions of commits.
> > > > > > > > > >
> > > > > > > > > > VK12: The scenario described can only occur if offsets are
> > > > > > > > > > force-written to an active group, which the design prevents.
> > > > > > > > > >
> > > > > > > > > > Cheers
> > > > > > > > > > Fede
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> >
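
For reference, the double clamp discussed under VK12 earlier in the
thread can be sketched in a few lines of Java. This is only an
illustration of the arithmetic, under stated assumptions: the class and
method names (OffsetClamp, clamp) are hypothetical and do not come from
the KIP, the lower bound is taken to be the destination log start
offset (as in the formula Luke quoted), and nothing here reflects how
the coordinator would actually read or commit these values.

```java
/**
 * Hypothetical helper illustrating the proposed clamp:
 *   syncedOffset = max(destinationLogStartOffset,
 *                      min(sourceCommitted, destinationLEO))
 * Names are assumptions made for this sketch, not KIP-1279 APIs.
 */
public final class OffsetClamp {

    private OffsetClamp() {
    }

    public static long clamp(long sourceCommitted,
                             long destinationLogStartOffset,
                             long destinationLogEndOffset) {
        // A log start offset beyond the log end offset would make the
        // valid range empty, so reject it rather than return a bogus value.
        if (destinationLogStartOffset > destinationLogEndOffset) {
            throw new IllegalArgumentException(
                "log start offset must not exceed log end offset");
        }
        // Upper clamp: never point past what has been replicated so far.
        long capped = Math.min(sourceCommitted, destinationLogEndOffset);
        // Lower clamp: never point below the earliest retained record.
        return Math.max(destinationLogStartOffset, capped);
    }

    public static void main(String[] args) {
        // Source committed offset ahead of the destination LEO (replication lag).
        System.out.println(clamp(100L, 0L, 80L));  // prints 80
        // Source committed offset below the destination log start (retention).
        System.out.println(clamp(5L, 20L, 80L));   // prints 20
        // Source committed offset already inside the valid range.
        System.out.println(clamp(50L, 20L, 80L));  // prints 50
    }
}
```

The first call mirrors the case quoted from the KIP, where a consumer
would otherwise resume from offset 100; the destination LEO of 80 is an
illustrative value. The clamped result means re-processing is bounded
by the replication lag instead of triggering auto.offset.reset.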
