Hi Fede,
Thanks for the updates.

AS25: It's all good in the current version.

AS26: Yes, it's definitely theoretical and I didn't mean for the KIP to change. 
I suppose that the CreateTopic with a specific topic ID would fail because the 
topic ID is already in use. That seems entirely appropriate behaviour.

AS27: start/stop/pause/resume is definitely nicer.

Thanks,
Andrew

On 2026/04/15 08:31:19 Federico Valeri wrote:
> Hi Andrew,
> 
> AS8: I'm working on this and will update the KIP soon.
> 
> AS24: Still thinking about this. We will update in case we find a better
> design. We are also considering your suggestions.
> 
> AS25: Where do you see that in the KIP? Maybe it was in a previous
> revision. Anyway, we decided to remove the --replication-factor flag to
> always honor the destination cluster default. The operator can always use
> the assignment script to change it.
> 
> AS26: That's an interesting theoretical observation. Sharing the same topic
> ID between source and mirror is important for safety, as it ensures the
> mirror is replicating data from the correct source topic, not from a
> different topic that happens to share the same name. Any future
> multi-tenancy solution would need to handle this transparently in its own
> layer. For example, in addition to rewriting the topic name (namespacing),
> a proxy would also need to intercept DescribeTopicPartitions (and likely
> CreateTopics with MirrorInfo) to remap or namespace topic IDs transparently.
> 
> AS27: Yes, add/remove along with pause/resume is confusing. We changed
> AddTopicsToMirror/RemoveTopicsFromMirror to
> StartMirrorTopics/StopMirrorTopics. We will still keep the same semantics
> though. Stop can be used for failover or migration use cases, and pause for
> "pausing mirroring without making the topic writable" (maintenance).
> 
> Thanks
> Fede
> 
> 
> 
> On Fri, Apr 3, 2026 at 4:59 PM Andrew Schofield <[email protected]>
> wrote:
> >
> > Hi Fede and friends,
> > Thanks for the responses to my excessive comments. I like the direction
> it's heading in.
> >
> > AS6, AS7: Thanks for adding the tables of permissions. Making
> ClusterMirror an ACL resource seems like a good improvement to me. A few
> detailed clarifications:
> >
> > DescribeConfigs RPC: DESCRIBE_CONFIGS on TOPIC, not DESCRIBE.
> > CreatePartitions RPC: ALTER on TOPIC.
> > IncrementalAlterConfigs RPC: ALTER_CONFIGS on CLUSTER_MIRROR on mirrors,
> and ALTER_CONFIGS on TOPIC for topics.
> > OffsetCommit RPC: READ on GROUP and READ on TOPIC.
> > CreateAcls RPC: ALTER on CLUSTER.
> > DeleteAcls RPC: ALTER on CLUSTER.
> >
> > For operations which act on two kinds of resources, such as doing topic
> things to groups, we generally need permission on both resources. I suggest:
> >
> > AddTopicsToMirror RPC: Maybe add READ on TOPIC.
> > RemoveTopicsFromMirror RPC: Maybe add READ on TOPIC.
> >
> > AS8: I think that including AuthorizedOperations in the
> DescribeMirrorsResponse will work nicely now.
> >
> > AS9: Thanks for the table of error codes. Looks comprehensive, but you'll
> need MIRROR_AUTHORIZATION_FAILED too I think.
> >
> > AS10-15: Thanks. Looks good.
> >
> > AS16: Thanks. But I now have AS24 below. Sorry.
> >
> > AS17: Thanks. Looks good.
> >
> > AS18: The epoch information has been reworked in the latest version.
> Looks good to me.
> >
> > AS19-21: Thanks. Looks good.
> >
> > AS22-23: Also, see AS24 below.
> >
> > And here are a few new comments.
> >
> > AS24: It seems to me that the list of topics being mirrored is really a
> property of the mirror resource. Having `mirror.name` as a topic config,
> and then overloading it with various state suffixes seems a bit inelegant.
> >
> > I suggest:
> > * The mirror name follows the same rules as topic name (which it cannot
> quite do as the KIP is written because of .deleted and so on).
> > * The list of topics are a property of the mirror. Adding and removing
> topics mutates the mirror resource.
> > * The mirror name is no longer a topic config. Then, you do not need
> special handling to hide it if the user is not authorized to describe the
> cluster mirror, and you don't need to fiddle with the names as the topic's
> mirroring state changes. Since the state changes are mediated by the
> cluster mirroring components, keeping control of the state in the mirror
> resource seems workable.
> > * The mirror state for the topics in the mirror is also handled as
> properties or metadata of the mirror resource.
> > * You probably would need DESCRIBE on TOPIC to see that a topic was being
> mirrored, as well as DESCRIBE on MIRROR. This matches resources such as
> consumer groups where you can only see the committed offsets for the topics
> you can describe.
> >
> > I know this is largely a matter of opinion, so feel free to reject my
> suggestion.
> >
> > AS25: When cluster mirroring creates a topic on the destination, I wonder
> why it does not inherit the replication factor of the source topic by
> default. I can understand why you might want source and mirror topics to
> have different replication factors, but I think the default is currently
> the default replication factor for the destination cluster, as opposed to
> the replication factor of the source topic.
> >
> > AS26: The source topic and mirror topic use the same topic ID. I like the
> simplicity of this, but there's a theoretical implication which I thought I
> would raise. Although Apache Kafka itself does not support multi-tenancy
> yet, people have built multi-tenancy on top using proxies and techniques
> such as adding and removing topic name prefixes transparently to the
> clients. It seems to me that multi-tenancy in Apache Kafka is a gap waiting
> to be filled and before long a suitable KIP will be brought forward. With
> such techniques, if someone tried to use cluster mirroring where the source
> and destination "clusters" were actually virtual clusters on the same Kafka
> cluster, the attempt to create the mirror topic with the same topic ID
> would fail. I'm sure this is just a theoretical concern for AK because the
> project itself doesn't have multi-tenancy, yet, but I wondered what the
> authors think about this.
> >
> > AS27: This is definitely a matter of personal taste and it probably
> indicates that my mental model of the KIP differs from the authors'. It
> seems to me that `kafka-mirrors.sh --remove` should remove a topic from a
> mirror because mirroring it is no longer required. However, this is
> actually the command for initiating failover. Should there be a separate
> `kafka-mirrors.sh --failover`?
> >
> > Thanks,
> > Andrew
> >
> > On 2026/04/03 09:35:47 Federico Valeri wrote:
> > > Hello everyone,
> > >
> > > Thank you all for the thoughtful questions and suggestions, and thanks
> > > to Michael Maison for proposing a better approach to the chained
> > > mirroring problem. Some of these required careful consideration and
> > > led us to refine the design.
> > >
> > > For reviewers, I recommend focusing on the paragraphs with significant
> changes:
> > >
> > > 1. MirrorFetcherThread (updated)
> > > 2. Security Control (updated)
> > > 3. Idempotent Producer (updated)
> > > 4. Command Workflows (new)
> > > 5. Error Names (new)
> > >
> > > Below are answers to the outstanding questions.
> > >
> > > AS6, AS7: I added a table to "Security Control" illustrating
> > > authorization requirements for each component. Let me know if it
> > > covers what you had in mind.
> > >
> > > AS8: I removed that field for now. It is on my backlog and I will post
> > > an update soon.
> > >
> > > AS9, AS17: There is a new "Errors" paragraph with a table listing new
> > > and reused error codes, along with the RPCs that use each one.
> > >
> > > AS18: Could you clarify which LeaderEpoch checks you are referring to?
> > > Here is our understanding:
> > >
> > > 1. WriteMirrorStatesRequest: The coordinator validates that the
> > > current leader epoch matches the request leader epoch, rejecting
> > > outdated requests.
> > > 2. ReadMirrorStatesResponse: The coordinator includes the current
> > > leader epoch in the response, and the leader node validates it against
> > > its own current leader epoch.
> > > 3. LastMirroredOffsetsResponse: We replaced this with
> > > LastMirroredEpochsResponse, so we believe this no longer applies.
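As a quick sanity check, the coordinator-side validation in (1) can be sketched as below. This is only my reading of the description; the function name and the reuse of the existing FENCED_LEADER_EPOCH / UNKNOWN_LEADER_EPOCH error names are assumptions, not something the KIP specifies.

```python
def validate_leader_epoch(current_epoch: int, request_epoch: int) -> str:
    """Sketch of the WriteMirrorStates epoch check on the coordinator."""
    # Reject requests carrying a stale leader epoch (outdated leader).
    if request_epoch < current_epoch:
        return "FENCED_LEADER_EPOCH"
    # The coordinator may lag behind metadata propagation and see a
    # newer epoch than it knows about.
    if request_epoch > current_epoch:
        return "UNKNOWN_LEADER_EPOCH"
    return "NONE"
```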
> > >
> > > AS22: Description updated to: "This property is filtered out from
> > > DescribeConfigs responses to avoid exposing internal state to users."
> > >
> > > ---
> > >
> > > RS1: I added the DeleteMirror RPC and updated all relevant sections.
> > >
> > > RS5: As you suggested, I introduced a new ResourceType called
> > > ClusterMirror. See "Security Control."
> > >
> > > RS6: The refactoring suggestions make sense, but we need more time to
> > > think them through.
> > >
> > > RS9: Until we support tiered storage, we will mirror all records
> > > including data in remote storage into the destination cluster.
> > >
> > > RS10: There is a future improvement briefly described in "Bandwidth
> > > Control." Let me know if you need more details.
> > >
> > > ---
> > >
> > > JR3: I created a new "Command Workflows" section that covers the
> > > workflow for each mirror operation, including failover and failback.
> > > Let me know if you spot any issues.
> > >
> > > I hope I have addressed everything, but please let me know if I missed
> > > any question.
> > >
> > > Cheers
> > > Fede
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Mar 20, 2026 at 5:12 AM Luke Chen <[email protected]> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the review comments.
> > > > Answer them below.
> > > >
> > > > > JR1. "Epoch rewriting: When records are appended to the destination
> log,
> > > > the batch epochs are rewritten to match the destination cluster's
> leader
> > > > epochs, maintaining consistency within the destination cluster." This
> has a
> > > > couple of impacts.
> > > >
> > > > We have an updated design to support unclean leader election in this
> doc,
> > > > where we will NOT rewrite the leader epoch anymore.
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring
> > > >
> > > >
> > > > > JR2. "Tiered Storage is not initially supported": Ideally, we should
> > > > support tiered storage. Same as RS9, the destination cluster issues
> > > > consumer requests, which support tiering.
> > > > The OffsetMovedToTieredStorageException is used by replication
> > > > inside AbstractFetcherThread. This suggests that it's probably not a
> good
> > > > fit for cluster mirroring to use AbstractFetcherThread.
> > > >
> > > > Yes, thanks for pointing this out. I was wrong about this. Before we
> > > > support tiered storage, the mirroring will mirror all records
> including
> > > > data in the remote storage into the destination cluster.
> > > >
> > > > > JR3. For each new cluster mirroring command, it would be useful to
> > > > document
> > > > the underlying workflow (e.g, which RPC requests are issued, to which
> node;
> > > > what records are written to metadata topic, or internal topic, which
> > > > actions are triggered on the broker, etc).
> > > >
> > > > Will do.
> > > >
> > > > > JR4. Truncating a log to LMO. Currently, there is no internal API
> that
> > > > truncates a partition from the end. Could you describe how this will
> be
> > > > implemented to ensure all replicas are consistent after the
> truncation?
> > > >
> > > > The truncation flow is like this:
> > > > 1. When the MirrorMetadataManager in the node gets notified about the
> > > > partition leader assignment when onMetadataUpdate (via TopicsDelta),
> it'll
> > > > query the mirror coordinator about mirror partition state.
> > > > 2. When it's the PREPARING state, the MirrorMetadataManager in the
> leader
> > > > node will get the last mirrored offset (or epoch) from the source
> cluster
> > > > (new API) and then do the log truncate.
> > > > 3. In (2), we'll also register a callback in Partition instance, and
> wait
> > > > until all ISRs complete the truncation by checking the follower
> replica's
> > > > LEO.
> > > > 4. In (3), this check will be invoked every time the leader node updates
> > > > the follower fetch state, like how we check if high watermark should be
> > > > incremented.
> > > > 5. After all ISRs complete the truncation, we'll invoke the callback
> and
> > > > move the mirror partition state to MIRRORING, and then start fetching
> data
> > > > from the source cluster.
> > > >
> > > > Note:
> > > > (1) In PREPARING state, the partition is READ-ONLY, so no data will be
> > > > written on the leader node
> > > > (2) During step (1) ~ (4), if any leadership change happens, the new
> leader
> > > > will start from step (1) to complete the log truncation process.
> > > > (3) If unclean leader election is supported
> > > > (i.e. mirror.support.unclean.leader.election=true), then we'll wait
> until
> > > > ALL registered replicas complete the truncation before moving on to
> > > > MIRRORING state.
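If it helps, the per-fetch-state-update check in steps (3)-(4) could look roughly like this. All names here are illustrative, not KIP APIs:

```python
def truncation_complete(target_offset: int, follower_leo: dict, isr: set) -> bool:
    """Sketch of the check invoked whenever the leader updates follower
    fetch state: the partition may move from PREPARING to MIRRORING only
    once every ISR member has truncated its log end offset (LEO) to the
    target offset."""
    return all(follower_leo[replica] == target_offset for replica in isr)

# Replica 3 is not in the ISR, so its lagging LEO does not block the move.
assert truncation_complete(100, {1: 100, 2: 100, 3: 150}, isr={1, 2})
# Replica 2 has not finished truncating yet.
assert not truncation_complete(100, {1: 100, 2: 150}, isr={1, 2})
```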
> > > >
> > > > We'll update the KIP in the following days to address the community
> > > > feedback. Some questions need more thinking.
> > > > Please give us some time. :)
> > > >
> > > > Thank you,
> > > > Luke
> > > >
> > > >
> > > > On Fri, Mar 20, 2026 at 9:10 AM Jun Rao via dev <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi, Federico,
> > > > >
> > > > > Thanks for the KIP. A few comments.
> > > > >
> > > > > JR1. "Epoch rewriting: When records are appended to the destination
> log,
> > > > > the batch epochs are rewritten to match the destination cluster's
> leader
> > > > > epochs, maintaining consistency within the destination cluster."
> This has a
> > > > > couple of impacts.
> > > > > JR1.1 How do we ensure that the leader epoch in committed offsets is
> > > > > consistent with the leader epoch in the batch? This consistency is
> > > > > important when the consumer fails over to a different cluster. It
> seems the
> > > > > KIP doesn't translate the leader epoch when mirroring the committed
> offsets.
> > > > > JR1.2 Typically, leader epochs increase monotonically in the log.
> Do we
> > > > > ensure this remains the case after failover and failback?
> > > > >
> > > > > JR2. "Tiered Storage is not initially supported": Ideally, we should
> > > > > support tiered storage. Same as RS9, the destination cluster issues
> > > > > consumer requests, which support tiering.
> > > > > The OffsetMovedToTieredStorageException is used by replication
> > > > > inside AbstractFetcherThread. This suggests that it's probably not
> a good
> > > > > fit for cluster mirroring to use AbstractFetcherThread.
> > > > >
> > > > > JR3. For each new cluster mirroring command, it would be useful to
> document
> > > > > the underlying workflow (e.g, which RPC requests are issued, to
> which node;
> > > > > what records are written to metadata topic, or internal topic, which
> > > > > actions are triggered on the broker, etc).
> > > > >
> > > > > JR4. Truncating a log to LMO. Currently, there is no internal API
> that
> > > > > truncates a partition from the end. Could you describe how this
> will be
> > > > > implemented to ensure all replicas are consistent after the
> truncation?
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Mar 16, 2026 at 2:44 AM Federico Valeri <
> [email protected]>
> > > > > wrote:
> > > > >
> > > > > > I mentioned a corner case in the chained mirroring use case. Let
> me
> > > > > > clarify what I mean with a simple example:
> > > > > >
> > > > > > 1. B is fetching from A, and C is fetching from B (A --> B --> C)
> > > > > > 2. A producer with PID 5 sends records to A
> > > > > > 3. Failover happens and B becomes writable (A -x-> B --> C)
> > > > > > 4. A different producer with PID 5 sends records to B
> > > > > > 5. Collision on cluster C (two different producers mapped to PID
> -7 in C)
> > > > > >
> > > > > > (arrows represent data flowing, not fetch direction)
> > > > > >
> > > > > >
> > > > > > On Sun, Mar 15, 2026 at 7:14 PM Federico Valeri <
> [email protected]>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Rajini, thanks for your thoughtful review and for catching a
> few
> > > > > > > bugs. I'll skip some questions that we will address later.
> > > > > > >
> > > > > > > RS2: The metadata records are described in "Mirror Metadata
> Records"
> > > > > > > paragraph. Currently there are only two records:
> "LastMirroredOffsets"
> > > > > > > record tracks the latest successfully mirrored offset for each
> > > > > > > partition, while "MirrorPartitionState" record represents the
> > > > > > > lifecycle states of a mirrored partition.
> > > > > > >
> > > > > > > RS3: That's a good point that was also raised by Andrew. It was
> an
> > > > > > > easy solution that we used for our prototype, but we need to
> think
> > > > > > > about a better solution.
> > > > > > >
> > > > > > > RS4: The current design is that mirror fetcher threads behave like
> a
> > > > > > > read_committed consumer fetching up to "source LSO". On
> failover we
> > > > > > > truncate destination log to "local LSO".
> > > > > > >
> > > > > > > The approach of fetching up to HW that you propose is still
> safe as we
> > > > > > > keep truncating to local LSO on failover, but it trades lower
> > > > > > > steady-state lag (especially when long-running transactions
> exist on
> > > > > > > the source) for more data loss on failover (the net data loss
> relative
> > > > > > > to the source is the same in both approaches). In other words,
> with
> > > > > > > your approach we fetch more data that we may then need to
> truncate.
> > > > > > > Also, read_uncommitted consumers on the destination cluster
> would be
> > > > > > > able to read records that may be truncated on failover. These
> are just
> > > > > > > my considerations, but we are open to discussion on which is the
> best
> > > > > > > approach here.
> > > > > > >
> > > > > > > When a failover is triggered (RemoveTopicsFromMirror), the
> sequence is:
> > > > > > >
> > > > > > > 1. Partitions transition to STOPPING state
> > > > > > > 2. Fetchers are removed
> > > > > > > 3. For each partition, truncate to local LSO is called
> > > > > > >   3.1. Reads LSO from each partition's local log
> > > > > > >   3.2. Calls log.truncateTo(offset) on the UnifiedLog
> > > > > > >   3.3. Ensures ISR members complete truncation before the
> partition
> > > > > > > becomes writable
> > > > > > > 4. For each partition, the LSO is recorded as the last mirrored
> offset
> > > > > > > (LMO) in __mirror_state
> > > > > > > 5. Partitions transition to STOPPED and become writable
> > > > > > >
> > > > > > > When a failback is triggered (AddTopicsToMirror), the sequence
> is:
> > > > > > >
> > > > > > > 1. Partitions transition to PREPARING state
> > > > > > > 2. For each partition, truncation to LMO is called
> > > > > > >   2.1. This sends a LastMirroredOffsetsRequest to the source
> cluster
> > > > > > > to fetch the offsets that were recorded during the previous
> failover
> > > > > > >   2.2.a. The response offsets are used to truncate local logs
> > > > > > >   2.2.b. If the source cluster doesn't support the
> LastMirroredOffsets
> > > > > > > API or first-time mirror, it truncates to offset 0
> > > > > > > 3. Partitions transition to MIRRORING
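To make the "truncate to local LSO" step concrete, here is a toy model of how the truncation point falls out of the LSO definition. It is only a sketch under my assumptions: `batches` is a plain list of (offset, open_txn) pairs, not a real Kafka log structure.

```python
def local_lso(batches) -> int:
    """Sketch: the local LSO is the offset of the first batch that still
    belongs to an open (uncommitted) transaction; if none is open, it is
    one past the last batch. On failover the log is truncated here, so
    only fully committed data survives."""
    for offset, open_txn in batches:
        if open_txn:
            return offset
    return batches[-1][0] + 1 if batches else 0

# An open transaction starting at offset 7 means offsets >= 7 are cut.
assert local_lso([(0, False), (7, True), (9, True)]) == 7
# No open transactions: nothing needs truncating.
assert local_lso([(0, False), (5, False)]) == 6
```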
> > > > > > >
> > > > > > > RS7: Can you point me to the section that says configs are
> stored in
> > > > > > > __mirror_state? Mirror connection configs (bootstrap servers,
> > > > > > > credentials, etc.) are stored in KRaft metadata via
> > > > > > > ConfigResource.Type.MIRROR, not in __mirror_state. The internal
> topic
> > > > > > > only stores partition states and last mirrored offsets.
> Sensitive
> > > > > > > credentials follow the existing KRaft config handling, which is
> > > > > > > already protected by controller/broker access controls and
> sensitive
> > > > > > > config redaction in DescribeConfigs responses.
> > > > > > >
> > > > > > > RS8: Not sure what the recommended approach is here. Adding a
> new error
> > > > > > > code does not change the response schema and older clients that
> don't
> > > > > > > recognize the new error code will surface it as an
> > > > > > > UnknownServerException (non-retriable).
> > > > > > >
> > > > > > > RS11: Good catch. This is a prototype simplification that we
> need to
> > > > > > > address. To properly sync consumer group offsets, the
> implementation
> > > > > > > would need to send ListGroups to all source brokers (or use the
> > > > > > > AdminClient which does this internally), send FindCoordinator to
> > > > > > > discover the group coordinator for each group, send OffsetFetch
> to the
> > > > > > > correct group coordinator.
> > > > > > >
> > > > > > > RS12: You are absolutely right, the transformation is not
> idempotent,
> > > > > > > so it is not safe for chained mirroring (A -> B -> C). Instead,
> > > > > > > round-trip mirroring (A -> B, then B -> A) works because, when
> doing a
> > > > > > > failback, the log is truncated before mirroring resumes, so
> previously
> > > > > > > mirrored records with negative pids are removed and the
> transformation
> > > > > > > is only applied to new records produced natively on that cluster
> > > > > > > (double-transformation never occurs). Non-transactional batches
> stay
> > > > > > > at -1 [ -(-1 + 2) = -(1) = -1], which is correct.
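The arithmetic above can be checked with a tiny sketch. The function name `mirror_pid` is mine; the pid -> -(pid + 2) mapping is as described in this thread:

```python
def mirror_pid(pid: int) -> int:
    """Transform a producer ID when mirroring a batch: pid -> -(pid + 2)."""
    return -(pid + 2)

# A local transactional producer with PID 5 becomes -7 on the mirror.
assert mirror_pid(5) == -7
# Non-transactional batches use PID -1, which is a fixed point of the
# mapping: -(-1 + 2) = -1, so they stay at -1.
assert mirror_pid(-1) == -1
# The mapping is its own inverse, so chained mirroring (A -> B -> C)
# re-applies it and restores the original PID on C, which is why it is
# not safe for chains.
assert mirror_pid(mirror_pid(5)) == 5
```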
> > > > > > >
> > > > > > > The chained mirroring would work if we skip the transformation
> when
> > > > > > > pid is negative, but there is still an edge case: A -> B -> C
> with
> > > > > > > local B producer. If cluster A has local pid 5 and cluster B
> also has
> > > > > > > local pid 5, both end up as -7 on cluster C. Collision: two
> different
> > > > > > > producers with the same pid on the destination. No pid-only
> > > > > > > transformation can solve that. We would need to incorporate
> cluster
> > > > > > > identity.
> > > > > > >
> > > > > > > Possible solution that would handle any topology: The producer
> IDs are
> > > > > > > 64-bit signed longs used to identify a producer. The clusterId
> (UUID)
> > > > > > > is a globally 128-bit unique identifier for each source
> cluster. We
> > > > > > > could use the clusterId hash to partition the entire negative
> PID
> > > > > > > space into regions, one per source cluster. Basically we divide
> the 64
> > > > > > > bits into three fields: bit 63 (sign bit), bits 62-31 (region
> > > > > > > selector), bits 30-0 (producer identity). Once a non-negative
> PID is
> > > > > > > mapped to a region, it passes through unchanged no matter how
> many
> > > > > > > hops follow (i.e. we apply the transformation only for PIDs >=
> 0).
> > > > > > >
> > > > > > > Example with two clusters:
> > > > > > >
> > > > > > > - Bit 63: This is the sign bit that makes the value negative and
> > > > > > > distinguishes mirrored pids from local ones (which are
> non-negative).
> > > > > > > - Bits 62-31 cluster A: clusterId = "abc-123", clusterHash = 42
> > > > > > > - Bits 62-31 cluster B: clusterId = "xyz-789", clusterHash = 99
> > > > > > > - Bits 30-0: Local producer ID 5 that is the same on both
> clusters.
> > > > > > >
> > > > > > >   A's pid 5 -->
> > > > > > >   1|00000000000000000000000000101010|0000000000000000000000000000101
> > > > > > >   B's pid 5 -->
> > > > > > >   1|00000000000000000000000001100011|0000000000000000000000000000101
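The bit layout above can be sketched in code as follows. This is only an illustration of the proposed encoding; the function and parameter names are mine, and the cluster hashes 42 and 99 are the example values from this mail:

```python
def region_mapped_pid(cluster_hash: int, local_pid: int) -> int:
    """Pack bit 63 (sign), bits 62-31 (region selector derived from the
    source clusterId) and bits 30-0 (local producer identity) into one
    64-bit signed value, as in the proposed scheme."""
    assert 0 <= cluster_hash < (1 << 32)   # 32-bit region selector
    assert 0 <= local_pid < (1 << 31)      # 31-bit producer identity
    unsigned = (1 << 63) | (cluster_hash << 31) | local_pid
    # Reinterpret the 64-bit pattern as a signed long (two's complement).
    return unsigned - (1 << 64)

a = region_mapped_pid(42, 5)   # cluster A, local pid 5
b = region_mapped_pid(99, 5)   # cluster B, local pid 5
assert a < 0 and b < 0         # mirrored pids are always negative
assert a != b                  # same local pid, distinct regions: no collision
```

Decoding the fields back out confirms the layout: `((a + (1 << 64)) >> 31) & 0xFFFFFFFF` recovers 42, and `(a + (1 << 64)) & 0x7FFFFFFF` recovers 5.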
> > > > > > >
> > > > > > > On Wed, Mar 11, 2026 at 1:23 PM Rajini Sivaram <
> > > > > [email protected]>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > A few more questions about the KIP for clarification:
> > > > > > > >
> > > > > > > > RS8: The KIP says produce requests to mirror topics will throw
> > > > > > > > ReadOnlyTopicException. For Produce Requests returning a new
> error to
> > > > > > > > clients, don’t we need to bump Produce request version?
> > > > > > > >
> > > > > > > > RS9: The KIP says we use OffsetMovedToTieredStorageException
> to
> > > > > prevent
> > > > > > > > mirroring of data in tiered storage. But doesn’t the mirror
> client
> > > > > look
> > > > > > > > like a regular consumer to the source cluster and return
> records
> > > > > > fetched
> > > > > > > > from tiered storage?
> > > > > > > >
> > > > > > > > RS10: Client-id based quotas for the source cluster look hard
> to
> > > > > manage
> > > > > > > > since there is no hierarchy or grouping possible. Seems
> better to
> > > > > rely
> > > > > > on
> > > > > > > > secure user-principal based quotas on the source-side.
> > > > > > > >
> > > > > > > > RS11: The KIP says `The manager maintains a connection pool
> with one
> > > > > > > > blocking sender per source cluster`. If this is the
> connection used
> > > > > for
> > > > > > > > periodic sync of offsets, topic configs etc. the coordinator
> is
> > > > > likely
> > > > > > to
> > > > > > > > need connections to all source brokers (i.e. all group
> coordinators).
> > > > > > > >
> > > > > > > > RS12: The KIP proposes to transform producer ids for mirror
> records
> > > > > to
> > > > > > > > avoid conflicts. This comes at a cost because CRC checksum
> needs to
> > > > > be
> > > > > > > > recomputed. To justify this cost, we need to ensure that this
> > > > > > > > transformation works in all cases. What happens if you are
> mirroring
> > > > > a
> > > > > > > > mirror topic? Is that a supported scenario? Or mirroring back
> > > > > mirrored
> > > > > > data
> > > > > > > > during failback because the source was truncated?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Rajini
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Mar 10, 2026 at 8:19 PM Rajini Sivaram <
> > > > > > [email protected]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi team,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP! I have a few questions, mostly
> clarification at
> > > > > > this
> > > > > > > > > point.
> > > > > > > > >
> > > > > > > > > RS1: There is a `CreateMirror` request but no corresponding
> > > > > > `DeleteMirror`
> > > > > > > > > request. Is that intentional?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS2: It will be good to define the format of data going
> into the
> > > > > > internal
> > > > > > > > > mirror state topic. There is an example under
> kafka-dump-logs,
> > > > > which
> > > > > > > > > shows partition-level state in the payload and the mirror
> name as
> > > > > > key. I
> > > > > > > > > guess that is not what we expect it to be. Do we delete this
> > > > > > information
> > > > > > > > > when a topic is deleted or a mirror is deleted?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS3: KIP currently says mirror name cannot end with
> .removed. I
> > > > > > guess it
> > > > > > > > > cannot also end with .paused.  Have we considered storing
> state and
> > > > > > > > > mirror name separately, but updated together for a topic?
> Since new
> > > > > > > > > states may be added in future, name restrictions may become
> hard to
> > > > > > > > > implement.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS4: The KIP says *“mirroring must fetch only up to the LSO
> to
> > > > > > maintain
> > > > > > > > > transactional consistency”* and it also says *“During the
> mirror
> > > > > > stopping
> > > > > > > > > transition, the MirrorCoordinator performs a log truncation
> > > > > > operation that
> > > > > > > > > resets each mirror partition to its LSO.”*  I guess the
> plan is to
> > > > > > fetch
> > > > > > > > > up to high watermark and truncate to locally computed LSO on
> > > > > > failover?
> > > > > > > > > Details of the sequence here will be useful. How does
> > > > > > MirrorCoordinator
> > > > > > > > > perform truncation?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS5: The KIP says “*On the destination cluster,
> mirror-related
> > > > > > operations
> > > > > > > > > (creating mirrors, adding/removing topics from mirrors,
> managing
> > > > > > mirror
> > > > > > > > > configurations) require the CLUSTER_ACTION permission on the
> > > > > cluster
> > > > > > > > > resource.*” The  `Cluster:ClusterAction` ACL is currently
> used for
> > > > > > broker
> > > > > > > > > service account, e.g. local replication is authorized using
> this.
> > > > > It
> > > > > > seems
> > > > > > > > > odd to grant this permission to users managing a resource
> on the
> > > > > > cluster.
> > > > > > > > > Have we considered adding a new resource type
> `ClusterMirror` and
> > > > > > define
> > > > > > > > > ACLs like `ClusterMirror:Create`, `ClusterMirror:Alter` and
> `
> > > > > > > > > ClusterMirror:AlterConfigs`?
> > > > > > > > >
> > > > > > > > > RS6: The KIP talks about three entities: Cluster Mirror,
> Mirror
> > > > > > Topic and Mirror
> > > > > > > > > Partition, with Cluster Mirroring as the feature name.
> Since we
> > > > > > already
> > > > > > > > > have MirrorMaker that also refers to mirrors, it will be
> nice if we
> > > > > > can
> > > > > > > > > refer to the entities using their full name in the CLI and
> public
> > > > > > APIs.
> > > > > > > > > That will enable us to add more mirror topic and mirror
> partition
> > > > > > APIs in
> > > > > > > > > the future if needed. For example:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >    - `kafka-cluster-mirrors.sh` to manage cluster mirrors
> > > > > > > > >    -  createClusterMirrors(), listClusterMirrors(),
> > > > > > > > >    describeClusterMirrors() etc on the Admin API and Kafka
> > > > > Protocol.
> > > > > > > > >    -  KIP proposes pauseMirrorTopics(),
> resumeMirrorTopics() which
> > > > > > are
> > > > > > > > >    good.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > RS7: The KIP proposes to store mirror configs in the
> internal
> > > > > mirror
> > > > > > state
> > > > > > > > > topic. This includes sensitive credentials of another
> cluster. Have
> > > > > > we
> > > > > > > > > considered other options? Can a user with read access read
> the data
> > > > > > from
> > > > > > > > > the state topic using a consumer?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Rajini
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sun, Mar 8, 2026 at 8:58 PM Andrew Schofield <
> > > > > > [email protected]>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi Fede and friends,
> > > > > > > > >> I've re-read in detail and have quite a lot of comments,
> mostly
> > > > > > minor
> > > > > > > > >> clarifications, but as it approaches a vote, it's good to
> get the
> > > > > > details
> > > > > > > > >> nailed down.
> > > > > > > > >>
> > > > > > > > >> AS6: Could we have a diagram which shows which RPCs are
> served by
> > > > > > which
> > > > > > > > >> components? This will help illustrate the authorisation
> > > > > > requirements for
> > > > > > > > >> the various components, which is an aspect of the KIP that
> I don't
> > > > > > think is
> > > > > > > > >> completely specified yet.
> > > > > > > > >>
> > > > > > > > >> AS7: Please could you include a table of the operations and
> > > > > > resources
> > > > > > > > >> which will be checked for authorisation of each of the RPCs
> > > > > > introduced.
> > > > > > > > >> Also, please could you document the permissions which the
> > > > > > destination
> > > > > > > > >> cluster will require to mirror data and ACLs (for example,
> I think
> > > > > > it will
> > > > > > > > >> need ALTER on the CLUSTER resource to manipulate ACLs)?
> It's going
> > > > > > to need
> > > > > > > > >> Metadata, DescribeConfigs, DescribeAcls, ListGroups,
> OffsetFetch,
> > > > > > > > >> LastMirrorOffset and Fetch RPCs I think, possibly others
> too. The
> > > > > > user is
> > > > > > > > >> probably going to want to give as little permission as
> possible to
> > > > > > the
> > > > > > > > >> destination cluster to get its job done.
> > > > > > > > >>
> > > > > > > > >> AS8: You include AuthorizedOperations in
> DescribeMirrorsResponse,
> > > > > > but I
> > > > > > > > >> don't know what the operations are. I think this implies
> MIRROR is
> > > > > a
> > > > > > new
> > > > > > > > >> resource type in the Kafka security model and
> DescribeMirrors can
> > > > > > be used
> > > > > > > > >> to enquire the authorised operations for the client making
> the
> > > > > > Admin API
> > > > > > > > >> request.
> > > > > > > > >>
> > > > > > > > >> AS9: I think you're going to need some new error codes in
> the
> > > > > Kafka
> > > > > > > > >> protocol, at least:
> > > > > > > > >>
> > > > > > > > >> * INVALID_MIRROR_NAME or similar if the mirror name
> doesn't meet
> > > > > the
> > > > > > > > >> rules for a topic name
> > > > > > > > >> * UNKNOWN_MIRROR if the mirror doesn't exist
> > > > > > > > >>
> > > > > > > > >> And probably some more for logical inconsistencies such as
> this
> > > > > > topic
> > > > > > > > >> isn't in that mirror, that topic is already in another
> mirror, and
> > > > > > so on.
> > > > > > > > >>
> > > > > > > > >> AS10: Could you add the usage information for
> kafka-mirrors.sh
> > > > > (the
> > > > > > > > >> intended output from kafka-mirrors.sh --help) so all of the
> > > > > options
> > > > > > are
> > > > > > > > >> documented together? For example, I see that
> --replication-factor
> > > > > is
> > > > > > > > >> included in one of the examples, which seems a bit
> surprising and
> > > > > > I'm not
> > > > > > > > >> sure whether it's a mistake or a feature. I can probably
> use
> > > > > > --describe
> > > > > > > > >> with a specific --mirror but it's not specified.
> > > > > > > > >>
> > > > > > > > >> AS11: I would expect the signature for
> Admin.addTopicsToMirror to
> > > > > be
> > > > > > > > >> Admin.addTopicsToMirror(String mirrorName, Set<String>
> topics,
> > > > > > > > >> AddTopicsToMirrorOptions options) because it's for adding
> topics
> > > > > to
> > > > > > a
> > > > > > > > >> mirror, as the counterpart to
> Admin.removeTopicsFromMirror(String
> > > > > > > > >> mirrorName, Set<String> topics,
> RemoveTopicsFromMirrorOptions
> > > > > > options).
> > > > > > > > >>
> > > > > > > > >> AS12: I don't think ignorable RPC fields in version 0 RPCs
> make
> > > > > > sense
> > > > > > > > >> because they're not trying to be compatible with a previous
> > > > > version.
> > > > > > > > >>
> > > > > > > > >> AS13: I would have expected AddTopicsToMirrorRequest to
> have
> > > > > mirror
> > > > > > name
> > > > > > > > >> above the list of topics because the same mirror name
> applies to
> > > > > > all of the
> > > > > > > > >> topics being added. As specified, you repeat the mirror
> name for
> > > > > > all of the
> > > > > > > > >> topics.
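[Editor's sketch] To make the difference concrete, here are the two request shapes as Python dicts. The field and topic names are illustrative only, not taken from the KIP's JSON schemas:

```python
# Hypothetical shapes, for illustration only (not the KIP's actual schema).

# As currently specified: the mirror name is repeated for every topic entry.
as_specified = {
    "Topics": [
        {"Name": "orders", "MirrorName": "my-mirror"},
        {"Name": "payments", "MirrorName": "my-mirror"},
    ]
}

# As suggested: the mirror name sits above the topic list, stated once.
as_suggested = {
    "MirrorName": "my-mirror",
    "Topics": [{"Name": "orders"}, {"Name": "payments"}],
}
```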
> > > > > > > > >>
> > > > > > > > >> AS14: I suggest adding ErrorMessage to the responses in
> all cases
> > > > > > to make
> > > > > > > > >> it easier to give more descriptive exception messages than
> just
> > > > > the
> > > > > > default
> > > > > > > > >> for the error codes.
> > > > > > > > >>
> > > > > > > > >> AS15: I may have the wrong end of the stick here, but I
> expected
> > > > > > > > >> RemoveTopicsFromMirrorRequest to remove the topics from a
> specific
> > > > > > named
> > > > > > > > >> mirror as implied by the example of the kafka-mirrors.sh
> command.
> > > > > > In fact,
> > > > > > > > >> I was expecting the mirror to contain the topics in the
> admin RPC
> > > > > > requests
> > > > > > > > >> and responses, and that's only true for about half of them.
> > > > > > > > >>
> > > > > > > > >> AS16: Can I change the mirror.name config using
> > > > > > IncrementalAlterConfigs?
> > > > > > > > >> If I attempt it, what's the error?
> > > > > > > > >>
> > > > > > > > >> AS17: If I attempt mirror RPCs when the mirror is in the
> wrong
> > > > > > state, the
> > > > > > > > >> error is specified as INVALID_REQUEST. That's usually kept
> for
> > > > > > badly formed
> > > > > > > > >> requests, as opposed to logically invalid ones. Maybe
> > > > > > MIRROR_NOT_STOPPED or
> > > > > > > > >> MIRRORING_ACTIVE or similar would be more expressive.
> > > > > > > > >>
> > > > > > > > >> AS18: Should the LastMirroredOffsetsResponse,
> > > > > > ReadMirrorStatesResponse
> > > > > > > > >> and WriteMirrorStatesRequest include LeaderEpoch? I
> suspect so.
> > > > > > > > >>
> > > > > > > > >> AS19: In DescribeMirrorsResponse, I suspect you will want
> "null"
> > > > > > values
> > > > > > > > >> for some fields which don't have values during
> initialisation and
> > > > > > so on,
> > > > > > > > >> such as lag.
> > > > > > > > >>
> > > > > > > > >> AS20: Do you need to add new versions of the
> DescribeConfigs and
> > > > > > > > >> IncrementalAlterConfigs RPCs to support mirror resources?
> > > > > > > > >>
> > > > > > > > >> AS21: The topic configuration
> > > > > mirror.replication.throttled.replicas
> > > > > > is
> > > > > > > > >> described as a list, but the default is MAX_LONG.
> > > > > > > > >>
> > > > > > > > >> AS22: By including mirror.name as a topic config, a client
> which
> > > > > > has
> > > > > > > > >> permission to describe configs for the topic is able to
> discover
> > > > > > the name
> > > > > > > > >> of the mirror, regardless of whether they are permitted to list the
> mirrors or
> > > > > > describe
> > > > > > > > >> that particular mirror. Generally, the Kafka authorisation
> model
> > > > > > does not
> > > > > > > > >> permit this kind of unauthorised information disclosure.
> For
> > > > > > example, when
> > > > > > > > >> a client describes the committed offsets for a consumer
> group, the
> > > > > > list of
> > > > > > > > >> topics returned is filtered to only those topics which the
> client
> > > > > is
> > > > > > > > >> permitted to describe, even though that may result in an
> > > > > > incomplete set of
> > > > > > > > >> topic partitions being returned. Is there an alternative
> way in
> > > > > > which this
> > > > > > > > >> information could be stored so Kafka only reveals mirror
> > > > > > information to
> > > > > > > > >> principals authorised to see it?
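[Editor's sketch] The consumer-group behaviour referred to here amounts to filtering results through a describe-permission check before returning them. A minimal illustration, where the function and names are hypothetical rather than Kafka code:

```python
from typing import Callable, Dict

def filter_described_topics(
    offsets_by_topic: Dict[str, int],
    may_describe: Callable[[str], bool],
) -> Dict[str, int]:
    """Return only the entries the requesting principal may describe,
    even if that yields an incomplete view of the group's offsets."""
    return {t: o for t, o in offsets_by_topic.items() if may_describe(t)}

# A principal allowed to describe only "orders" never learns that the
# group also has committed offsets for "payments".
allowed = {"orders"}
view = filter_described_topics(
    {"orders": 42, "payments": 7}, lambda t: t in allowed
)
```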
> > > > > > > > >>
> > > > > > > > >> AS23: I observe that there are situations in which a
> `.removed`
> > > > > > suffix is
> > > > > > > > >> added to the mirror name. Is it permitted for the user to
> define a
> > > > > > mirror
> > > > > > > > >> called "my.nasty.mirror.removed" and does it break
> anything?
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> Andrew
> > > > > > > > >>
> > > > > > > > >> On 2026/03/06 13:41:52 Paolo Patierno wrote:
> > > > > > > > >> > Hi Fede,
> > > > > > > > >> > something more ...
> > > > > > > > >> >
> > > > > > > > >> > Is there any migration path for users who want to
> migrate from
> > > > > > using
> > > > > > > > >> Mirror
> > > > > > > > >> > Maker 2 to the cluster mirroring?
> > > > > > > > >> > I mean, something like a tool to create a
> corresponding
> > > > > > cluster
> > > > > > > > >> > mirroring configuration starting from a MM2 one. Nothing
> that
> > > > > > runs the
> > > > > > > > >> > migration automatically but something that can be
> provided to
> > > > > the
> > > > > > users
> > > > > > > > >> as
> > > > > > > > >> > output to be validated and put in place by them.
> > > > > > > > >> >
> > > > > > > > >> > The Admin Client is missing methods to pause and stop
> mirroring
> > > > > > (but we
> > > > > > > > >> > have corresponding protocol messages). Is it on purpose?
> Any
> > > > > > specific
> > > > > > > > >> > reasons? They would be important for automated operator
> > > > > > > > >> > use cases.
> > > > > > > > >> > Also a method to provide the LastMirroredOffset from the
> source
> > > > > > cluster
> > > > > > > > >> > could be useful for progress and tracking purposes.
> > > > > > > > >> > Finally, what about a method to get the mirror states? I
> don't
> > > > > > think the
> > > > > > > > >> > describe method provides such information.
> > > > > > > > >> > In general, I think that the Admin Client section needs
> to cover
> > > > > > in more
> > > > > > > > >> > detail the new class definitions like
> CreateMirrorOptions,
> > > > > > > > >> > CreateMirrorResult, ... and so on for all the defined new
> > > > > methods.
> > > > > > > > >> >
> > > > > > > > >> > > AddTopicsToMirrorResult addTopicsToMirror(Map<String,
> String>
> > > > > > > > >> > topicToMirrorName, AddTopicsToMirrorOptions options);
> > > > > > > > >> >
> > > > > > > > >> > Isn't it missing the mirrorName (as you have in the
> > > > > > > > >> removeTopicsFromMirror
> > > > > > > > >> > counterpart)?
> > > > > > > > >> > What's the topicToMirrorName parameter if it's defined
> as a Map?
> > > > > > The
> > > > > > > > >> method
> > > > > > > > >> > name is also plural ("topics"), so comparing with the
> > > > > > removeTopicsFromMirror
> > > > > > > > >> > method, I would assume the parameter really is
> Set<String>
> > > > > topics?
> > > > > > > > >> > Comparing to the corresponding protocol message
> > > > > > > > >> AddTopicsToMirrorRequest, I
> > > > > > > > >> > see a list of topics but each of them has id, name and
> > > > > > corresponding
> > > > > > > > >> > mirror. So it's unclear how the addTopicsToMirror is
> defined.
> > > > > > > > >> >
> > > > > > > > >> > > RemoveTopicsFromMirrorResult
> removeTopicsFromMirror(String
> > > > > > mirrorName,
> > > > > > > > >> > Set<String> topics, RemoveTopicsFromMirrorOptions
> options);
> > > > > > > > >> >
> > > > > > > > >> > This method gets a mirrorName but if I look at the
> corresponding
> > > > > > > > >> protocol
> > > > > > > > >> > message RemoveTopicsFromMirrorRequest, it says "Allows
> users to
> > > > > > detach
> > > > > > > > >> > topics from their associated mirror" so the mirror is
> actually
> > > > > not
> > > > > > > > >> provided
> > > > > > > > >> > and it's exactly what I see in the JSON definition (only
> topics
> > > > > > list
> > > > > > > > >> with
> > > > > > > > >> > id and name).
> > > > > > > > >> >
> > > > > > > > >> > Finally, regarding the protocol change:
> > > > > > > > >> >
> > > > > > > > >> > * For ListMirrorsResponse, I would add the clusterId in the
> JSON
> > > > > > definition
> > > > > > > > >> > (it's related to my comments in the previous email when
> using
> > > > > the
> > > > > > tool).
> > > > > > > > >> > * WriteMirrorStatesRequest has the following in the JSON
> which
> > > > > > should
> > > > > > > > >> not
> > > > > > > > >> > be part of it "{ "name": "RemovedTopics", "type":
> "[]string",
> > > > > > > > >> "versions":
> > > > > > > > >> > "0+", "about": "The topic names to be removed." }"
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> > Paolo.
> > > > > > > > >> >
> > > > > > > > >> > On Fri, 6 Mar 2026 at 13:08, Paolo Patierno <
> > > > > > [email protected]>
> > > > > > > > >> > wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Hi Fede,
> > > > > > > > >> > > thank you for the proposal. I had a first pass with
> following
> > > > > > > > >> thoughts and
> > > > > > > > >> > > questions.
> > > > > > > > >> > >
> > > > > > > > >> > > > When the unclean.leader.election.enable is set to
> true, the
> > > > > > broker
> > > > > > > > >> will
> > > > > > > > >> > > log a warning at every configuration synchronization
> period.
> > > > > > > > >> > > Be more explicit about what the warning says.
> > > > > > > > >> > >
> > > > > > > > >> > > > This topic ID is not used by other topics in the
> current
> > > > > > cluster
> > > > > > > > >> > > In such a case, which should be very unlikely, what's
> going to
> > > > > > happen?
> > > > > > > > >> > > Isn't it possible to mirror the topic?
> > > > > > > > >> > >
> > > > > > > > >> > > > To enable it, all cluster nodes (controllers and
> brokers)
> > > > > must
> > > > > > > > >> > > explicitly enable unstable API versions and unstable
> feature
> > > > > > versions
> > > > > > > > >> in
> > > > > > > > >> > > all configuration files. After starting the cluster
> with a
> > > > > > minimum
> > > > > > > > >> metadata
> > > > > > > > >> > > version, operators can dynamically enable the mirror
> version
> > > > > > feature
> > > > > > > > >> to
> > > > > > > > >> > > activate Cluster Mirroring.
> > > > > > > > >> > > AFAIU there is going to be a dedicated feature flag
> for it,
> > > > > > right? If
> > > > > > > > >> yes
> > > > > > > > >> > > can we state it clearly also specifying the exact name
> (i.e.
> > > > > > > > >> mirror.version
> > > > > > > > >> > > or something similar)?
> > > > > > > > >> > >
> > > > > > > > >> > > When running the kafka-mirrors.sh tool to list the
> mirrors,
> > > > > > other than
> > > > > > > > >> > > showing the SOURCE-BOOTSTRAP, it could be useful to
> also have
> > > > > > the
> > > > > > > > >> clusterId
> > > > > > > > >> > > which, as a unique identifier, could be helpful in
> automated
> > > > > > systems
> > > > > > > > >> using
> > > > > > > > >> > > the cluster mirroring. Of course, it would be important
> > > > > > > > >> > > to have it in the ListMirrorsResponse as an additional
> > > > > > > > >> > > field.
> > > > > > > > >> > >
> > > > > > > > >> > > What happens in case of Kafka downgrade from a version
> > > > > > supporting
> > > > > > > > >> > > mirroring to an older one not supporting it?
> > > > > > > > >> > > The mirror won't be running but the topic
> configuration will
> > > > > > still
> > > > > > > > >> have
> > > > > > > > >> > > config parameters like mirror.name and so on, right?
> Are they
> > > > > > just
> > > > > > > > >> > > ignored by the older Kafka version, and will the
> > > > > > > > >> > > cluster work without issues?
> > > > > > > > >> > >
> > > > > > > > >> > > Thanks,
> > > > > > > > >> > > Paolo
> > > > > > > > >> > >
> > > > > > > > >> > > On Fri, 6 Mar 2026 at 10:43, Luke Chen <
> [email protected]>
> > > > > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > >> Hi Andrew and all,
> > > > > > > > >> > >>
> > > > > > > > >> > >> About AS5, yes, I've created a sub-document
> > > > > > > > >> > >> <https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring>
> > > > > > > > >> > >> to
> > > > > > > > >> > >> explain the algorithm to support unclean leader
> election in
> > > > > > cluster
> > > > > > > > >> > >> mirroring.
> > > > > > > > >> > >> Thanks for your comments, I'm inspired by that! :)
> > > > > > > > >> > >>
> > > > > > > > >> > >> About your idea, to store the owner of the leader
> epoch when
> > > > > > > > >> leadership
> > > > > > > > >> > >> change, I think it might not be needed because the
> most
> > > > > > important
> > > > > > > > >> thing
> > > > > > > > >> > >> should be this:
> > > > > > > > >> > >> > you might find that both ends have declared a local
> epoch
> > > > > N,
> > > > > > but
> > > > > > > > >> someone
> > > > > > > > >> > >> has to win.
> > > > > > > > >> > >>
> > > > > > > > >> > >> That is, as long as we have a way to declare who is
> the owner
> > > > > > of
> > > > > > > > >> leader
> > > > > > > > >> > >> epoch N, then the 2 clusters can sync up successfully.
> > > > > > > > >> > >> And that's why I proposed the "last mirrored
> leader epoch"
> > > > > > > > >> semantic in
> > > > > > > > >> > >> the sub-proposal
> > > > > > > > >> > >> <https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring>,
> > > > > > > > >> > >> which draws a line between these 2 clusters to declare
> > > > > > > > >> > >> who owns the records beyond the "last mirrored leader
> > > > > > > > >> > >> epoch" N. I
> > > > > > > > >> > >> think this should work well, as long as all replicas
> in the
> > > > > > cluster
> > > > > > > > >> can
> > > > > > > > >> > >> truncate the log correctly.
> > > > > > > > >> > >>
> > > > > > > > >> > >> What do you think?
> > > > > > > > >> > >>
> > > > > > > > >> > >> Any feedback is appreciated.
> > > > > > > > >> > >>
> > > > > > > > >> > >> Thank you,
> > > > > > > > >> > >> Luke
> > > > > > > > >> > >>
> > > > > > > > >> > >> On Fri, Mar 6, 2026 at 6:02 PM Andrew Schofield <
> > > > > > > > >> [email protected]>
> > > > > > > > >> > >> wrote:
> > > > > > > > >> > >>
> > > > > > > > >> > >> > Hi Fede,
> > > > > > > > >> > >> > Thanks for your response.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS1: Thanks for the clarification.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS2: I expect you'll include a version bump of
> > > > > > > > >> AlterShareGroupOffsets in
> > > > > > > > >> > >> > this KIP, but that's a small matter compared with
> the rest
> > > > > > of the
> > > > > > > > >> > >> protocol
> > > > > > > > >> > >> > changes.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS3: OK.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS4: Thanks for the details. My only comment is
> that it
> > > > > > might be a
> > > > > > > > >> bit
> > > > > > > > >> > >> > laborious when you want to failover all topics. I
> suggest
> > > > > > adding
> > > > > > > > >> > >> > `--all-topics` so you could do:
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > $ bin/kafka-mirrors.sh --bootstrap-server :9094
> --remove
> > > > > > > > >> --all-topics
> > > > > > > > >> > >> > --mirror my-mirror
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > AS5: Thanks for the response. I understand there
> are good
> > > > > > reasons
> > > > > > > > >> for
> > > > > > > > >> > >> the
> > > > > > > > >> > >> > way epochs are handled in the KIP. I see that there
> is a
> > > > > > > > >> sub-document
> > > > > > > > >> > >> for
> > > > > > > > >> > >> > the KIP about unclean leader election. I'll spend
> some time
> > > > > > > > >> reviewing
> > > > > > > > >> > >> that.
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > Thanks,
> > > > > > > > >> > >> > Andrew
> > > > > > > > >> > >> >
> > > > > > > > >> > >> > On 2026/02/18 13:27:07 Federico Valeri wrote:
> > > > > > > > >> > >> > > Hi Andrew, thanks for the review.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Let me try to answer your questions and then other
> > > > > authors
> > > > > > can
> > > > > > > > >> join
> > > > > > > > >> > >> > > the discussion.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS1
> > > > > > > > >> > >> > > ------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Destination topics are created with the same
> topic IDs
> > > > > > using the
> > > > > > > > >> > >> > > extended CreateTopics API. Then, data is
> replicated
> > > > > > starting from
> > > > > > > > >> > >> > > offset 0 with byte-for-byte batch copying, so
> destination
> > > > > > offsets
> > > > > > > > >> > >> > > always match source offsets. When failing over,
> we record
> > > > > > the
> > > > > > > > >> last
> > > > > > > > >> > >> > > mirrored offset (LMO) in the destination cluster.
> When
> > > > > > failing
> > > > > > > > >> back,
> > > > > > > > >> > >> > > the LMO is used to truncate and then mirror only
> > > > > > > > >> > >> > > the delta;
> > > > > > > > >> > >> > > otherwise we start mirroring from scratch by
> truncating
> > > > > to
> > > > > > zero.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Retention: If the mirror leader attempts to fetch
> an
> > > > > > offset that
> > > > > > > > >> is
> > > > > > > > >> > >> > > below the current log start offset of the source
> leader
> > > > > > (e.g.
> > > > > > > > >> fetching
> > > > > > > > >> > >> > > offset 50 when log start offset is 100), the
> source
> > > > > broker
> > > > > > > > >> returns an
> > > > > > > > >> > >> > > OffsetOutOfRangeException that the mirror leader
> handles
> > > > > by
> > > > > > > > >> truncating
> > > > > > > > >> > >> > > to the source's current log start offset and
> resuming
> > > > > > fetching
> > > > > > > > >> from
> > > > > > > > >> > >> > > that point. Compaction: The mirror leader
> replicates
> > > > > these
> > > > > > > > >> compacted
> > > > > > > > >> > >> > > log segments exactly as they exist in the source
> cluster,
> > > > > > > > >> maintaining
> > > > > > > > >> > >> > > the same offset assignments and gaps.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Do you have any specific corner case in mind?
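[Editor's sketch] A toy model of the two truncation rules described above (failback via the LMO, and the OffsetOutOfRange retention case). All names are illustrative and this is not the broker logic itself:

```python
from typing import Optional

def failback_start_offset(last_mirrored_offset: Optional[int]) -> int:
    """On failback, truncate to the recorded LMO and mirror only the
    delta; with no LMO available, truncate to zero and re-mirror the
    topic from scratch."""
    return last_mirrored_offset if last_mirrored_offset is not None else 0

def next_fetch_offset(requested: int, source_log_start: int) -> int:
    """If the requested offset fell below the source's log start offset
    (retention deleted it, signalled by OffsetOutOfRange), truncate to
    the source's log start offset and resume from there; otherwise keep
    fetching from the requested offset."""
    return max(requested, source_log_start)

# e.g. fetching offset 50 when the source log start offset is 100:
# the mirror truncates and resumes fetching from offset 100.
```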
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS2
> > > > > > > > >> > >> > > ------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Agreed. The current AlterShareGroupOffsetsRequest
> (v0)
> > > > > only
> > > > > > > > >> includes
> > > > > > > > >> > >> > > PartitionIndex and StartOffset with no epoch
> field. When
> > > > > > > > >> mirroring
> > > > > > > > >> > >> > > share group offsets across clusters, the epoch is
> needed
> > > > > to
> > > > > > > > >> ensure the
> > > > > > > > >> > >> > > offset alteration targets the correct leader
> generation.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS3
> > > > > > > > >> > >> > > ------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Right, the enum is now fixed. Yes, we will parse
> from the
> > > > > > right
> > > > > > > > >> and
> > > > > > > > >> > >> > > apply the same naming rules used for topic names ;)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS4
> > > > > > > > >> > >> > > -------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Agreed. I'll try to improve those paragraphs
> because they
> > > > > > are
> > > > > > > > >> crucial
> > > > > > > > >> > >> > > from an operational point of view.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > Let me shortly explain how it is supposed to work:
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > 9091 (source) -----> 9094 (destination)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > The single operation that allows an operator to
> switch
> > > > > all
> > > > > > > > >> topics at
> > > > > > > > >> > >> > > once in case of disaster is the following:
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9094
> --remove
> > > > > > --topic .*
> > > > > > > > >> > >> > > --mirror my-mirror
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > 9091 (source) --x--> 9094 (destination)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > After that, all mirror topics become detached
> from the
> > > > > > source
> > > > > > > > >> cluster
> > > > > > > > >> > >> > > and start accepting writes (the two clusters are
> allowed
> > > > > to
> > > > > > > > >> diverge).
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > When the source cluster is back, the operator can
> > > > > failback
> > > > > > by
> > > > > > > > >> creating
> > > > > > > > >> > >> > > a mirror with the same name on the source cluster
> (new
> > > > > > > > >> destination):
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > echo "bootstrap.servers=localhost:9094" >
> > > > > > > > >> /tmp/my-mirror.properties
> > > > > > > > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091
> --create
> > > > > > --mirror
> > > > > > > > >> > >> > > my-mirror --mirror-config
> /tmp/my-mirror.properties
> > > > > > > > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091
> --add
> > > > > > --topic .*
> > > > > > > > >> > >> > > --mirror my-mirror
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > 9091 (destination) <----- 9094 (source)
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > AS5
> > > > > > > > >> > >> > > -------
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > This is the core of our design and we reached that
> > > > > > empirically by
> > > > > > > > >> > >> > > trying out different options. We didn't want to
> change
> > > > > > local
> > > > > > > > >> > >> > > replication, and this is something you need to do
> when
> > > > > > > > >> preserving the
> > > > > > > > >> > >> > > source leader epoch. The current design is simple
> and
> > > > > > keeps the
> > > > > > > > >> epoch
> > > > > > > > >> > >> > > domains entirely separate. The destination cluster is
> in
> > > > > > charge of
> > > > > > > > >> the
> > > > > > > > >> > >> > > leader epoch for its own log. The source epoch is
> only
> > > > > used
> > > > > > > > >> during the
> > > > > > > > >> > >> > > fetch protocol to validate responses and detect
> > > > > divergence.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > The polarity idea of tracking whether an epoch
> bump
> > > > > > originated
> > > > > > > > >> from
> > > > > > > > >> > >> > > replication vs. local leadership change is
> interesting,
> > > > > > but adds
> > > > > > > > >> > >> > > significant complexity and coupling between
> source and
> > > > > > > > >> destination
> > > > > > > > >> > >> > > epochs. Could you clarify what specific scenario
> polarity
> > > > > > > > >> tracking
> > > > > > > > >> > >> > > would address that the current separation doesn't
> handle?
> > > > > > One
> > > > > > > > >> case we
> > > > > > > > >> > >> > > don't support is unclean leader election
> reconciliation
> > > > > > across
> > > > > > > > >> > >> > > clusters; is that the gap you're aiming at?
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > I tried to rewrite the unclean leader election
> paragraph
> > > > > > in the
> > > > > > > > >> > >> > > rejected alternatives to be easier to digest. Let
> me know
> > > > > > if it
> > > > > > > > >> works.
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> > > On Tue, Feb 17, 2026 at 2:57 PM Andrew Schofield
> > > > > > > > >> > >> > > <[email protected]> wrote:
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > Hi Fede and friends,
> > > > > > > > >> > >> > > > Thanks for the KIP.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > It’s a comprehensive design, easy to read and
> has
> > > > > clearly
> > > > > > > > >> taken a
> > > > > > > > >> > >> lot
> > > > > > > > >> > >> > of work.
> > > > > > > > >> > >> > > > The principle of integrating mirroring into the
> brokers
> > > > > > makes
> > > > > > > > >> total
> > > > > > > > >> > >> > sense to me.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > The main comment I have is that mirroring like
> this
> > > > > > cannot
> > > > > > > > >> handle
> > > > > > > > >> > >> > situations
> > > > > > > > >> > >> > > > in which multiple topic-partitions are logically
> > > > > > related, such
> > > > > > > > >> as
> > > > > > > > >> > >> > transactions,
> > > > > > > > >> > >> > > > with total fidelity. Each topic-partition is
> being
> > > > > > replicated
> > > > > > > > >> as a
> > > > > > > > >> > >> > separate entity.
> > > > > > > > >> > >> > > > The KIP calls this out and describes the
> behaviour
> > > > > > thoroughly.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > A few initial comments.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS1) Is it true that offsets are always
> preserved by
> > > > > > this KIP?
> > > > > > > > >> I
> > > > > > > > >> > >> > *think* so but
> > > > > > > > >> > >> > > > not totally sure that it’s true in all cases.
> It would
> > > > > > > > >> certainly be
> > > > > > > > >> > >> > nice.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS2) I think you need to add epoch information
> to
> > > > > > > > >> > >> > AlterShareGroupOffsetsRequest.
> > > > > > > > >> > >> > > > It really should already be there in hindsight,
> but I
> > > > > > think
> > > > > > > > >> this KIP
> > > > > > > > >> > >> > requires it.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS3) The CoordinatorType enum for MIRROR will
> need to
> > > > > be
> > > > > > 3
> > > > > > > > >> because 2
> > > > > > > > >> > >> > is SHARE.
> > > > > > > > >> > >> > > > I’m sure you’ll parse the keys from the right ;)
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS4) The procedure for achieving a failover
> could be
> > > > > > clearer.
> > > > > > > > >> Let’s
> > > > > > > > >> > >> > say that I am
> > > > > > > > >> > >> > > > using cluster mirroring to achieve DR
> replication. My
> > > > > > source
> > > > > > > > >> cluster
> > > > > > > > >> > >> > is utterly lost
> > > > > > > > >> > >> > > > due to a disaster. What’s the single operation
> that I
> > > > > > perform
> > > > > > > > >> to
> > > > > > > > >> > >> > switch all of the
> > > > > > > > >> > >> > > > topics mirrored from the lost source cluster to
> become
> > > > > > the
> > > > > > > > >> active
> > > > > > > > >> > >> > topics?
> > > > > > > > >> > >> > > > Similarly for failback.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > AS5) The only piece that I’m really unsure of
> is the
> > > > > > epoch
> > > > > > > > >> > >> management.
> > > > > > > > >> > >> > I would
> > > > > > > > >> > >> > > > have thought that the cluster which currently
> has the
> > > > > > writable
> > > > > > > > >> > >> > topic-partition
> > > > > > > > >> > >> > > > would be in charge of the leader epoch and it
> would not
> > > > > > be
> > > > > > > > >> > >> necessary to
> > > > > > > > >> > >> > > > perform all of the gymnastics described in the
> section
> > > > > > on epoch
> > > > > > > > >> > >> > rewriting.
> > > > > > > > >> > >> > > > I have read the Rejected Alternatives section
> too, but
> > > > > I
> > > > > > don’t
> > > > > > > > >> fully
> > > > > > > > >> > >> > grasp
> > > > > > > > >> > >> > > > why it was necessary to reject it.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > I wonder if we could store the “polarity” of an
> epoch,
> > > > > > > > >> essentially
> > > > > > > > >> > >> > whether the
> > > > > > > > >> > >> > > > epoch bump was observed by replication from a
> source
> > > > > > cluster,
> > > > > > > > >> or
> > > > > > > > >> > >> > whether
> > > > > > > > >> > >> > > > it was bumped by a local leadership change when
> the
> > > > > > topic is
> > > > > > > > >> locally
> > > > > > > > >> > >> > writable.
> > > > > > > > >> > >> > > > When a topic-partition switches from read-only
> to
> > > > > > writable, we
> > > > > > > > >> > >> should
> > > > > > > > >> > >> > definitely
> > > > > > > > >> > >> > > > bump the epoch, and we could record the fact
> that it
> > > > > was
> > > > > > a
> > > > > > > > >> local
> > > > > > > > >> > >> epoch.
> > > > > > > > >> > >> > > > When connectivity is re-established, you might
> find
> > > > > that
> > > > > > both
> > > > > > > > >> ends
> > > > > > > > >> > >> have
> > > > > > > > >> > >> > > > declared a local epoch N, but someone has to
> win.
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > Thanks,
> > > > > > > > >> > >> > > > Andrew
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > > > > On 14 Feb 2026, at 07:17, Federico Valeri
> > > > > > > > >> > >> > > > > <[email protected]> wrote:
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > > Hi, we would like to start a discussion
> > > > > > > > >> > >> > > > > thread about KIP-1279: Cluster Mirroring.
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > > Cluster Mirroring is a new Kafka feature
> > > > > > > > >> > >> > > > > that enables native, broker-level topic
> > > > > > > > >> > >> > > > > replication across clusters. Unlike
> > > > > > > > >> > >> > > > > MirrorMaker 2 (which runs as an external
> > > > > > > > >> > >> > > > > Connect-based tool), Cluster Mirroring is
> > > > > > > > >> > >> > > > > built into the broker itself, allowing
> > > > > > > > >> > >> > > > > tighter integration with the controller,
> > > > > > > > >> > >> > > > > coordinator, and partition lifecycle.
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1279%3A+Cluster+Mirroring
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > > There are a few missing bits, but most of
> > > > > > > > >> > >> > > > > the design is there, so we think it is the
> > > > > > > > >> > >> > > > > right time to involve the community and get
> > > > > > > > >> > >> > > > > feedback. Please help us validate our
> > > > > > > > >> > >> > > > > approach.
> > > > > > > > >> > >> > > > >
> > > > > > > > >> > >> > > > > Thanks
> > > > > > > > >> > >> > > > > Fede
> > > > > > > > >> > >> > > >
> > > > > > > > >> > >> > >
> > > > > > > > >> > >> >
> > > > > > > > >> > >>
> > > > > > > > >> > >
> > > > > > > > >> > >
> > > > > > > > >> > > --
> > > > > > > > >> > > Paolo Patierno
> > > > > > > > >> > >
> > > > > > > > >> > > *Senior Principal Software Engineer @ IBM*
> > > > > > > > >> > > *CNCF Ambassador*
> > > > > > > > >> > >
> > > > > > > > >> > > Twitter : @ppatierno <http://twitter.com/ppatierno>
> > > > > > > > >> > > Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
> > > > > > > > >> > > GitHub : ppatierno <https://github.com/ppatierno>
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> 
