Hi Fede, Thanks for the updates. AS25: It's all good in the current version.
AS26: Yes, it's definitely theoretical and I didn't mean for the KIP to change. I suppose that the CreateTopic with a specific topic ID would fail because the topic ID is already in use. That seems entirely appropriate behaviour. AS27: start/stop/pause/resume is definitely nicer. Thanks, Andrew On 2026/04/15 08:31:19 Federico Valeri wrote: > Hi Andrew, > > AS8: I'm working on this and will update the KIP soon. > > AS24: Still thinking about this. We will update in case we find a better > design. We are also considering your suggestions. > > AS25: Where do you see that in the KIP? Maybe it was in a previous > revision. Anyway, we decided to remove the --replication-factor flag to > always honor the destination cluster default. The operator can always use > the assignment script to change it. > > AS26: That's an interesting theoretical observation. Sharing the same topic > ID between source and mirror is important for safety, as it ensures the > mirror is replicating data from the correct source topic, not from a > different topic that happens to share the same name. Any future > multi-tenancy solution would need to handle this transparently in its own > layer. For example, in addition to rewriting the topic name (namespacing), > a proxy would also need to intercept DescribeTopicPartitions (and likely > CreateTopics with MirrorInfo) to remap or namespace topic IDs transparently. > > AS27: Yes, add/remove along with pause/resume is confusing. We changed > AddTopicsToMirror/RemoveTopicsFromMirror to > StartMirrorTopics/StopMirrorTopics. We will still keep the same semantics > though. Stop can be used for failover or migration use cases, and pause for > "pausing mirroring without making the topic writable" (maintenance). > > Thanks > Fede > > > > On Fri, Apr 3, 2026 at 4:59 PM Andrew Schofield <[email protected]> > wrote: > > > > Hi Fede and friends, > > Thanks for the responses to my excessive comments. I like the direction > it's heading in. 
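Fede's AS27 reply keeps stop and pause separate. Stop is for failover or migration and leaves the topic writable. Pause suspends mirroring and keeps the topic read-only. The sketch below models this as a per-partition state machine. The state names PREPARING, MIRRORING, STOPPING and STOPPED come from the failover and failback sequences later in this thread. PAUSED, the internal "truncated" event, and the transition table itself are assumptions for illustration only, not the KIP's definition.

```python
from enum import Enum


class MirrorPartitionState(Enum):
    PREPARING = "PREPARING"  # truncating to the last mirrored offset; read-only
    MIRRORING = "MIRRORING"  # fetching from the source cluster; read-only
    PAUSED = "PAUSED"        # fetching suspended for maintenance; still read-only
    STOPPING = "STOPPING"    # fetchers removed, truncating to the local LSO
    STOPPED = "STOPPED"      # mirroring ended; the topic accepts writes


S = MirrorPartitionState

# Hypothetical transition table: (current state, operation) -> next state.
# None stands for a topic that is not part of any mirror. "truncated" is an
# internal event fired once all tracked replicas have finished truncating.
TRANSITIONS = {
    (None, "start"): S.PREPARING,
    (S.STOPPED, "start"): S.PREPARING,        # failback
    (S.PREPARING, "truncated"): S.MIRRORING,
    (S.MIRRORING, "pause"): S.PAUSED,
    (S.PAUSED, "resume"): S.MIRRORING,
    (S.MIRRORING, "stop"): S.STOPPING,        # failover or migration
    (S.PAUSED, "stop"): S.STOPPING,           # assumption: stopping while paused is allowed
    (S.STOPPING, "truncated"): S.STOPPED,
}


def transition(state, operation):
    """Return the next state, or raise ValueError for an illegal transition."""
    try:
        return TRANSITIONS[(state, operation)]
    except KeyError:
        raise ValueError(f"cannot {operation} a partition in state {state}") from None


def is_writable(state):
    # Pausing keeps the topic read-only; only a stopped (or never mirrored)
    # partition accepts produce requests.
    return state in (None, S.STOPPED)


if __name__ == "__main__":
    state = None
    for op in ["start", "truncated", "pause", "resume", "stop", "truncated"]:
        state = transition(state, op)
        print(f"{op:>9} -> {state.name:<9} writable={is_writable(state)}")
```

Because writability is tied to the state rather than to the operation name, renaming add/remove to start/stop does not change the semantics, which matches Fede's note.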
> > > > AS6, AS7: Thanks for adding the tables of permissions. Making > ClusterMirror an ACL resource seems like a good improvement to me. A few > detailed clarifications: > > > > DescribeConfigs RPC: DESCRIBE_CONFIGS on TOPIC, not DESCRIBE. > > CreatePartitions RPC: ALTER on TOPIC. > > IncrementalAlterConfigs RPC: ALTER_CONFIGS on CLUSTER_MIRROR for mirrors, > and ALTER_CONFIGS on TOPIC for topics. > > OffsetCommit RPC: READ on GROUP and READ on TOPIC. > > CreateAcls RPC: ALTER on CLUSTER. > > DeleteAcls RPC: ALTER on CLUSTER. > > > > For operations which act on two kinds of resources, such as doing topic > things to groups, we generally need permission on both resources. I suggest: > > > > AddTopicsToMirror RPC: Maybe add READ on TOPIC. > > RemoveTopicsFromMirror RPC: Maybe add READ on TOPIC. > > > > AS8: I think that including AuthorizedOperations in the > DescribeMirrorsResponse will work nicely now. > > > > AS9: Thanks for the table of error codes. Looks comprehensive, but you'll > need MIRROR_AUTHORIZATION_FAILED too I think. > > > > AS10-15: Thanks. Looks good. > > > > AS16: Thanks. But I now have AS24 below. Sorry. > > > > AS17: Thanks. Looks good. > > > > AS18: The epoch information has been reworked in the latest version. > Looks good to me. > > > > AS19-21: Thanks. Looks good. > > > > AS22-23: Also, see AS24 below. > > > > And here are a few new comments. > > > > AS24: It seems to me that the list of topics being mirrored is really a > property of the mirror resource. Having `mirror.name` as a topic config, > and then overloading it with various state suffixes seems a bit inelegant. > > > > I suggest: > > * The mirror name follows the same rules as topic names (which it cannot > quite do as the KIP is written because of .deleted and so on). > > * The list of topics is a property of the mirror. Adding and removing > topics mutates the mirror resource. > > * The mirror name is no longer a topic config.
Then, you do not need > special handling to hide it if the user is not authorized to describe the > cluster mirror, and you don't need to fiddle with the names as the topic's > mirroring state changes. Since the state changes are mediated by the > cluster mirroring components, keeping control of the state in the mirror > resource seems workable. > > * The mirror state for the topics in the mirror is also handled as > properties or metadata of the mirror resource. > > * You probably would need DESCRIBE on TOPIC to see that a topic was being > mirrored, as well as DESCRIBE on MIRROR. This matches resources such as > consumer groups where you can only see the committed offsets for the topics > you can describe. > > > > I know this is largely a matter of opinion, so feel free to reject my > suggestion. > > > > AS25: When cluster mirroring creates a topic on the destination, I wonder > why it does not inherit the replication factor of the source topic by > default. I can understand why you might want source and mirror topics to > have different replication factors, but I think the default is currently > the default replication factor for the destination cluster, as opposed to > the replication factor of the source topic. > > > > AS26: The source topic and mirror topic use the same topic ID. I like the > simplicity of this, but there's a theoretical implication which I thought I > would raise. Although Apache Kafka itself does not support multi-tenancy > yet, people have built multi-tenancy on top using proxies and techniques > such as adding and removing topic name prefixes transparently to the > clients. It seems to me that multi-tenancy in Apache Kafka is a gap waiting > to be filled and before long a suitable KIP will be brought forward. 
With > such techniques, if someone tried to use cluster mirroring where the source > and destination "clusters" were actually virtual clusters on the same Kafka > cluster, the attempt to create the mirror topic with the same topic ID > would fail. I'm sure this is just a theoretical concern for AK because the > project itself doesn't have multi-tenancy, yet, but I wondered what the > authors think about this. > > > > AS27: This is definitely a matter of personal taste and it probably > indicates that my mental model of the KIP differs from the authors'. It > seems to me that `kafka-mirrors.sh --remove` should remove a topic from a > mirror because mirroring it is no longer required. However, this is > actually the command for initiating failover. Should there be a separate > `kafka-mirrors.sh --failover`? > > > > Thanks, > > Andrew > > > > On 2026/04/03 09:35:47 Federico Valeri wrote: > > > Hello everyone, > > > > > > Thank you all for the thoughtful questions and suggestions, and thanks > > > to Michael Maison for proposing a better approach to the chained > > > mirroring problem. Some of these required careful consideration and > > > led us to refine the design. > > > > > > For reviewers, I recommend focusing on the paragraphs with significant > changes: > > > > > > 1. MirrorFetcherThread (updated) > > > 2. Security Control (updated) > > > 3. Idempotent Producer (updated) > > > 4. Command Workflows (new) > > > 5. Error Names (new) > > > > > > Below are answers to the outstanding questions. > > > > > > AS6, AS7: I added a table to "Security Control" illustrating > > > authorization requirements for each component. Let me know if it > > > covers what you had in mind. > > > > > > AS8: I removed that field for now. It is on my backlog and I will post > > > an update soon. > > > > > > AS9, AS17: There is a new "Errors" paragraph with a table listing new > > > and reused error codes, along with the RPCs that use each one. 
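Andrew's AS6/AS7 clarifications, together with the new "Security Control" table, amount to a lookup from each RPC to the (operation, resource type) pairs that must all be granted. The sketch below shows that check and ignores resource names and patterns. The first rows come from the clarifications. The mirror-topic rows are assumptions: ALTER on CLUSTER_MIRROR is guessed, and READ on TOPIC is Andrew's "maybe add" suggestion. They use the StartMirrorTopics/StopMirrorTopics names adopted later in the thread. All function names are hypothetical.

```python
# Required (operation, resource type) pairs per RPC on the destination cluster.
# Rows up to DeleteAcls follow the AS6/AS7 clarifications; the mirror-topic rows
# are hypothetical (ALTER on CLUSTER_MIRROR is an assumption, READ on TOPIC is
# the "maybe add" suggestion).
REQUIRED = {
    "DescribeConfigs": {("DESCRIBE_CONFIGS", "TOPIC")},
    "CreatePartitions": {("ALTER", "TOPIC")},
    "OffsetCommit": {("READ", "GROUP"), ("READ", "TOPIC")},
    "CreateAcls": {("ALTER", "CLUSTER")},
    "DeleteAcls": {("ALTER", "CLUSTER")},
    "StartMirrorTopics": {("ALTER", "CLUSTER_MIRROR"), ("READ", "TOPIC")},
    "StopMirrorTopics": {("ALTER", "CLUSTER_MIRROR"), ("READ", "TOPIC")},
}


def required_for_alter_configs(target_type):
    # IncrementalAlterConfigs depends on what is being altered:
    # "CLUSTER_MIRROR" for mirrors, "TOPIC" for topics.
    return {("ALTER_CONFIGS", target_type)}


def missing_permissions(rpc, granted):
    """Required pairs not covered by `granted`; an empty set means authorized."""
    return REQUIRED[rpc] - set(granted)


def is_authorized(rpc, granted):
    # An RPC touching two kinds of resource needs every pair, not just one.
    return not missing_permissions(rpc, granted)


if __name__ == "__main__":
    granted = {("READ", "GROUP")}
    print(is_authorized("OffsetCommit", granted))        # False
    print(missing_permissions("OffsetCommit", granted))  # {('READ', 'TOPIC')}
```

Returning the missing pairs, rather than only a boolean, makes it easy to put a specific reason into an ErrorMessage field, as AS14 suggests.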
> > > > > > AS18: Could you clarify which LeaderEpoch checks you are referring to? > > > Here is our understanding: > > > > > > 1. WriteMirrorStatesRequest: The coordinator validates that the > > > current leader epoch matches the request leader epoch, rejecting > > > outdated requests. > > > 2. ReadMirrorStatesResponse: The coordinator includes the current > > > leader epoch in the response, and the leader node validates it against > > > its own current leader epoch. > > > 3. LastMirroredOffsetsResponse: We replaced this with > > > LastMirroredEpochsResponse, so we believe this no longer applies. > > > > > > AS22: Description updated to: "This property is filtered out from > > > DescribeConfigs responses to avoid exposing internal state to users." > > > > > > --- > > > > > > RS1: I added the DeleteMirror RPC and updated all relevant sections. > > > > > > RS5: As you suggested, I introduced a new ResourceType called > > > ClusterMirror. See "Security Control." > > > > > > RS6: The refactoring suggestions make sense, but we need more time to > > > think them through. > > > > > > RS9: Until we support tiered storage, we will mirror all records > > > including data in remote storage into the destination cluster. > > > > > > RS10: There is a future improvement briefly described in "Bandwidth > > > Control." Let me know if you need more details. > > > > > > --- > > > > > > JR3: I created a new "Command Workflows" section that covers the > > > workflow for each mirror operation, including failover and failback. > > > Let me know if you spot any issues. > > > > > > I hope I have addressed everything, but please let me know if I missed > > > any question. > > > > > > Cheers > > > Fede > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Mar 20, 2026 at 5:12 AM Luke Chen <[email protected]> wrote: > > > > > > > > Hi Jun, > > > > > > > > Thanks for the review comments. > > > > Answer them below. > > > > > > > > > JR1. 
"Epoch rewriting: When records are appended to the destination > log, > > > > the batch epochs are rewritten to match the destination cluster's > leader > > > > epochs, maintaining consistency within the destination cluster." This > has a > > > > couple of impacts. > > > > > > > > We have an updated design to support unclean leader election in this > doc, > > > > where we will NOT rewrite the leader epoch anymore. > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring > > > > > > > > > > > > > JR2. "Tiered Storage is not initially supported": Ideally, we should > > > > support tiered storage. Same as RS9, the destination cluster issues > > > > consumer requests, which support tiering. > > > > The OffsetMovedToTieredStorageException is used by replication > > > > inside AbstractFetcherThread. This suggests that it's probably not a > good > > > > fit for cluster mirroring to use AbstractFetcherThread. > > > > > > > > Yes, thanks for pointing this out. I was wrong about this. Before we > > > > support tiered storage, the mirroring will mirror all records > including > > > > data in the remote storage into the destination cluster. > > > > > > > > > JR3. For each new cluster mirroring command, it would be useful to > > > > document > > > > the underlying workflow (e.g, which RPC requests are issued, to which > node; > > > > what records are written to metadata topic, or internal topic, which > > > > actions are triggered on the broker, etc). > > > > > > > > Will do. > > > > > > > > > JR4. Truncating a log to LMO. Currently, there is no internal API > that > > > > truncates a partition from the end. Could you describe how this will > be > > > > implemented to ensure all replicas are consistent after the > truncation? > > > > > > > > The truncation flow is like this: > > > > 1. 
When the MirrorMetadataManager in the node gets notified about the > > > > partition leader assignment in onMetadataUpdate (via TopicsDelta), > it'll > > > > query the mirror coordinator about the mirror partition state. > > > > 2. When it's in the PREPARING state, the MirrorMetadataManager in the > leader > > > > node will get the last mirrored offset (or epoch) from the source > cluster > > > > (new API) and then truncate the log. > > > > 3. In (2), we'll also register a callback in the Partition instance, and > wait > > > > until all ISR members complete the truncation by checking the follower > replica's > > > > LEO. > > > > 4. In (3), this check will be invoked every time the leader node > updates > > > > the follower fetch state, like how we check if the high watermark should be > > > > incremented. > > > > 5. After all ISR members complete the truncation, we'll invoke the callback > and > > > > move the mirror partition state to MIRRORING, and then start fetching > data > > > > from the source cluster. > > > > > > > > Note: > > > > (1) In the PREPARING state, the partition is READ-ONLY, so no data will be > > > > written on the leader node. > > > > (2) During steps (1) ~ (4), if any leadership change happens, the new > leader > > > > will start from step (1) to complete the log truncation process. > > > > (3) If unclean leader election is supported > > > > (i.e. mirror.support.unclean.leader.election=true), then we'll wait > until > > > > ALL registered replicas complete the truncation before moving on to > > > > the MIRRORING state. > > > > > > > > We'll update the KIP in the following days to address the community > > > > feedback. Some questions need more thinking. > > > > Please give us some time. :) > > > > > > > > Thank you, > > > > Luke > > > > > > > > > > > > On Fri, Mar 20, 2026 at 9:10 AM Jun Rao via dev <[email protected]> > > > > wrote: > > > > > > > > > Hi, Federico, > > > > > > > > > > Thanks for the KIP. A few comments. > > > > > > > > > > JR1.
"Epoch rewriting: When records are appended to the destination > log, > > > > > the batch epochs are rewritten to match the destination cluster's > leader > > > > > epochs, maintaining consistency within the destination cluster." > This has a > > > > > couple of impacts. > > > > > JR1.1 How do we ensure that the leader epoch in committed offsets is > > > > > consistent with the leader epoch in the batch? This consistency is > > > > > important when the consumer fails over to a different cluster. It > seems the > > > > > KIP doesn't translate the leader epoch when mirroring the committed > offsets. > > > > > JR1.2 Typically, leader epochs increase monotonically in the log. > Do we > > > > > ensure this remains the case after failover and failback? > > > > > > > > > > JR2. "Tiered Storage is not initially supported": Ideally, we should > > > > > support tiered storage. Same as RS9, the destination cluster issues > > > > > consumer requests, which support tiering. > > > > > The OffsetMovedToTieredStorageException is used by replication > > > > > inside AbstractFetcherThread. This suggests that it's probably not > a good > > > > > fit for cluster mirroring to use AbstractFetcherThread. > > > > > > > > > > JR3. For each new cluster mirroring command, it would be useful to > document > > > > > the underlying workflow (e.g., which RPC requests are issued, to > which node; > > > > > what records are written to the metadata topic or the internal topic, which > > > > > actions are triggered on the broker, etc). > > > > > > > > > > JR4. Truncating a log to LMO. Currently, there is no internal API > that > > > > > truncates a partition from the end. Could you describe how this > will be > > > > > implemented to ensure all replicas are consistent after the > truncation? > > > > > > > > > > Jun > > > > > > > > > > On Mon, Mar 16, 2026 at 2:44 AM Federico Valeri < > [email protected]> > > > > > wrote: > > > > > > > > > > > I mentioned a corner case in the chained mirroring use case.
Let > me > > > > > > clarify what I mean with a simple example: > > > > > > > > > > > > 1. B is fetching from A, and C is fetching from B (A --> B --> C) > > > > > > 2. A producer with PID 5 sends records to A > > > > > > 3. Failover happens and B becomes writable (A -x-> B --> C) > > > > > > 4. A different producer with PID 5 sends records to B > > > > > > 5. Collision on cluster C (two different producers mapped to PID > -7 in C) > > > > > > > > > > > > (arrows represent data flow, not fetch direction) > > > > > > > > > > > > > > > > > > On Sun, Mar 15, 2026 at 7:14 PM Federico Valeri < > [email protected]> > > > > > > wrote: > > > > > > > > > > > > > > Hi Rajini, thanks for your thoughtful review and for catching a > few > > > > > > > bugs. I'll skip some questions that we will address later. > > > > > > > > > > > > > > RS2: The metadata records are described in the "Mirror Metadata > Records" > > > > > > > paragraph. Currently there are only two records: the > "LastMirroredOffsets" > > > > > > > record tracks the latest successfully mirrored offset for each > > > > > > > partition, while the "MirrorPartitionState" record represents the > > > > > > > lifecycle states of a mirrored partition. > > > > > > > > > > > > > > RS3: That's a good point that was also raised by Andrew. It was > an > > > > > > > easy solution that we used for our prototype, but we need to > think > > > > > > > about a better solution. > > > > > > > > > > > > > > RS4: The current design is that mirror fetcher threads behave like > a > > > > > > > read_committed consumer fetching up to the "source LSO". On > failover we > > > > > > > truncate the destination log to the "local LSO".
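RS4 bounds the mirror fetcher at the source LSO and truncates to the local LSO on failover. Luke's JR4 answer, and step 3.3 of the failover sequence, wait until replicas have finished truncating before the state changes. The sketch below covers both checks under two assumptions. First, the LSO is the first offset of the earliest open transaction, capped at the high watermark, as for read_committed consumers. Second, completion is detected by comparing the follower LEOs reported in fetch requests with the truncation point. Function names are hypothetical.

```python
def last_stable_offset(high_watermark, open_txn_first_offsets):
    """First offset of the earliest still-open transaction, or the HW if none is open."""
    return min([high_watermark, *open_txn_first_offsets])


def truncation_complete(target_offset, replica_leos):
    """True once every tracked replica's log end offset equals the truncation point.

    `replica_leos` maps replica id -> LEO as last reported in follower fetches.
    Pass the ISR members normally, or all registered replicas when unclean
    leader election support is enabled.
    """
    return all(leo == target_offset for leo in replica_leos.values())


if __name__ == "__main__":
    # Source partition: HW 120, two open transactions starting at 100 and 110.
    fetch_limit = last_stable_offset(120, [110, 100])
    print(fetch_limit)                                         # 100
    # Failover: the destination truncates to its local LSO and waits for the ISR.
    print(truncation_complete(100, {1: 100, 2: 100, 3: 104}))  # False
    print(truncation_complete(100, {1: 100, 2: 100, 3: 100}))  # True
```

Running the completion check on every follower fetch, like the high-watermark update, avoids a separate polling loop. The partition remains read-only until the check passes.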
> > > > > > > > > > > > > > > The approach of fetching up to HW that you propose is still > safe as we > > > > > > > keep truncating to local LSO on failover, but it trades lower > > > > > > > steady-state lag (especially when long-running transactions > exist on > > > > > > > the source) for more data loss on failover (the net data loss > relative > > > > > > > to the source is the same in both approaches). In other words, > with > > > > > > > your approach we fetch more data that we may then need to > truncate. > > > > > > > Also, read_uncommitted consumers on the destination cluster > would be > > > > > > > able to read records that may be truncated on failover. These > are just > > > > > > > my considerations, but we are open to discussion on which is the > best > > > > > > > approach here. > > > > > > > > > > > > > > When a failover is triggered (RemoveTopicsFromMirror), the > sequence is: > > > > > > > > > > > > > > 1. Partitions transition to STOPPING state > > > > > > > 2. Fetchers are removed > > > > > > > 3. For each partition, truncation to local LSO is called > > > > > > > 3.1. Reads LSO from each partition's local log > > > > > > > 3.2. Calls log.truncateTo(offset) on the UnifiedLog > > > > > > > 3.3. Ensures ISR members complete truncation before the > partition > > > > > > > becomes writable > > > > > > > 4. For each partition, the LSO is recorded as the last mirrored > offset > > > > > > > (LMO) in __mirror_state > > > > > > > 5. Partitions transition to STOPPED and become writable > > > > > > > > > > > > > > When a failback is triggered (AddTopicsToMirror), the sequence > is: > > > > > > > > > > > > > > 1. Partitions transition to PREPARING state > > > > > > > 2. For each partition, truncation to LMO is called > > > > > > > 2.1. This sends a LastMirroredOffsetsRequest to the source > cluster > > > > > > > to fetch the offsets that were recorded during the previous > failover > > > > > > > 2.2.a.
The response offsets are used to truncate local logs > > > > > > > 2.2.b. If the source cluster doesn't support the > LastMirroredOffsets > > > > > > > API, or this is a first-time mirror, it truncates to offset 0 > > > > > > > 3. Partitions transition to MIRRORING > > > > > > > > > > > > > > RS7: Can you point me to the section that says configs are > stored in > > > > > > > __mirror_state? Mirror connection configs (bootstrap servers, > > > > > > > credentials, etc.) are stored in KRaft metadata via > > > > > > > ConfigResource.Type.MIRROR, not in __mirror_state. The internal > topic > > > > > > > only stores partition states and last mirrored offsets. > Sensitive > > > > > > > credentials follow the existing KRaft config handling, which is > > > > > > > already protected by controller/broker access controls and > sensitive > > > > > > > config redaction in DescribeConfigs responses. > > > > > > > > > > > > > > RS8: Not sure what the recommended approach is here. Adding a > new error > > > > > > > code does not change the response schema, and older clients that > don't > > > > > > > recognize the new error code will surface it as an > > > > > > > UnknownServerException (non-retriable). > > > > > > > > > > > > > > RS11: Good catch. This is a prototype simplification that we > need to > > > > > > > address. To properly sync consumer group offsets, the > implementation > > > > > > > would need to send ListGroups to all source brokers (or use the > > > > > > > AdminClient which does this internally), send FindCoordinator to > > > > > > > discover the group coordinator for each group, and send OffsetFetch > to the > > > > > > > correct group coordinator. > > > > > > > > > > > > > > RS12: You are absolutely right, the transformation is not > idempotent, > > > > > > > so it is not safe for chained mirroring (A -> B -> C).
Instead, > > > > > > > round-trip mirroring (A -> B, then B -> A) works because, when > doing a > > > > > > > failback, the log is truncated before mirroring resumes, so > previously > > > > > > > mirrored records with negative pids are removed and the > transformation > > > > > > > is only applied to new records produced natively on that cluster > > > > > > > (double-transformation never occurs). Batches without a producer ID (pid -1, e.g. from non-idempotent producers) > stay > > > > > > > at -1 [ -(-1 + 2) = -(1) = -1], which is correct. > > > > > > > > > > > > > > Chained mirroring would work if we skip the transformation > when > > > > > > > the pid is negative, but there is still an edge case: A -> B -> C > with > > > > > > > a local B producer. If cluster A has local pid 5 and cluster B > also has > > > > > > > local pid 5, both end up as -7 on cluster C. Collision: two > different > > > > > > > producers with the same pid on the destination. No pid-only > > > > > > > transformation can solve that. We would need to incorporate > cluster > > > > > > > identity. > > > > > > > > > > > > > > A possible solution that would handle any topology: The producer > IDs are > > > > > > > 64-bit signed longs used to identify a producer. The clusterId > (UUID) > > > > > > > is a globally unique 128-bit identifier for each source > cluster. We > > > > > > > could use the clusterId hash to partition the entire negative > PID > > > > > > > space into regions, one per source cluster. Basically we divide > the 64 > > > > > > > bits into three fields: bit 63 (sign bit), bits 62-31 (region > > > > > > > selector), bits 30-0 (producer identity). Once a non-negative > PID is > > > > > > > mapped to a region, the resulting negative PID passes through unchanged no matter how > many > > > > > > > hops follow (i.e. we apply the transformation only for PIDs >= > 0).
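The region-based layout proposed in RS12 can be sketched as below, next to the current -(pid + 2) transform for comparison. The sketch makes three assumptions. The clusterHash has already been reduced to 32 bits, and because its derivation from the clusterId is not specified here, the sketch takes it as an input. Local PIDs fit in 31 bits. Values are treated as Java-style signed 64-bit longs. Function names are hypothetical.

```python
SIGN_BIT = 1 << 63
PID_BITS = 31                       # bits 30-0: local producer identity
REGION_BITS = 32                    # bits 62-31: region selector (cluster hash)
PID_MASK = (1 << PID_BITS) - 1
REGION_MASK = (1 << REGION_BITS) - 1
UINT64_MASK = (1 << 64) - 1


def to_int64(bits):
    """Interpret a 64-bit pattern as a signed (Java long) value."""
    bits &= UINT64_MASK
    return bits - (1 << 64) if bits & SIGN_BIT else bits


def naive_transform(pid):
    """Current KIP transform: -(pid + 2). Keeps -1 at -1 but is not idempotent."""
    return -(pid + 2)


def region_transform(pid, cluster_hash):
    """Map a local PID into the source cluster's negative region.

    Negative PIDs (already mirrored, or -1 meaning no producer ID) pass through
    unchanged, so later hops in a chain do not re-transform them.
    """
    if pid < 0:
        return pid
    if pid > PID_MASK:
        raise ValueError("local PID does not fit in the 31-bit identity field")
    return to_int64(SIGN_BIT | ((cluster_hash & REGION_MASK) << PID_BITS) | pid)


def bit_layout(pid):
    """Render a PID as sign|region|identity, as in the example in this thread."""
    bits = format(pid & UINT64_MASK, "064b")
    return f"{bits[0]}|{bits[1:33]}|{bits[33:]}"


if __name__ == "__main__":
    # Naive scheme, skipping negative PIDs: A's PID 5 becomes -7 on B and passes
    # through to C unchanged, while B's local PID 5 also becomes -7 on C.
    a_on_c = naive_transform(5)
    b_on_c = naive_transform(5)
    print(a_on_c == b_on_c)                            # True (collision)
    print(naive_transform(naive_transform(5)))         # 5 (applying it twice undoes it)
    # Region scheme with the example hashes: 42 for A, 99 for B.
    a5, b5 = region_transform(5, 42), region_transform(5, 99)
    print(a5 != b5, region_transform(a5, 99) == a5)    # True True
    print(bit_layout(a5))
```

Real producer IDs are 64-bit, so a local PID above 2^31 - 1 would need a separate answer under this layout. The sketch simply rejects such PIDs.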
> > > > > > > > > > > > > > Example with two clusters: > > > > > > > > > > > > > > - Bit 63: This is the sign bit that makes the value negative and > > > > > > > distinguishes mirrored pids from local ones (which are > non-negative). > > > > > > > - Bits 62-31 cluster A: clusterId = "abc-123", clusterHash = 42 > > > > > > > - Bits 62-31 cluster B: clusterId = "xyz-789", clusterHash = 99 > > > > > > > - Bits 30-0: Local producer ID 5 that is the same on both > clusters. > > > > > > > > > > > > > > A's pid 5 --> > > > > > > > > 1|00000000000000000000000000101010|0000000000000000000000000000101 > > > > > > > B's pid 5 --> > > > > > > > > 1|00000000000000000000000001100011|0000000000000000000000000000101 > > > > > > > > > > > > > > On Wed, Mar 11, 2026 at 1:23 PM Rajini Sivaram < > > > > > [email protected]> > > > > > > wrote: > > > > > > > > > > > > > > > > A few more questions about the KIP for clarification: > > > > > > > > > > > > > > > > RS8: The KIP says produce requests to mirror topics will throw > > > > > > > > ReadOnlyTopicException. For Produce Requests returning a new > error to > > > > > > > > clients, don’t we need to bump Produce request version? > > > > > > > > > > > > > > > > RS9: The KIP says we use OffsetMovedToTieredStorageException > to > > > > > prevent > > > > > > > > mirroring of data in tiered storage. But doesn’t the mirror > client > > > > > look > > > > > > > > like a regular consumer to the source cluster and return > records > > > > > > fetched > > > > > > > > from tiered storage? > > > > > > > > > > > > > > > > RS10: Client-id based quotas for the source cluster look hard > to > > > > > manage > > > > > > > > since there is no hierarchy or grouping possible. Seems > better to > > > > > rely > > > > > > on > > > > > > > > secure user-principal based quotas on the source-side. > > > > > > > > > > > > > > > > RS11: The KIP says `The manager maintains a connection pool > with one > > > > > > > > blocking sender per source cluster`. 
If this is the > connection used > > > > > for > > > > > > > > periodic sync of offsets, topic configs etc. the coordinator > is > > > > > likely > > > > > > to > > > > > > > > need connections to all source brokers (i.e. all group > coordinators). > > > > > > > > > > > > > > > > RS12: The KIP proposes to transform producer ids for mirror > records > > > > > to > > > > > > > > avoid conflicts. This comes at a cost because CRC checksum > needs to > > > > > be > > > > > > > > recomputed. To justify this cost, we need to ensure that this > > > > > > > > transformation works in all cases. What happens if you are > mirroring > > > > > a > > > > > > > > mirror topic? Is that a supported scenario? Or mirroring back > > > > > mirrored > > > > > > data > > > > > > > > during failback because the source was truncated? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > Rajini > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Mar 10, 2026 at 8:19 PM Rajini Sivaram < > > > > > > [email protected]> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi team, > > > > > > > > > > > > > > > > > > Thanks for the KIP! I have a few questions, mostly > clarification at > > > > > > this > > > > > > > > > point. > > > > > > > > > > > > > > > > > > RS1: There is a `CreateMirror` request but no corresponding > > > > > > `DeleteMirror` > > > > > > > > > request. Is that intentional? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > RS2: It will be good to define the format of data going > into the > > > > > > internal > > > > > > > > > mirror state topic. There is an example under > kafka-dump-logs, > > > > > which > > > > > > > > > shows partition-level state in the payload and the mirror > name as > > > > > > key. I > > > > > > > > > guess that is not what we expect it to be. Do we delete this > > > > > > information > > > > > > > > > when a topic is deleted or a mirror is deleted? 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > RS3: KIP currently says mirror name cannot end with > .removed. I > > > > > > guess it > > > > > > > > > cannot also end with .paused. Have we considered storing > state and > > > > > > > > > mirror name separately, but updated together for a topic? > Since new > > > > > > > > > states may be added in future, name restrictions may become > hard to > > > > > > > > > implement. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > RS4: The KIP says *“mirroring must fetch only up to the LSO > to > > > > > > maintain > > > > > > > > > transactional consistency”* and it also says *“During the > mirror > > > > > > stopping > > > > > > > > > transition, the MirrorCoordinator performs a log truncation > > > > > > operation that > > > > > > > > > resets each mirror partition to its LSO.”* I guess the > plan is to > > > > > > fetch > > > > > > > > > up to high watermark and truncate to locally computed LSO on > > > > > > failover? > > > > > > > > > Details of the sequence here will be useful. How does > > > > > > MirrorCoordinator > > > > > > > > > perform truncation? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > RS5: The KIP says “*On the destination cluster, > mirror-related > > > > > > operations > > > > > > > > > (creating mirrors, adding/removing topics from mirrors, > managing > > > > > > mirror > > > > > > > > > configurations) require the CLUSTER_ACTION permission on the > > > > > cluster > > > > > > > > > resource.*” The `Cluster:ClusterAction` ACL is currently > used for > > > > > > broker > > > > > > > > > service account, e.g. local replication is authorized using > this. > > > > > It > > > > > > seems > > > > > > > > > odd to grant this permission to users managing a resource > on the > > > > > > cluster. 
> > > > > > > > > Have we considered adding a new resource type > `ClusterMirror` and > > > > > > define > > > > > > > > > ACLs like `ClusterMirror:Create`, `ClusterMirror:Alter` and > ` > > > > > > > > > ClusterMirror:AlterConfigs`? > > > > > > > > > > > > > > > > > > RS6: The KIP talks about three entities: Cluster Mirror, > Mirror > > > > > > Topic and Mirror > > > > > > > > > Partition, with Cluster Mirroring as the feature name. > Since we > > > > > > already > > > > > > > > > have MirrorMaker that also refers to mirrors, it will be > nice if we > > > > > > can > > > > > > > > > refer to the entities using their full name in the CLI and > public > > > > > > APIs. > > > > > > > > > That will enable us to add more mirror topic and mirror > partition > > > > > > APIs in > > > > > > > > > the future if needed. For example: > > > > > > > > > > > > > > > > > > > > > > > > > > > - `kafka-cluster-mirrors.sh` to manage cluster mirrors > > > > > > > > > - createClusterMirrors(), listClusterMirrors(), > > > > > > > > > describeClusterMirrors() etc on the Admin API and Kafka > > > > > Protocol. > > > > > > > > > - KIP proposes pauseMirrorTopics(), > resumeMirrorTopics() which > > > > > > are > > > > > > > > > good. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > RS7: The KIP proposes to store mirror configs in the > internal > > > > > mirror > > > > > > state > > > > > > > > > topic. This includes sensitive credentials of another > cluster. Have > > > > > > we > > > > > > > > > considered other options? Can a user with read access read > the data > > > > > > from > > > > > > > > > the state topic using a consumer? 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > > > > Rajini > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Sun, Mar 8, 2026 at 8:58 PM Andrew Schofield < > > > > > > [email protected]> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > >> Hi Fede and friends, > > > > > > > > >> I've re-read in detail and have quite a lot of comments, > mostly > > > > > > minor > > > > > > > > >> clarifications, but as it approaches a vote, it's good to > get the > > > > > > details > > > > > > > > >> nailed down. > > > > > > > > >> > > > > > > > > >> AS6: Could we have a diagram which shows which RPCs are > served by > > > > > > which > > > > > > > > >> components? This will help illustrate the authorisation > > > > > > requirements for > > > > > > > > >> the various components, which is an aspect of the KIP that > I don't > > > > > > think is > > > > > > > > >> completely specified yet. > > > > > > > > >> > > > > > > > > >> AS7: Please could you include a table of the operations and > > > > > > resources > > > > > > > > >> which will be checked for authorisation of each of the RPCs > > > > > > introduced. > > > > > > > > >> Also, please could you document the permissions which the > > > > > > destination > > > > > > > > >> cluster will require to mirror data and ACLs (for example, > I think > > > > > > it will > > > > > > > > >> need ALTER on the CLUSTER resource to manipulate ACLs)? > It's going > > > > > > to need > > > > > > > > >> Metadata, DescribeConfigs, DescribeAcls, ListGroups, > OffsetFetch, > > > > > > > > >> LastMirrorOffset and Fetch RPCs I think, possibly others > too. The > > > > > > user is > > > > > > > > >> probably going to want to give as little permission as > possible to > > > > > > the > > > > > > > > >> destination cluster to get its job done. 
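The source-side RPCs listed in AS7 can be turned into a least-privilege ACL set for the destination cluster's principal. The sketch below assumes the destination connects as a single principal and that literal (non-prefixed) ACLs are wanted. The operation/resource pairs follow Kafka's existing authorization rules for these RPCs. The new LastMirroredOffsets (later LastMirroredEpochs) RPC is left out because its ACL is not defined in this thread. The function and principal names are hypothetical. ListGroups only returns the groups the caller can describe, so DESCRIBE on each mirrored group is enough here.

```python
# (operation, resource type) each source-side RPC needs under Kafka's existing
# authorization rules. The new LastMirroredOffsets/LastMirroredEpochs RPC is left
# out because its ACL is not defined in this thread.
SOURCE_RPC_ACLS = {
    "Metadata": [("DESCRIBE", "TOPIC")],
    "DescribeConfigs": [("DESCRIBE_CONFIGS", "TOPIC")],
    "DescribeAcls": [("DESCRIBE", "CLUSTER")],
    "ListGroups": [("DESCRIBE", "GROUP")],
    "OffsetFetch": [("DESCRIBE", "GROUP"), ("DESCRIBE", "TOPIC")],
    "Fetch": [("READ", "TOPIC")],
}


def minimal_acls(principal, rpcs, topics, groups):
    """Expand the RPCs the destination uses into literal ACL bindings.

    Returns sorted (principal, operation, resource type, resource name) tuples.
    """
    names = {
        "TOPIC": sorted(topics),
        "GROUP": sorted(groups),
        "CLUSTER": ["kafka-cluster"],  # the cluster resource's fixed name
    }
    bindings = set()
    for rpc in rpcs:
        for operation, resource_type in SOURCE_RPC_ACLS[rpc]:
            for name in names[resource_type]:
                bindings.add((principal, operation, resource_type, name))
    return sorted(bindings)


if __name__ == "__main__":
    for binding in minimal_acls("User:mirror-dest", ["Metadata", "Fetch", "OffsetFetch"],
                                topics={"orders"}, groups={"billing"}):
        print(binding)
```

Because each binding is derived from an RPC, dropping a feature (for example, offset sync) shrinks the ACL set automatically.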
AS8: You include AuthorizedOperations in DescribeMirrorsResponse, but I don't know what the operations are. I think this implies that MIRROR is a new resource type in the Kafka security model and that DescribeMirrors can be used to enquire about the authorised operations for the client making the Admin API request.

AS9: I think you're going to need some new error codes in the Kafka protocol, at least:

* INVALID_MIRROR_NAME or similar if the mirror name doesn't meet the rules for a topic name
* UNKNOWN_MIRROR if the mirror doesn't exist

And probably some more for logical inconsistencies such as "this topic isn't in that mirror", "that topic is already in another mirror", and so on.

AS10: Could you add the usage information for kafka-mirrors.sh (the intended output from kafka-mirrors.sh --help) so all of the options are documented together? For example, I see that --replication-factor is included in one of the examples, which seems a bit surprising, and I'm not sure whether it's a mistake or a feature. I can probably use --describe with a specific --mirror, but it's not specified.
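To make AS9 concrete, here is a minimal sketch, assuming mirror names simply reuse Kafka's topic-name rules (at most 249 characters, only ASCII letters, digits, '.', '_' and '-', and not "." or ".."). The class `MirrorNames` and its methods are hypothetical illustrations, not part of Kafka or of the KIP; the returned message is the kind of text an ErrorMessage field could carry next to an INVALID_MIRROR_NAME code.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not an existing Kafka API: checks a mirror name
// against the rules Kafka applies to topic names.
final class MirrorNames {
    // Kafka topic names are limited to 249 characters.
    private static final int MAX_NAME_LENGTH = 249;
    // Legal topic-name characters: ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    private MirrorNames() {}

    // Returns null if the name is valid, otherwise a human-readable reason
    // that could accompany a (hypothetical) INVALID_MIRROR_NAME error code.
    static String validate(String name) {
        if (name == null || name.isEmpty()) {
            return "Mirror name must not be empty";
        }
        if (name.equals(".") || name.equals("..")) {
            return "Mirror name cannot be \".\" or \"..\"";
        }
        if (name.length() > MAX_NAME_LENGTH) {
            return "Mirror name is longer than " + MAX_NAME_LENGTH + " characters";
        }
        if (!LEGAL_CHARS.matcher(name).matches()) {
            return "Mirror name may only contain ASCII letters, digits, '.', '_' and '-'";
        }
        return null;
    }

    static boolean isValid(String name) {
        return validate(name) == null;
    }
}
```

Note that a name such as "my.nasty.mirror.removed" passes these rules, which is why the `.removed` suffix question (AS23) needs an explicit answer: topic-name rules alone do not stop a user-chosen name from looking like a system-appended suffix.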
AS11: I would expect the signature for Admin.addTopicsToMirror to be Admin.addTopicsToMirror(String mirrorName, Set<String> topics, AddTopicsToMirrorOptions options) because it's for adding topics to a mirror, as the counterpart to Admin.removeTopicsFromMirror(String mirrorName, Set<String> topics, RemoveTopicsFromMirrorOptions options).

AS12: I don't think ignorable RPC fields in version 0 RPCs make sense because they're not trying to be compatible with a previous version.

AS13: I would have expected AddTopicsToMirrorRequest to have the mirror name above the list of topics because the same mirror name applies to all of the topics being added. As specified, you repeat the mirror name for every topic.

AS14: I suggest adding ErrorMessage to the responses in all cases to make it easier to give more descriptive exception messages than just the default for the error codes.

AS15: I may have the wrong end of the stick here, but I expected RemoveTopicsFromMirrorRequest to remove the topics from a specific named mirror, as implied by the example of the kafka-mirrors.sh command. In fact, I was expecting the mirror to contain the topics in the admin RPC requests and responses, and that's only true for about half of them.
AS16: Can I change the mirror.name config using IncrementalAlterConfigs? If I attempt it, what's the error?

AS17: If I attempt mirror RPCs when the mirror is in the wrong state, the error is specified as INVALID_REQUEST. That's usually kept for badly formed requests, as opposed to logically invalid ones. Maybe MIRROR_NOT_STOPPED or MIRRORING_ACTIVE or similar would be more expressive.

AS18: Should the LastMirroredOffsetsResponse, ReadMirrorStatesResponse and WriteMirrorStatesRequest include LeaderEpoch? I suspect so.

AS19: In DescribeMirrorsResponse, I suspect you will want "null" values for some fields which don't have values during initialisation and so on, such as lag.

AS20: Do you need to add new versions of the DescribeConfigs and IncrementalAlterConfigs RPCs to support mirror resources?

AS21: The topic configuration mirror.replication.throttled.replicas is described as a list, but the default is MAX_LONG.

AS22: By including mirror.name as a topic config, a client which has permission to describe configs for the topic is able to discover the name of the mirror, whether or not they are permitted to list the mirrors or describe that particular mirror. Generally, the Kafka authorisation model does not permit this kind of unauthorised information disclosure.
For example, when a client describes the committed offsets for a consumer group, the list of topics returned is filtered to only those topics which the client is permitted to describe, even though that may result in an incomplete set of topic partitions being returned. Is there an alternative way in which this information could be stored so Kafka only reveals mirror information to principals authorised to see it?

AS23: I observe that there are situations in which a `.removed` suffix is added to the mirror name. Is it permitted for the user to define a mirror called "my.nasty.mirror.removed", and does it break anything?

Thanks,
Andrew

On 2026/03/06 13:41:52 Paolo Patierno wrote:

Hi Fede,
something more ...

Is there any migration path for users who want to migrate from MirrorMaker 2 to cluster mirroring? I mean something like a tool that creates a corresponding cluster mirroring configuration from an MM2 one. Nothing that runs the migration automatically, but something that can be provided to users as output to be validated and put in place by them.

The Admin Client is missing methods to pause and stop mirroring (but we have corresponding protocol messages).
Is it on purpose? Any specific reasons? They would be important for the automated operator use case.
Also, a method to provide the LastMirroredOffset from the source cluster could be useful for progress and tracking purposes.
Finally, what about a method to get the mirror states? I don't think the describe method provides such information.
In general, I think that the Admin Client section needs to cover in more detail the new class definitions, like CreateMirrorOptions, CreateMirrorResult, ... and so on for all the newly defined methods.

> AddTopicsToMirrorResult addTopicsToMirror(Map<String, String> topicToMirrorName, AddTopicsToMirrorOptions options);

Isn't it missing the mirrorName (as you have in the removeTopicsFromMirror counterpart)? What's the topicToMirrorName parameter if it's defined as a Map? The method name is also plural ("topics"), so comparing it to the removeTopicsFromMirror method, I would assume the parameter really is Set<String> topics? Comparing it to the corresponding protocol message AddTopicsToMirrorRequest, I see a list of topics, but each of them has an id, a name and a corresponding mirror. So it's unclear how addTopicsToMirror is defined.
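The ambiguity raised here (and in AS11/AS13) is that a Map from topic to mirror name can mix topics bound for several mirrors in one call. A minimal sketch, assuming the Map form is kept only as input, shows how it regroups into one (mirrorName, topics) pair per mirror, the shape that mirrors removeTopicsFromMirror(String mirrorName, Set<String> topics, ...). `MirrorTopicGrouping` and `groupByMirror` are hypothetical names for illustration only.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not a Kafka API: regroups a topic -> mirror name map
// into mirror name -> topics, i.e. one add call (or request) per mirror with
// the mirror name stated once instead of repeated per topic.
final class MirrorTopicGrouping {
    private MirrorTopicGrouping() {}

    static Map<String, Set<String>> groupByMirror(Map<String, String> topicToMirrorName) {
        // Sorted collections keep the output deterministic for display and tests.
        Map<String, Set<String>> topicsByMirror = new TreeMap<>();
        for (Map.Entry<String, String> entry : topicToMirrorName.entrySet()) {
            topicsByMirror
                .computeIfAbsent(entry.getValue(), mirror -> new TreeSet<>())
                .add(entry.getKey());
        }
        return topicsByMirror;
    }
}
```

If the result has more than one key, a single-mirror signature such as the one suggested in AS11 would need one call per mirror, which is arguably clearer than one call spanning several mirrors.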
> RemoveTopicsFromMirrorResult removeTopicsFromMirror(String mirrorName, Set<String> topics, RemoveTopicsFromMirrorOptions options);

This method takes a mirrorName, but if I look at the corresponding protocol message RemoveTopicsFromMirrorRequest, it says "Allows users to detach topics from their associated mirror", so the mirror is actually not provided, and that's exactly what I see in the JSON definition (only a topics list with id and name).

Finally, regarding the protocol changes:

* ListMirrorsResponse: I would add the clusterId to the JSON definition (this relates to my comments in the previous email about using the tool).
* WriteMirrorStatesRequest has the following in the JSON, which should not be part of it: { "name": "RemovedTopics", "type": "[]string", "versions": "0+", "about": "The topic names to be removed." }

Thanks,
Paolo.

On Fri, 6 Mar 2026 at 13:08, Paolo Patierno <[email protected]> wrote:

Hi Fede,
thank you for the proposal. I had a first pass with the following thoughts and questions.
> When the unclean.leader.election.enable is set to true, the broker will log a warning at every configuration synchronization period.

Please be more explicit about what the warning says.

> This topic ID is not used by other topics in the current cluster

In such a case, which should be very unlikely, what's going to happen? Isn't it possible to mirror the topic?

> To enable it, all cluster nodes (controllers and brokers) must explicitly enable unstable API versions and unstable feature versions in all configuration files. After starting the cluster with a minimum metadata version, operators can dynamically enable the mirror version feature to activate Cluster Mirroring.

AFAIU there is going to be a dedicated feature flag for it, right? If yes, can we state it clearly, also specifying the exact name (e.g. mirror.version or something similar)?

When running the kafka-mirrors.sh tool to list the mirrors, in addition to showing the SOURCE-BOOTSTRAP, it could be useful to also show the clusterId, which, as a unique identifier, could be helpful in automated systems using cluster mirroring.
Of course, it would be important to have it in the ListMirrorsResponse as well, as an additional field.

What happens in case of a Kafka downgrade from a version supporting mirroring to an older one not supporting it? The mirror won't be running, but the topic configuration will still have config parameters like mirror.name and so on, right? Are they just ignored by the older Kafka version, and will the cluster work without issues?

Thanks,
Paolo

On Fri, 6 Mar 2026 at 10:43, Luke Chen <[email protected]> wrote:

Hi Andrew and all,

About AS5, yes, I've created a sub-document <https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring> to explain the algorithm to support unclean leader election in cluster mirroring.
Thanks for your comments, I'm inspired by them! :)

About your idea to store the owner of the leader epoch when leadership changes, I think it might not be needed, because the most important thing should be this:

> you might find that both ends have declared a local epoch N, but someone has to win.
That is, as long as we have a way to declare who owns leader epoch N, the 2 clusters can sync up successfully.
And that's why I proposed the "last mirrored leader epoch" semantic in the sub-proposal <https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring>, which is a way to draw a line between these 2 clusters and declare which cluster owns the records beyond the "last mirrored leader epoch" N. I think this should work well, as long as all replicas in the cluster can truncate the log correctly.

What do you think?

Any feedback is appreciated.

Thank you,
Luke

On Fri, Mar 6, 2026 at 6:02 PM Andrew Schofield <[email protected]> wrote:

Hi Fede,
Thanks for your response.

AS1: Thanks for the clarification.

AS2: I expect you'll include a version bump of AlterShareGroupOffsets in this KIP, but that's a small matter compared with the rest of the protocol changes.
AS3: OK.

AS4: Thanks for the details. My only comment is that it might be a bit laborious when you want to fail over all topics. I suggest adding `--all-topics` so you could do:

$ bin/kafka-mirrors.sh --bootstrap-server :9094 --remove --all-topics --mirror my-mirror

AS5: Thanks for the response. I understand there are good reasons for the way epochs are handled in the KIP. I see that there is a sub-document for the KIP about unclean leader election. I'll spend some time reviewing that.

Thanks,
Andrew

On 2026/02/18 13:27:07 Federico Valeri wrote:

Hi Andrew, thanks for the review.

Let me try to answer your questions, and then other authors can join the discussion.

AS1
------

Destination topics are created with the same topic IDs using the extended CreateTopics API.
Then, data is replicated starting from offset 0 with byte-for-byte batch copying, so destination offsets always match source offsets. When failing over, we record the last mirrored offset (LMO) in the destination cluster. When failing back, the LMO is used for truncating before mirroring the delta; otherwise, we start mirroring from scratch by truncating to zero.

Retention: If the mirror leader attempts to fetch an offset that is below the current log start offset of the source leader (e.g. fetching offset 50 when the log start offset is 100), the source broker returns an OffsetOutOfRangeException, which the mirror leader handles by truncating to the source's current log start offset and resuming fetching from that point. Compaction: The mirror leader replicates compacted log segments exactly as they exist in the source cluster, maintaining the same offset assignments and gaps.

Do you have any specific corner case in mind?

AS2
------

Agreed.
The current AlterShareGroupOffsetsRequest (v0) only includes PartitionIndex and StartOffset, with no epoch field. When mirroring share group offsets across clusters, the epoch is needed to ensure the offset alteration targets the correct leader generation.

AS3
------

Right, the enum is now fixed. Yes, we will parse from the right and apply the same naming rules used for topic names ;)

AS4
------

Agreed. I'll try to improve those paragraphs because they are crucial from an operational point of view.
Let me briefly explain how it is supposed to work:

9091 (source) -----> 9094 (destination)

The single operation that allows an operator to switch all topics at once in case of disaster is the following:

bin/kafka-mirrors.sh --bootstrap-server :9094 --remove --topic '.*' --mirror my-mirror

9091 (source) --x--> 9094 (destination)

After that, all mirror topics become detached from the source cluster and start accepting writes (the two clusters are allowed to diverge).

When the source cluster is back, the operator can fail back by creating a mirror with the same name on the source cluster (the new destination):

echo "bootstrap.servers=localhost:9094" > /tmp/my-mirror.properties
bin/kafka-mirrors.sh --bootstrap-server :9091 --create --mirror my-mirror --mirror-config /tmp/my-mirror.properties
bin/kafka-mirrors.sh --bootstrap-server :9091 --add --topic '.*' --mirror my-mirror

9091 (destination) <----- 9094 (source)

AS5
------
This is the core of our design, and we reached it empirically by trying out different options. We didn't want to change local replication, which is something you need to do when preserving the source leader epoch. The current design is simple and keeps the epoch domains entirely separate. The destination cluster is in charge of the leader epoch for its own log. The source epoch is only used during the fetch protocol to validate responses and detect divergence.

The polarity idea of tracking whether an epoch bump originated from replication vs. a local leadership change is interesting, but adds significant complexity and coupling between source and destination epochs. Could you clarify what specific scenario polarity tracking would address that the current separation doesn't handle? One case we don't support is unclean leader election reconciliation across clusters; is that the gap you're aiming at?

I tried to rewrite the unclean leader election paragraph in the rejected alternatives to make it easier to digest. Let me know if it works.
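The two truncation decisions in the AS1 answer above (failback using the last mirrored offset, and recovery when a fetch falls below the source log start offset) can be sketched as pure functions. This is a minimal model under stated assumptions, not the KIP's implementation: `MirrorFetchPosition` and both method names are hypothetical, it assumes the LMO is used directly as the truncation point, and it only models the below-log-start case of OffsetOutOfRange, not fetches beyond the log end.

```java
import java.util.OptionalLong;

// Hypothetical model of the truncation decisions described in AS1;
// not part of Kafka or the KIP.
final class MirrorFetchPosition {
    private MirrorFetchPosition() {}

    // Failback: if a last mirrored offset (LMO) was recorded, truncate there and
    // mirror only the delta; otherwise truncate to zero and mirror from scratch.
    static long failbackTruncationOffset(OptionalLong lastMirroredOffset) {
        return lastMirroredOffset.orElse(0L);
    }

    // Retention: a fetch below the source log start offset is answered with
    // OffsetOutOfRange; the mirror leader then truncates to the source log start
    // offset and resumes from there. Otherwise it keeps the requested offset.
    static long resumeOffsetAfterFetch(long requestedOffset, long sourceLogStartOffset) {
        return requestedOffset < sourceLogStartOffset ? sourceLogStartOffset : requestedOffset;
    }
}
```

With the example from the answer, a fetch at offset 50 against a source log start offset of 100 resumes at 100.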
On Tue, Feb 17, 2026 at 2:57 PM Andrew Schofield <[email protected]> wrote:

Hi Fede and friends,
Thanks for the KIP.

It’s a comprehensive design, easy to read, and has clearly taken a lot of work. The principle of integrating mirroring into the brokers makes total sense to me.

The main comment I have is that mirroring like this cannot handle situations in which multiple topic-partitions are logically related, such as transactions, with total fidelity. Each topic-partition is being replicated as a separate entity. The KIP calls this out and describes the behaviour thoroughly.

A few initial comments.

AS1) Is it true that offsets are always preserved by this KIP? I *think* so, but I’m not totally sure that it’s true in all cases. It would certainly be nice.

AS2) I think you need to add epoch information to AlterShareGroupOffsetsRequest.
It really should already be there in hindsight, but I think this KIP requires it.

AS3) The CoordinatorType enum for MIRROR will need to be 3 because 2 is SHARE. I’m sure you’ll parse the keys from the right ;)

AS4) The procedure for achieving a failover could be clearer. Let’s say that I am using cluster mirroring to achieve DR replication. My source cluster is utterly lost due to a disaster. What’s the single operation that I perform to switch all of the topics mirrored from the lost source cluster to become the active topics? Similarly for failback.

AS5) The only piece that I’m really unsure of is the epoch management. I would have thought that the cluster which currently has the writable topic-partition would be in charge of the leader epoch, and it would not be necessary to perform all of the gymnastics described in the section on epoch rewriting.
I have read the Rejected Alternatives section too, but I don’t fully grasp why it was necessary to reject it.

I wonder if we could store the “polarity” of an epoch, essentially whether the epoch bump was observed by replication from a source cluster, or whether it was bumped by a local leadership change when the topic is locally writable. When a topic-partition switches from read-only to writable, we should definitely bump the epoch, and we could record the fact that it was a local epoch. When connectivity is re-established, you might find that both ends have declared a local epoch N, but someone has to win.

Thanks,
Andrew

On 14 Feb 2026, at 07:17, Federico Valeri <[email protected]> wrote:

Hi, we would like to start a discussion thread about KIP-1279: Cluster Mirroring.
Cluster Mirroring is a new Kafka feature that enables native, broker-level topic replication across clusters. Unlike MirrorMaker 2 (which runs as an external Connect-based tool), Cluster Mirroring is built into the broker itself, allowing tighter integration with the controller, coordinator, and partition lifecycle.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1279%3A+Cluster+Mirroring

There are a few missing bits, but most of the design is there, so we think it is the right time to involve the community and get feedback. Please help validate our approach.
Thanks
Fede

--
Paolo Patierno

Senior Principal Software Engineer @ IBM, CNCF Ambassador

Twitter : @ppatierno <http://twitter.com/ppatierno>
Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno>
GitHub : ppatierno <https://github.com/ppatierno>
