Hi Rajini, thanks for your thoughtful review and for catching a few bugs. I'll skip some questions that we will address later.
RS2: The metadata records are described in the "Mirror Metadata Records" paragraph. Currently there are only two records: the "LastMirroredOffsets" record tracks the latest successfully mirrored offset for each partition, while the "MirrorPartitionState" record represents the lifecycle state of a mirrored partition.

RS3: That's a good point that was also raised by Andrew. It was an easy solution that we used for our prototype, but we need to think about a better solution.

RS4: The current design is that mirror fetcher threads behave like a read_committed consumer, fetching up to the "source LSO". On failover we truncate the destination log to the "local LSO". The approach of fetching up to the HW that you propose is still safe since we keep truncating to the local LSO on failover, but it trades lower steady-state lag (especially when long-running transactions exist on the source) for more data loss on failover (the net data loss relative to the source is the same in both approaches). In other words, with your approach we fetch more data that we may then need to truncate. Also, read_uncommitted consumers on the destination cluster would be able to read records that may be truncated on failover. These are just my considerations, but we are open to discussion on which is the best approach here.

When a failover is triggered (RemoveTopicsFromMirror), the sequence is:

1. Partitions transition to STOPPING state
2. Fetchers are removed
3. For each partition, truncate to local LSO is called
   3.1. Reads LSO from each partition's local log
   3.2. Calls log.truncateTo(offset) on the UnifiedLog
   3.3. Ensures ISR members complete truncation before the partition becomes writable
4. For each partition, the LSO is recorded as the last mirrored offset (LMO) in __mirror_state
5. Partitions transition to STOPPED and become writable

When a failback is triggered (AddTopicsToMirror), the sequence is:

1. Partitions transition to PREPARING state
2. For each partition, truncation to LMO is called
   2.1. This sends a LastMirroredOffsetsRequest to the source cluster to fetch the offsets that were recorded during the previous failover
   2.2.a. The response offsets are used to truncate local logs
   2.2.b. If the source cluster doesn't support the LastMirroredOffsets API or this is a first-time mirror, it truncates to offset 0
3. Partitions transition to MIRRORING

RS7: Can you point me to the section that says configs are stored in __mirror_state? Mirror connection configs (bootstrap servers, credentials, etc.) are stored in KRaft metadata via ConfigResource.Type.MIRROR, not in __mirror_state. The internal topic only stores partition states and last mirrored offsets. Sensitive credentials follow the existing KRaft config handling, which is already protected by controller/broker access controls and sensitive config redaction in DescribeConfigs responses.

RS8: I'm not sure what the recommended approach is here. Adding a new error code does not change the response schema, and older clients that don't recognize the new error code will surface it as an UnknownServerException (non-retriable).

RS11: Good catch. This is a prototype simplification that we need to address. To properly sync consumer group offsets, the implementation would need to send ListGroups to all source brokers (or use the AdminClient, which does this internally), send FindCoordinator to discover the group coordinator for each group, and send OffsetFetch to the correct group coordinator.

RS12: You are absolutely right, the transformation is not idempotent, so it is not safe for chained mirroring (A -> B -> C). Round-trip mirroring (A -> B, then B -> A), on the other hand, works because, when doing a failback, the log is truncated before mirroring resumes, so previously mirrored records with negative pids are removed and the transformation is only applied to new records produced natively on that cluster (double-transformation never occurs). Non-transactional batches stay at -1 [ -(-1 + 2) = -(1) = -1], which is correct.
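To make the arithmetic concrete, here is a minimal Python sketch. It assumes the mapping is pid -> -(pid + 2), which is what the bracketed example above implies; mirror_pid is a hypothetical name used only for illustration, not the prototype's actual method.

```python
NO_PRODUCER_ID = -1  # pid carried by non-transactional / non-idempotent batches


def mirror_pid(pid: int) -> int:
    """Assumed mirror transformation: pid -> -(pid + 2)."""
    return -(pid + 2)


first_hop = mirror_pid(5)           # A -> B
second_hop = mirror_pid(first_hop)  # B -> C, transformation applied a second time

print(first_hop)                    # -7
print(second_hop)                   # 5
print(mirror_pid(NO_PRODUCER_ID))   # -1
```

Under this assumption, applying the mapping twice returns the original value, so on a second hop a mirrored pid would land back in the non-negative range used by local producers. That is the chained-mirroring problem; the round-trip case avoids it only because the truncation on failback removes the already-transformed records first.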
The chained mirroring would work if we skip the transformation when the pid is negative, but there is still an edge case: A -> B -> C with a local producer on B. If cluster A has local pid 5 and cluster B also has local pid 5, both end up as -7 on cluster C. This is a collision: two different producers with the same pid on the destination. No pid-only transformation can solve that. We would need to incorporate cluster identity.

Possible solution that would handle any topology: producer IDs are 64-bit signed longs used to identify a producer. The clusterId (UUID) is a globally unique 128-bit identifier for each source cluster. We could use the clusterId hash to partition the entire negative PID space into regions, one per source cluster. Basically we divide the 64 bits into three fields: bit 63 (sign bit), bits 62-31 (region selector), bits 30-0 (producer identity). Once a non-negative PID is mapped to a region, it passes through unchanged no matter how many hops follow (i.e. we apply the transformation only for PIDs >= 0).

Example with two clusters:

- Bit 63: This is the sign bit that makes the value negative and distinguishes mirrored pids from local ones (which are non-negative).
- Bits 62-31 cluster A: clusterId = "abc-123", clusterHash = 42
- Bits 62-31 cluster B: clusterId = "xyz-789", clusterHash = 99
- Bits 30-0: Local producer ID 5, which is the same on both clusters.

A's pid 5 --> 1|00000000000000000000000000101010|0000000000000000000000000000101
B's pid 5 --> 1|00000000000000000000000001100011|0000000000000000000000000000101

On Wed, Mar 11, 2026 at 1:23 PM Rajini Sivaram <[email protected]> wrote: > > A few more questions about the KIP for clarification: > > RS8: The KIP says produce requests to mirror topics will throw > ReadOnlyTopicException. For Produce Requests returning a new error to > clients, don’t we need to bump Produce request version? > > RS9: The KIP says we use OffsetMovedToTieredStorageException to prevent > mirroring of data in tiered storage. 
But doesn’t the mirror client look > like a regular consumer to the source cluster and return records fetched > from tiered storage? > > RS10: Client-id based quotas for the source cluster look hard to manage > since there is no hierarchy or grouping possible. Seems better to rely on > secure user-principal based quotas on the source-side. > > RS11: The KIP says `The manager maintains a connection pool with one > blocking sender per source cluster`. If this is the connection used for > periodic sync of offsets, topic configs etc. the coordinator is likely to > need connections to all source brokers (i.e. all group coordinators). > > RS12: The KIP proposes to transform producer ids for mirror records to > avoid conflicts. This comes at a cost because CRC checksum needs to be > recomputed. To justify this cost, we need to ensure that this > transformation works in all cases. What happens if you are mirroring a > mirror topic? Is that a supported scenario? Or mirroring back mirrored data > during failback because the source was truncated? > > > > Thanks, > > Rajini > > > On Tue, Mar 10, 2026 at 8:19 PM Rajini Sivaram <[email protected]> > wrote: > > > Hi team, > > > > Thanks for the KIP! I have a few questions, mostly clarification at this > > point. > > > > RS1: There is a `CreateMirror` request but no corresponding `DeleteMirror` > > request. Is that intentional? > > > > > > > > RS2: It will be good to define the format of data going into the internal > > mirror state topic. There is an example under kafka-dump-logs, which > > shows partition-level state in the payload and the mirror name as key. I > > guess that is not what we expect it to be. Do we delete this information > > when a topic is deleted or a mirror is deleted? > > > > > > > > RS3: KIP currently says mirror name cannot end with .removed. I guess it > > cannot also end with .paused. Have we considered storing state and > > mirror name separately, but updated together for a topic? 
Since new > > states may be added in future, name restrictions may become hard to > > implement. > > > > > > > > RS4: The KIP says *“mirroring must fetch only up to the LSO to maintain > > transactional consistency”* and it also says *“During the mirror stopping > > transition, the MirrorCoordinator performs a log truncation operation that > > resets each mirror partition to its LSO.”* I guess the plan is to fetch > > up to high watermark and truncate to locally computed LSO on failover? > > Details of the sequence here will be useful. How does MirrorCoordinator > > perform truncation? > > > > > > > > RS5: The KIP says “*On the destination cluster, mirror-related operations > > (creating mirrors, adding/removing topics from mirrors, managing mirror > > configurations) require the CLUSTER_ACTION permission on the cluster > > resource.*” The `Cluster:ClusterAction` ACL is currently used for broker > > service account, e.g. local replication is authorized using this. It seems > > odd to grant this permission to users managing a resource on the cluster. > > Have we considered adding a new resource type `ClusterMirror` and define > > ACLs like `ClusterMirror:Create`, `ClusterMirror:Alter` and ` > > ClusterMirror:AlterConfigs`? > > > > RS6: The KIP talks about three entities: Cluster Mirror, Mirror Topic and > > Mirror > > Partition, with Cluster Mirroring as the feature name. Since we already > > have MirrorMaker that also refers to mirrors, it will be nice if we can > > refer to the entities using their full name in the CLI and public APIs. > > That will enable us to add more mirror topic and mirror partition APIs in > > the future if needed. For example: > > > > > > - `kafka-cluster-mirrors.sh` to manage cluster mirrors > > - createClusterMirrors(), listClusterMirrors(), > > describeClusterMirrors() etc on the Admin API and Kafka Protocol. > > - KIP proposes pauseMirrorTopics(), resumeMirrorTopics() which are > > good. 
> > > > > > > > RS7: The KIP proposes to store mirror configs in the internal mirror state > > topic. This includes sensitive credentials of another cluster. Have we > > considered other options? Can a user with read access read the data from > > the state topic using a consumer? > > > > > > > > Thanks, > > > > > > Rajini > > > > > > > > On Sun, Mar 8, 2026 at 8:58 PM Andrew Schofield <[email protected]> > > wrote: > > > >> Hi Fede and friends, > >> I've re-read in detail and have quite a lot of comments, mostly minor > >> clarifications, but as it approaches a vote, it's good to get the details > >> nailed down. > >> > >> AS6: Could we have a diagram which shows which RPCs are served by which > >> components? This will help illustrate the authorisation requirements for > >> the various components, which is an aspect of the KIP that I don't think is > >> completely specified yet. > >> > >> AS7: Please could you include a table of the operations and resources > >> which will be checked for authorisation of each of the RPCs introduced. > >> Also, please could you document the permissions which the destination > >> cluster will require to mirror data and ACLs (for example, I think it will > >> need ALTER on the CLUSTER resource to manipulate ACLs)? It's going to need > >> Metadata, DescribeConfigs, DescribeAcls, ListGroups, OffsetFetch, > >> LastMirrorOffset and Fetch RPCs I think, possibly others too. The user is > >> probably going to want to give as little permission as possible to the > >> destination cluster to get its job done. > >> > >> AS8: You include AuthorizedOperations in DescribeMirrorsResponse, but I > >> don't know what the operations are. I think the implies MIRROR is a new > >> resource type in the Kafka security model and DescribeMirrors can be used > >> to enquire the authorised operations for the client making the Admin API > >> request. 
> >> > >> AS9: I think you're going to need some new error codes in the Kafka > >> protocol, as least: > >> > >> * INVALID_MIRROR_NAME or similar if the mirror name doesn't meet the > >> rules for a topic name > >> * UNKNOWN_MIRROR if the mirror doesn't exist > >> > >> And probably some more for logical inconsistencies such as this topic > >> isn't in that mirror, that topic is already in another mirror, and so on. > >> > >> AS10: Could you add the usage information for kafka-mirrors.sh (the > >> intended output from kafka-mirrors.sh --help) so all of the options are > >> documented together? For example, I see that --replication-factor is > >> included in one of the examples, which seems a bit surprising and I'm not > >> sure whether it's a mistake or a feature. I can probably use --describe > >> with a specific --mirror but it's not specified. > >> > >> AS11: I would expect the signature for Admin.addTopicsToMirror to be > >> Admin.addTopicsToMirror(String mirrorName, Set<String> topics, > >> AddTopicsToMirrorOptions options) because it's for adding topics to a > >> mirror, as the counterpart to Admin.removeTopicsFromMirror(String > >> mirrorName, Set<String> topics, RemoveTopicsFromMirrorOptions options). > >> > >> AS12: I don't think ignorable RPC fields in version 0 RPCs make sense > >> because they're not trying to be compatible with a previous version. > >> > >> AS13: I would have expected AddTopicsToMirrorRequest to have mirror name > >> above the list of topics because the same mirror name applies to all of the > >> topics being added. As specified, you repeat the mirror name for all of the > >> topics. > >> > >> AS14: I suggest adding ErrorMessage to the responses in all cases to make > >> it easier to give more descriptive exception messages than just the default > >> for the error codes. 
> >> > >> AS15: I may have the wrong end of the stick here, but I expected > >> RemoveTopicsFromMirrorRequest to remove the topics from a specific named > >> mirror as implied by the example of the kafka-mirrors.sh command. In fact, > >> I was expecting the mirror to contain the topics in the admin RPC requests > >> and responses, and that's only true for about half of them. > >> > >> AS16: Can I change the mirror.name config using IncrementalAlterConfigs? > >> If I attempt it, what's the error? > >> > >> AS17: If I attempt mirror RPCs when the mirror is in the wrong state, the > >> error is specified as INVALID_REQUEST. That's usually kept for badly formed > >> requests, as opposed to logically invalid ones. Maybe MIRROR_NOT_STOPPED or > >> MIRRORING_ACTIVE or similar would be more expressive. > >> > >> AS18: Should the LastMirroredOffsetsResponse, ReadMirrorStatesResponse > >> and WriteMirrorStatesRequest include LeaderEpoch? I suspect so. > >> > >> AS19: In DescribeMirrorsResponse, I suspect you will want "null" values > >> for some fields which don't have values during initialisation and so on, > >> such as lag. > >> > >> AS20: Do you need to add new versions of the DescribeConfigs and > >> IncrementalAlterConfigs RPCs to support mirror resources? > >> > >> AS21: The topic configuration mirror.replication.throttled.replicas is > >> described as a list, but the default is MAX_LONG. > >> > >> AS22: By including mirror.name as a topic config, a client which has > >> permission to describe configs for the topic is able to discover the name > >> of the mirror, whether they are permitted to list the mirrors or describe > >> that particular mirror. Generally, the Kafka authorisation model does not > >> permit this kind of unauthorised information disclosure. 
For example, when > >> a client describes the committed offsets for a consumer group, the list of > >> topics returned is filtered to only those topics which the client is > >> permitted to describe, even though that may results in an incomplete set of > >> topic partitions being returned. Is there an alternative way in which this > >> information could be stored so Kafka only reveals mirror information to > >> principals authorised to see it? > >> > >> AS23: I observe that there are situations in which a `.removed` suffix is > >> added to the mirror name. Is it permitted for the user to define a mirror > >> called "my.nasty.mirror.removed" and does it break anything? > >> > >> > >> Thanks, > >> Andrew > >> > >> On 2026/03/06 13:41:52 Paolo Patierno wrote: > >> > Hi Fede, > >> > something more ... > >> > > >> > Is there any migration path for users who want to migrate from using > >> Mirror > >> > Maker 2 to the cluster mirroring? > >> > I mean, something like a tool useful to create a corresponding cluster > >> > mirroring configuration starting from a MM2 one. Nothing that runs the > >> > migration automatically but something that can be provided to the users > >> as > >> > output to be validated and put in place by them. > >> > > >> > The Admin Client is missing methods to pause and stop mirroring (but we > >> > have corresponding protocol messages). Is it on purpose? Any specific > >> > reasons? They would be important from an automatic operator perspective > >> use > >> > case. > >> > Also a method to provide the LastMirroredOffset from the source cluster > >> > could be useful for progress and tracking purposes. > >> > Finally, what about a method to get the mirror states? I don't think the > >> > describe method provides such information. > >> > In general, I think that the Admin Client section needs to cover in more > >> > details the new classes definition like CreateMirrorOptions, > >> > CreateMirrorResult, ... and so on for all the defined new methods. 
> >> > > >> > > AddTopicsToMirrorResult addTopicsToMirror(Map<String, String> > >> > topicToMirrorName, AddTopicsToMirrorOptions options); > >> > > >> > Isn't it missing the mirrorName (as you have in the > >> removeTopicsFromMirror > >> > counterpart)? > >> > What's the topicToMirrorName parameter if it's defined as a Map? The > >> method > >> > is also plural using "topics" so comparing to the removeTopicsFromMirror > >> > method, I would assume the parameter really is Set<String> topics? > >> > Comparing to the corresponding protocol message > >> AddTopicsToMirrorRequest, I > >> > see a list of topics but each of them has id, name and corresponding > >> > mirror. So it's unclear how the addTopicsToMirror is defined. > >> > > >> > > RemoveTopicsFromMirrorResult removeTopicsFromMirror(String mirrorName, > >> > Set<String> topics, RemoveTopicsFromMirrorOptions options); > >> > > >> > This method gets a mirrorName but if I look at the corresponding > >> protocol > >> > message RemoveTopicsFromMirrorRequest, it says "Allows users to detach > >> > topics from their associated mirror" so the mirror is actually not > >> provided > >> > and it's exactly what I see in the JSON definition (only topics list > >> with > >> > id and name). > >> > > >> > Finally, regarding the protocol change: > >> > > >> > * ListMirrorsResponse I would add the clusterId in the JSON definition > >> > (it's related to my comments in the previous email when using the tool). > >> > * WriteMirrorStatesRequest has the following in the JSON which should > >> not > >> > be part of it "{ "name": "RemovedTopics", "type": "[]string", > >> "versions": > >> > "0+", "about": "The topic names to be removed." }" > >> > > >> > Thanks, > >> > Paolo. > >> > > >> > On Fri, 6 Mar 2026 at 13:08, Paolo Patierno <[email protected]> > >> > wrote: > >> > > >> > > Hi Fede, > >> > > thank you for the proposal. I had a first pass with following > >> thoughts and > >> > > questions. 
> >> > > > >> > > > When the unclean.leader.election.enable is set to true, the broker > >> will > >> > > log a warning at every configuration synchronization period. > >> > > Be more explicit about what the warning says. > >> > > > >> > > > This topic ID is not used by other topics in the current cluster > >> > > In such a case, which should be very unlikely, what's going to happen? > >> > > Isn't it possible to mirror the topic? > >> > > > >> > > > To enable it, all cluster nodes (controllers and brokers) must > >> > > explicitly enable unstable API versions and unstable feature versions > >> in > >> > > all configuration files. After starting the cluster with a minimum > >> metadata > >> > > version, operators can dynamically enable the mirror version feature > >> to > >> > > activate Cluster Mirroring. > >> > > AFAIU there is going to be a dedicated feature flag for it, right? If > >> yes > >> > > can we state it clearly also specifying the exact name (i.e. > >> mirror.version > >> > > or something similar)? > >> > > > >> > > When running the kafka-mirrors.sh tool to list the mirrors, other than > >> > > showing the SOURCE-BOOTSTRAP, it could be useful to have also the > >> clusterId > >> > > which, as a unique identifier, could be helpful in automated systems > >> using > >> > > the cluster mirroring. Of course, it would be important to have in the > >> > > ListMirrorsResponse as well as an additional field. > >> > > > >> > > What happens in case of Kafka downgrade from a version supporting > >> > > mirroring to an older one not supporting it. > >> > > The mirror won't be running but the topic configuration will still > >> have > >> > > config parameters like mirror.name and so on, right? Are they just > >> > > ignored by the older Kafka version and the cluster will work without > >> issues? 
> >> > > > >> > > Thanks, > >> > > Paolo > >> > > > >> > > On Fri, 6 Mar 2026 at 10:43, Luke Chen <[email protected]> wrote: > >> > > > >> > >> Hi Andrew and all, > >> > >> > >> > >> About AS5, yes, I've created a sub-document > >> > >> < > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring > >> > >> >to > >> > >> explain the algorithm to support unclean leader election in cluster > >> > >> mirroring. > >> > >> Thanks for your comments, I'm inspired by that! :) > >> > >> > >> > >> About your idea, to store the owner of the leader epoch when > >> leadership > >> > >> change, I think it might not be needed because the most important > >> thing > >> > >> should be this: > >> > >> > you might find that both ends have declared a local epoch N, but > >> someone > >> > >> has to win. > >> > >> > >> > >> That is, as long as we have a way to declare who is the owner of > >> leader > >> > >> epoch N, then the 2 clusters can sync up successfully. > >> > >> And that's why I proposed to the "last mirrored leader epoch" > >> semantic in > >> > >> the sub-proposal > >> > >> < > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/Unclean+Leader+Election+in+Cluster+Mirroring > >> > >> >, > >> > >> which is a solution to draw a line between these 2 clusters to > >> declare > >> > >> records beyond the "last mirrored leader epoch" N, it belongs to > >> who. I > >> > >> think this should work well, as long as all replicas in the cluster > >> can > >> > >> truncate the log correctly. > >> > >> > >> > >> What do you think? > >> > >> > >> > >> Any feedback is appreciated. > >> > >> > >> > >> Thank you, > >> > >> Luke > >> > >> > >> > >> On Fri, Mar 6, 2026 at 6:02 PM Andrew Schofield < > >> [email protected]> > >> > >> wrote: > >> > >> > >> > >> > Hi Fede, > >> > >> > Thanks for your response. > >> > >> > > >> > >> > AS1: Thanks for the clarification. 
> >> > >> > > >> > >> > AS2: I expect you'll include a version bump of > >> AlterShareGroupOffsets in > >> > >> > this KIP, but that's a small matter compared with the rest of the > >> > >> protocol > >> > >> > changes. > >> > >> > > >> > >> > AS3: OK. > >> > >> > > >> > >> > AS4: Thanks for the details. My only comment is that it might be a > >> bit > >> > >> > laborious when you want to failover all topics. I suggest adding > >> > >> > `--all-topics` so you could do: > >> > >> > > >> > >> > $ bin/kafka-mirror.sh --bootstrap-server :9094 --remove > >> --all-topics > >> > >> > --mirror my-mirror > >> > >> > > >> > >> > AS5: Thanks for the response. I understand there are good reasons > >> for > >> > >> the > >> > >> > way epochs are handled in the KIP. I see that there is a > >> sub-document > >> > >> for > >> > >> > the KIP about unclean leader election. I'll spend some time > >> reviewing > >> > >> that. > >> > >> > > >> > >> > Thanks, > >> > >> > Andrew > >> > >> > > >> > >> > On 2026/02/18 13:27:07 Federico Valeri wrote: > >> > >> > > Hi Andrew, thanks for the review. > >> > >> > > > >> > >> > > Let me try to answer your questions and then other authors can > >> join > >> > >> > > the discussion. > >> > >> > > > >> > >> > > AS1 > >> > >> > > ------ > >> > >> > > > >> > >> > > Destination topics are created with the same topic IDs using the > >> > >> > > extended CreateTopics API. Then, data is replicated starting from > >> > >> > > offset 0 with byte-for-byte batch copying, so destination offsets > >> > >> > > always match source offsets. When failing over, we record the > >> last > >> > >> > > mirrored offset (LMO) in the destination cluster. When failing > >> back, > >> > >> > > the LMO is used for truncating and then start mirroring the > >> delta, > >> > >> > > otherwise we start mirroring from scratch by truncating to zero. 
> >> > >> > > > >> > >> > > Retention: If the mirror leader attempts to fetch an offset that > >> is > >> > >> > > below the current log start offset of the source leader (e.g. > >> fetching > >> > >> > > offset 50 when log start offset is 100), the source broker > >> returns an > >> > >> > > OffsetOutOfRangeException that the mirror leader handles by > >> truncating > >> > >> > > to the source's current log start offset and resuming fetching > >> from > >> > >> > > that point. Compaction: The mirror leader replicates these > >> compacted > >> > >> > > log segments exactly as they exist in the source cluster, > >> maintaining > >> > >> > > the same offset assignments and gaps. > >> > >> > > > >> > >> > > Do you have any specific corner case in mind? > >> > >> > > > >> > >> > > AS2 > >> > >> > > ------ > >> > >> > > > >> > >> > > Agreed. The current AlterShareGroupOffsetsRequest (v0) only > >> includes > >> > >> > > PartitionIndex and StartOffset with no epoch field. When > >> mirroring > >> > >> > > share group offsets across clusters, the epoch is needed to > >> ensure the > >> > >> > > offset alteration targets the correct leader generation. > >> > >> > > > >> > >> > > AS3 > >> > >> > > ------ > >> > >> > > > >> > >> > > Right, the enum is now fixed. Yes, we will parse from the right > >> and > >> > >> > > apply the same naming rules used for topic name ;) > >> > >> > > > >> > >> > > AS4 > >> > >> > > ------- > >> > >> > > > >> > >> > > Agreed. I'll try to improve those paragraphs because they are > >> crucial > >> > >> > > from an operational point of view. 
> >> > >> > > > >> > >> > > Let me shortly explain how it is supposed to work: > >> > >> > > > >> > >> > > 9091 (source) -----> 9094 (destination) > >> > >> > > > >> > >> > > The single operation that allows an operator to switch all > >> topics at > >> > >> > > once in case of disaster is the following: > >> > >> > > > >> > >> > > bin/kafka-mirror.sh --bootstrap-server :9094 --remove --topic .* > >> > >> > > --mirror my-mirror > >> > >> > > > >> > >> > > 9091 (source) --x--> 9094 (destination) > >> > >> > > > >> > >> > > After that, all mirror topics become detached from the source > >> cluster > >> > >> > > and start accepting writes (the two cluster are allowed to > >> diverge). > >> > >> > > > >> > >> > > When the source cluster is back, the operator can failback by > >> creating > >> > >> > > a mirror with the same name on the source cluster (new > >> destination): > >> > >> > > > >> > >> > > echo "bootstrap.servers=localhost:9094" > > >> /tmp/my-mirror.properties > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :9091 --create --mirror > >> > >> > > my-mirror --mirror-config /tmp/my-mirror.properties > >> > >> > > bin/kafka-mirrors.sh --bootstrap-server :"9091 --add --topic .* > >> > >> > > --mirror my-mirror > >> > >> > > > >> > >> > > 9091 (destination) <----- 9094 (source) > >> > >> > > > >> > >> > > AS5 > >> > >> > > ------- > >> > >> > > > >> > >> > > This is the core of our design and we reached that empirically by > >> > >> > > trying out different options. We didn't want to change local > >> > >> > > replication, and this is something you need to do when > >> preserving the > >> > >> > > source leader epoch. The current design is simple and keeps the > >> epoch > >> > >> > > domains entirely separate. Destination cluster is in charge of > >> the > >> > >> > > leader epoch for its own log. The source epoch is only used > >> during the > >> > >> > > fetch protocol to validate responses and detect divergence. 
> >> > >> > > > >> > >> > > The polarity idea of tracking whether an epoch bump originated > >> from > >> > >> > > replication vs. local leadership change is interesting, but adds > >> > >> > > significant complexity and coupling between source and > >> destination > >> > >> > > epochs. Could you clarify what specific scenario polarity > >> tracking > >> > >> > > would address that the current separation doesn't handle? One > >> case we > >> > >> > > don't support is unclean leader election reconciliation across > >> > >> > > clusters, is that the gap you're aiming at? > >> > >> > > > >> > >> > > I tried to rewrite the unclean leader election paragraph in the > >> > >> > > rejected alternatives to be easier to digest. Let me know if it > >> works. > >> > >> > > > >> > >> > > On Tue, Feb 17, 2026 at 2:57 PM Andrew Schofield > >> > >> > > <[email protected]> wrote: > >> > >> > > > > >> > >> > > > Hi Fede and friends, > >> > >> > > > Thanks for the KIP. > >> > >> > > > > >> > >> > > > It’s a comprehensive design, easy to read and has clearly > >> taken a > >> > >> lot > >> > >> > of work. > >> > >> > > > The principle of integrating mirroring into the brokers makes > >> total > >> > >> > sense to me. > >> > >> > > > > >> > >> > > > The main comment I have is that mirroring like this cannot > >> handle > >> > >> > situations > >> > >> > > > in which multiple topic-partitions are logically related, such > >> as > >> > >> > transactions, > >> > >> > > > with total fidelity. Each topic-partition is being replicated > >> as a > >> > >> > separate entity. > >> > >> > > > The KIP calls this out and describes the behaviour thoroughly. > >> > >> > > > > >> > >> > > > A few initial comments. > >> > >> > > > > >> > >> > > > AS1) Is it true that offsets are always preserved by this KIP? > >> I > >> > >> > *think* so but > >> > >> > > > not totally sure that it’s true in all cases. It would > >> certainly be > >> > >> > nice. 
> >> > >> > > > > >> > >> > > > AS2) I think you need to add epoch information to > >> > >> > AlterShareGroupOffsetsRequest. > >> > >> > > > It really should already be there in hindsight, but I think > >> this KIP > >> > >> > requires it. > >> > >> > > > > >> > >> > > > AS3) The CoordinatorType enum for MIRROR will need to be 3 > >> because 2 > >> > >> > is SHARE. > >> > >> > > > I’m sure you’ll parse the keys from the right ;) > >> > >> > > > > >> > >> > > > AS4) The procedure for achieving a failover could be clearer. > >> Let’s > >> > >> > say that I am > >> > >> > > > using cluster mirroring to achieve DR replication. My source > >> cluster > >> > >> > is utterly lost > >> > >> > > > due to a disaster. What’s the single operation that I perform > >> to > >> > >> > switch all of the > >> > >> > > > topics mirrored from the lost source cluster to become the > >> active > >> > >> > topics? > >> > >> > > > Similarly for failback. > >> > >> > > > > >> > >> > > > AS5) The only piece that I’m really unsure of is the epoch > >> > >> management. > >> > >> > I would > >> > >> > > > have thought that the cluster which currently has the writable > >> > >> > topic-partition > >> > >> > > > would be in charge of the leader epoch and it would not be > >> > >> necessary to > >> > >> > > > perform all of the gymnastics described in the section on epoch > >> > >> > rewriting. > >> > >> > > > I have read the Rejected Alternatives section too, but I don’t > >> fully > >> > >> > grasp > >> > >> > > > why it was necessary to reject it. > >> > >> > > > > >> > >> > > > I wonder if we could store the “polarity” of an epoch, > >> essentially > >> > >> > whether the > >> > >> > > > epoch bump was observed by replication from a source cluster, > >> or > >> > >> > whether > >> > >> > > > it was bumped by a local leadership change when the topic is > >> locally > >> > >> > writable. 
> >> > >> > > > When a topic-partition switches from read-only to writable, we > >> > >> should > >> > >> > definitely > >> > >> > > > bump the epoch, and we could record the fact that it was a > >> local > >> > >> epoch. > >> > >> > > > When connectivity is re-established, you might find that both > >> ends > >> > >> have > >> > >> > > > declared a local epoch N, but someone has to win. > >> > >> > > > > >> > >> > > > Thanks, > >> > >> > > > Andrew > >> > >> > > > > >> > >> > > > > On 14 Feb 2026, at 07:17, Federico Valeri < > >> [email protected]> > >> > >> > wrote: > >> > >> > > > > > >> > >> > > > > Hi, we would like to start a discussion thread about > >> KIP-1279: > >> > >> > Cluster > >> > >> > > > > Mirroring. > >> > >> > > > > > >> > >> > > > > Cluster Mirroring is a new Kafka feature that enables native, > >> > >> > > > > broker-level topic replication across clusters. Unlike > >> > >> MirrorMaker 2 > >> > >> > > > > (which runs as an external Connect-based tool), Cluster > >> Mirroring > >> > >> is > >> > >> > > > > built into the broker itself, allowing tighter integration > >> with > >> > >> the > >> > >> > > > > controller, coordinator, and partition lifecycle. > >> > >> > > > > > >> > >> > > > > > >> > >> > > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1279%3A+Cluster+Mirroring > >> > >> > > > > > >> > >> > > > > There are a few missing bits, but most of the design is > >> there, so > >> > >> we > >> > >> > > > > think it is the right time to involve the community and get > >> > >> feedback. > >> > >> > > > > Please help validating our approach. 
> >> > >> > > > > > >> > >> > > > > Thanks > >> > >> > > > > Fede > >> > >> > > > > >> > >> > > > >> > >> > > >> > >> > >> > > > >> > > > >> > > -- > >> > > Paolo Patierno > >> > > > >> > > *Senior Principal Software Engineer @ IBM**CNCF Ambassador* > >> > > > >> > > Twitter : @ppatierno <http://twitter.com/ppatierno> > >> > > Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno> > >> > > GitHub : ppatierno <https://github.com/ppatierno> > >> > > > >> > > >> > > >> > -- > >> > Paolo Patierno > >> > > >> > *Senior Principal Software Engineer @ IBM**CNCF Ambassador* > >> > > >> > Twitter : @ppatierno <http://twitter.com/ppatierno> > >> > Linkedin : paolopatierno <http://it.linkedin.com/in/paolopatierno> > >> > GitHub : ppatierno <https://github.com/ppatierno> > >> > > >> > >
