Ryanne,

> > b.a.replicate-me-0
>
> That's actually impossible with MM2.

Thanks, I see the isCycle check in MirrorSourceConnector. That makes me even more curious how the renameTopicPartition method triggers without a change such as the one that Jeroen has prototyped, since the only thing that emits offset syncs is the MirrorSourceTask, and it is disallowed from sending topics back in a cycle.

Greg

On Fri, Jan 12, 2024 at 6:13 AM Jeroen Schutrup <jer...@cloudflare.com.invalid> wrote:
>
> Hey Greg,
> There are no offset collisions: although both offset-syncs topics are
> stored on the same cluster, offsets from A->B are stored in
> mm2-offset-syncs.b.internal, whereas offsets from B->A are stored in
> mm2-offset-syncs.a.internal.
> What's curious, though, is that the B->A checkpoint connector (which has
> offset-syncs.topic.location: target) actually uses the offsets stored in
> mm2-offset-syncs.b.internal (which contains the downstream offsets), while
> I expected it to only read offsets stored in mm2-offset-syncs.a.internal,
> as cluster A is its target.
>
> I'm positive about driving a KIP for this feature to see whether we can get
> it implemented. I'm hoping to submit a draft in the upcoming weeks, though
> I'd need a bit of time to get a better grasp of the mirror connector
> codebase.
>
> Thank you both for your valuable insights so far!
>
> Jeroen
>
> On Thu, Jan 11, 2024 at 8:06 PM Greg Harris <greg.har...@aiven.io.invalid>
> wrote:
> >
> > Hey Jeroen,
> >
> > Thanks for sharing your prototype! It is very interesting!
> >
> > > I couldn't reproduce your hypothesis.
> >
> > I think my hypothesis was for another setup, which didn't involve code
> > changes and instead relied on A->B->A round-trip replication to
> > produce the "backwards" offset syncs.
> > I believe this would replicate data from "replicate-me-0" to
> > "b.a.replicate-me-0", and then possibly take the offsets intended for
> > "b.a.replicate-me-0" and apply them to "replicate-me-0", creating the
> > infinite cycle.
> >
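To make the isCycle point concrete: with DefaultReplicationPolicy-style naming (`<source alias>.<topic>`), a topic on cluster B that already carries the `a.` prefix is not mirrored back to A, so `b.a.replicate-me` should never come into existence. What follows is a simplified, hypothetical model of that recursive check, written only for illustration. The class and method names are our own, it uses plain strings instead of Kafka types, and it is not the MirrorSourceConnector source.

```java
// Simplified, hypothetical model of DefaultReplicationPolicy-style naming
// ("<sourceAlias>.<topic>") and of a recursive cycle check in the spirit of
// MirrorSourceConnector.isCycle. Plain strings only; not the Kafka source.
final class CycleCheckSketch {
    private static final String SEPARATOR = ".";

    // Alias of the cluster a topic was replicated from, or null for a local topic.
    // Note: this model treats any dotted name as a remote topic.
    static String topicSource(String topic) {
        int idx = topic.indexOf(SEPARATOR);
        return idx <= 0 ? null : topic.substring(0, idx);
    }

    // Strips one replication prefix: "b.a.replicate-me" -> "a.replicate-me".
    static String upstreamTopic(String topic) {
        String source = topicSource(topic);
        return source == null ? null : topic.substring(source.length() + SEPARATOR.length());
    }

    // True if mirroring `topic` to the cluster aliased `targetAlias` would send
    // records back to a cluster they have already passed through.
    static boolean isCycle(String topic, String targetAlias) {
        String source = topicSource(topic);
        if (source == null) {
            return false;                 // local topic: safe to mirror
        }
        if (source.equals(targetAlias)) {
            return true;                  // e.g. "a.replicate-me" on B, flowing B -> A
        }
        // Some other prefix: check the next hop upstream.
        return isCycle(upstreamTopic(topic), targetAlias);
    }

    public static void main(String[] args) {
        // Flow A -> B: the local topic is mirrored and becomes "a.replicate-me" on B.
        System.out.println(isCycle("replicate-me", "b"));     // false
        // Flow B -> A: "a.replicate-me" originated on A, so it is filtered out,
        // which is why "b.a.replicate-me" never shows up on cluster A.
        System.out.println(isCycle("a.replicate-me", "a"));   // true
        // Longer chains are checked recursively.
        System.out.println(isCycle("c.a.replicate-me", "a")); // true
    }
}
```

Because MirrorSourceTask is also what emits offset syncs, a topic that this check filters out gets no syncs either. That is the gap the rest of the thread runs into.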
> > I would not expect your implementation to suffer from this failure
> > mode, because it's using the offset in "replicate-me-0" as the
> > downstream offset, not the offset of "b.a.replicate-me-0".
> >
> > With your prototype, do you experience "collisions" in the
> > offset-syncs topic? Since you're sharing a single offset-syncs topic
> > between both replication flows, I would expect offsets for topics with
> > the same names on both clusters to conflict, and cause the translation
> > to happen using the opposite topic's offsets.
> > It would also be visible in the state of the OffsetSyncStore [1]: you
> > can compare the normal A->B behavior before and after starting the
> > B->A source connector to see if the concurrent flows cause more syncs
> > to be cleared, or the wrong syncs to be present.
> >
> > I think it is normal for every MM2 connector to have the same
> > offset-syncs.topic.location to avoid these sorts of conflicts, so that
> > each syncs topic is only used by one of the MM2 replication flows.
> > I think that turning on bidirectional offset syncs will probably
> > require a second producer in the MirrorSourceTask to contact the
> > opposite cluster, or a second admin client in the
> > MirrorCheckpointTask.
> >
> > > Do you think it'd be worthwhile proceeding with this?
> >
> > This is certainly a capability that MM2 is missing right now, and it
> > seems like it would be a natural component of failing consumers back
> > and forth. If you see value in it and are interested in driving the
> > feature, you can open a KIP [2] to discuss the interface and design
> > with the rest of the community.
> >
> > Thanks!
> > Greg
> >
> > [1] https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L194
> > [2] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > On Thu, Jan 11, 2024 at 9:27 AM Jeroen Schutrup
> > <jer...@cloudflare.com.invalid> wrote:
> > >
> > > I see, that makes complete sense to me. I built a custom version [1] based
> > > on Kafka 3.5.1 with bidirectional offset replication enabled so I could
> > > do some more testing. Offset translation back upstream works well; I
> > > think this is because, as Ryanne pointed out, both topics contain
> > > identical data. I tested this by truncating the upstream topic before
> > > starting replication (so the downstream/upstream topics have different
> > > offsets). Truncating the upstream topic while replication is running
> > > doesn't result in any weirdness either.
> > >
> > > > Before starting the replication, insert a few records into
> > > > `a.replicate-me` to force replicate-me-0's offset n to replicate to
> > > > a.replicate-me-0's offset n+k.
> > >
> > > I couldn't reproduce your hypothesis. After doing the above and then
> > > starting replication, I didn't see any offset replication loops. Once I
> > > started producing data into the upstream topic and subscribing a
> > > console-consumer to the downstream topic, offsets were translated and
> > > replicated correctly back upstream. My guess is that the
> > > CheckpointConnector can compensate for these surplus messages, as the
> > > actual log offsets of the downstream topic are written to the
> > > offset-syncs topic.
> > >
> > > As this kind of active/active replication would be very beneficial to us
> > > for the reasons stated in my previous message, we'd love to help out
> > > building this kind of offset replication into the Mirror connectors.
> > > I understand this is not something that should be enabled by default,
> > > but having it behind a configuration toggle could help out users who
> > > want a similar kind of active/active setup and who understand the
> > > restrictions. Do you think it'd be worthwhile proceeding with this?
> > >
> > > [1] https://github.com/jeroen92/kafka/commit/1a27696ec6777c230f100cf9887368c431ebe0f8
> > >
> > > On Thu, Jan 11, 2024 at 1:06 AM Greg Harris <greg.har...@aiven.io.invalid>
> > > wrote:
> > > >
> > > > Hi Jeroen,
> > > >
> > > > I'm glad you're experimenting with MM2, and I hope we can give you
> > > > some more context to explain what you're seeing.
> > > >
> > > > > I wrote a small program to produce these offset syncs for the prefixed
> > > > > topic, and this successfully triggers the Checkpoint connector to start
> > > > > replicating the consumer offsets back to the primary cluster.
> > > >
> > > > This is interesting, and I wouldn't have expected it to work.
> > > >
> > > > To rewind, each flow Source->Target has a MirrorSourceConnector, an
> > > > Offset Syncs Topic, and a MirrorCheckpointConnector. With both
> > > > directions enabled, there are two separate flows, each with a Source,
> > > > Syncs topic, and Checkpoint.
> > > > With offset-syncs.topic.location=source, the
> > > > mm2-offset-syncs.b.internal topic on the A cluster is used for the
> > > > A->B replication flow. It contains topic names from cluster A, and the
> > > > corresponding offsets those records were written to on the B cluster.
> > > > When translation is performed, the consumer groups from A are
> > > > replicated to the B cluster, and the replication mapping (cluster
> > > > prefix) is added.
> > > > Using your syncs topic as an example,
> > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > > > downstreamOffset=28} will be used to write offsets for
> > > > "a.replicate-me-0" for the equivalent group on the B cluster.
> > > >
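Greg's description of a single A->B flow can be modelled in a few lines. This is a hypothetical sketch, not the OffsetSyncStore API. It keeps syncs per upstream partition and adds the source-cluster prefix to name the downstream partition. It also assumes a conservative rounding rule: an exact match uses the sync's downstream offset, while an offset past the latest sync resumes one record after it, which may re-deliver records but never skips any. An empty result stands in for the "offset sync not found" case in Jeroen's debug log.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical model of offset translation for one Source -> Target flow.
// Not the OffsetSyncStore API: partitions are plain "topic-partition" strings.
final class FlowTranslationSketch {
    private final String sourceAlias;
    // upstream partition -> (upstream offset -> downstream offset), sorted by upstream offset
    private final Map<String, NavigableMap<Long, Long>> syncs = new HashMap<>();

    FlowTranslationSketch(String sourceAlias) {
        this.sourceAlias = sourceAlias;
    }

    // Records the equivalent of OffsetSync{topicPartition, upstreamOffset, downstreamOffset}.
    void addSync(String upstreamPartition, long upstreamOffset, long downstreamOffset) {
        syncs.computeIfAbsent(upstreamPartition, p -> new TreeMap<>())
             .put(upstreamOffset, downstreamOffset);
    }

    // Partition the checkpoint is written for on the target: adds the source prefix.
    String downstreamPartition(String upstreamPartition) {
        return sourceAlias + "." + upstreamPartition;
    }

    // Translates a group's committed upstream offset. Empty = "offset sync not found".
    OptionalLong translateDownstream(String upstreamPartition, long upstreamOffset) {
        NavigableMap<Long, Long> partitionSyncs = syncs.get(upstreamPartition);
        if (partitionSyncs == null) {
            return OptionalLong.empty();
        }
        Map.Entry<Long, Long> sync = partitionSyncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return OptionalLong.empty();   // committed offset is older than every sync
        }
        // Assumed rounding: exact match -> same downstream offset; anything later ->
        // one past the sync, which may re-deliver records but never skips them.
        long step = sync.getKey() == upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        FlowTranslationSketch aToB = new FlowTranslationSketch("a");
        aToB.addSync("replicate-me-0", 28, 28);
        System.out.println(aToB.downstreamPartition("replicate-me-0"));       // a.replicate-me-0
        System.out.println(aToB.translateDownstream("replicate-me-0", 28));   // OptionalLong[28]
        System.out.println(aToB.translateDownstream("replicate-me-0", 31));   // OptionalLong[29]
        System.out.println(aToB.translateDownstream("a.replicate-me-0", 25)); // OptionalLong.empty
    }
}
```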
> > > > When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> > > > upstreamOffset=29, downstreamOffset=29} is processed, it should be
> > > > used to write offsets for "a.a.replicate-me-0", but it actually writes
> > > > offsets to "replicate-me-0" due to this function, which I'd never
> > > > encountered before [1].
> > > > I think you could get those sorts of syncs into the syncs topic if you
> > > > had A->B configured with offset-syncs.topic.location=source, B->A
> > > > with offset-syncs.topic.location=target, and the topic filter
> > > > configured to do A->B->A round-trip replication.
> > > >
> > > > This appears to work as expected if there are no failures or restarts,
> > > > but as soon as a record is re-delivered in either flow, I think the
> > > > offsets should end up constantly advancing in an infinite loop. Maybe
> > > > you can try this: before starting the replication, insert a few
> > > > records into `a.replicate-me` to force replicate-me-0's offset n to
> > > > replicate to a.replicate-me-0's offset n+k.
> > > >
> > > > Ryanne, do you recall the purpose of the renameTopicPartition
> > > > function? To me it looks like it could only be harmful, as it renames
> > > > checkpoints to target topics that MirrorMaker2 isn't writing. It also
> > > > looks like it isn't active in a typical MM2 setup.
> > > >
> > > > Thanks!
> > > > Greg
> > > >
> > > > [1] https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
> > > >
> > > > On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> > > > <jer...@cloudflare.com.invalid> wrote:
> > > > >
> > > > > Thank you both for your swift responses!
> > > > >
> > > > > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > > > > replication in cases where the producer has also migrated to the
> > > > > secondary cluster and starts feeding messages into the non-prefixed
> > > > > topic, which are subsequently consumed by the consumer on the secondary
> > > > > cluster. After the failback, it asserts that the consumer offsets on the
> > > > > non-prefixed topic in the secondary cluster are translated and
> > > > > replicated to the consumer offsets of the prefixed topic in the primary
> > > > > cluster.
> > > > > In my example, the producer keeps producing in the primary cluster,
> > > > > whereas only the consumer fails over to the secondary cluster and, after
> > > > > some time, fails back to the primary cluster. While failed over, this
> > > > > consumer consumes messages from the prefixed topic in the secondary
> > > > > cluster, and I'd like to have those offsets replicated back to the
> > > > > non-prefixed topic in the primary cluster. If you like, I can provide an
> > > > > illustration if that helps to clarify this use case.
> > > > >
> > > > > For some context on why I'd like to have this: it retains loose
> > > > > coupling between producers and consumers, so we're able to test
> > > > > failovers for individual applications without the need for all
> > > > > producers/consumers to fail over and fail back at once.
> > > > >
> > > > > Digging through the Connect debug logs, I found that the offset syncs
> > > > > for the prefixed topic are not being pushed to
> > > > > mm2-offset-syncs.b.internal, which is likely the reason the checkpoint
> > > > > connector doesn't replicate consumer offsets:
> > > > > DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped (offset sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
> > > > >
> > > > > I wrote a small program to produce these offset syncs for the prefixed
> > > > > topic, and this successfully triggers the Checkpoint connector to start
> > > > > replicating the consumer offsets back to the primary cluster.
> > > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28, downstreamOffset=28}
> > > > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29, downstreamOffset=29}
> > > > > OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29, downstreamOffset=29} <-- the artificially generated offset sync
> > > > >
> > > > > At this point, whether this is a wise thing to do, and whether it would
> > > > > have any negative side effects, goes a bit beyond my understanding of
> > > > > the MM2 internals. I'd need to spend some more time in the MM2 source,
> > > > > though I welcome any feedback on this hack :-)
> > > > >
> > > > > On the two complications you mentioned, Greg: the second one is
> > > > > something we have to handle regardless, as a given consumer group must
> > > > > not be active on both the primary and secondary cluster; that would
> > > > > already block MM2 from replicating its offsets from the primary cluster
> > > > > to the cluster-prefixed topic on the secondary cluster. On the first
> > > > > point, I think it would be good practice to use topic ACLs so that only
> > > > > MM2 can produce to cluster-prefixed topics.
> > > > > In other words, the only application producing to a cluster-prefixed
> > > > > (or downstream) topic would be MirrorMaker, and I think that prevents
> > > > > this kind of message "drift". If a producer has to fail over, it starts
> > > > > producing to the non-prefixed topic on the secondary cluster, whose
> > > > > offsets are subject to a different Source/Checkpoint connector
> > > > > replication stream.
> > > > >
> > > > > On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Jeroen, MirrorClient will correctly translate offsets for both
> > > > > > failover and failback, exactly as you describe. It's possible to
> > > > > > automate failover and failback using that logic. The integration tests
> > > > > > automatically fail over and fail back, for example. I've seen it done
> > > > > > two ways: during startup within the consumer itself, or in an external
> > > > > > tool which writes offsets directly. In either case, MirrorClient will
> > > > > > give you the correct offsets to resume from.
> > > > > >
> > > > > > MirrorCheckpointConnector will automatically write offsets, but only
> > > > > > under certain conditions, to avoid accidentally overwriting offsets.
> > > > > > I'm not sure whether you can fail over and fail back using just the
> > > > > > automatic behavior. My guess is it works, but you are tripping over one
> > > > > > of the safety checks. You might try deleting the consumer group on the
> > > > > > source cluster prior to failback.
> > > > > >
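Ryanne doesn't spell out the safety checks, so the guard below is only a hypothetical illustration of the kind of check he means. It uses two assumed rules: do not rewrite offsets for a group that is active on the cluster being written to, and do not move a committed offset backwards. The names are illustrative, and this is not MirrorCheckpointTask's code. In this failback scenario, the cluster being written to is the original source cluster. Deleting the group there removes the committed offset that the second rule would otherwise protect.

```java
import java.util.OptionalLong;

// Hypothetical guard modelling the kind of safety check Ryanne describes before a
// checkpoint flow overwrites a consumer group's offset on the cluster it writes to.
// Both rules are assumptions for illustration, not MirrorCheckpointTask's code.
final class CheckpointGuardSketch {

    /**
     * @param groupActive whether the group has live members on the cluster being written to
     * @param committed   offset the group already has committed there, if any
     * @param translated  offset translated from the other cluster
     * @return true if the translated offset would be written
     */
    static boolean shouldWrite(boolean groupActive, OptionalLong committed, long translated) {
        if (groupActive) {
            return false;   // assumed rule 1: never rewrite offsets under a running consumer
        }
        if (committed.isPresent() && committed.getAsLong() >= translated) {
            return false;   // assumed rule 2: never move a committed offset backwards
        }
        return true;
    }

    public static void main(String[] args) {
        // Failback while a higher offset is still committed: the write is skipped.
        System.out.println(shouldWrite(false, OptionalLong.of(120), 110)); // false
        // Same situation after deleting the group on that cluster: the write goes through.
        System.out.println(shouldWrite(false, OptionalLong.empty(), 110)); // true
        // Group still running there: skipped regardless of offsets.
        System.out.println(shouldWrite(true, OptionalLong.empty(), 110));  // false
    }
}
```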
> > > > > > Ryanne
> > > > > >
> > > > > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup <jer...@cloudflare.com.invalid>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > > I'm exploring using the MirrorSourceConnector and
> > > > > > > MirrorCheckpointConnector on Kafka Connect to set up active/active
> > > > > > > replication between two Kafka clusters. Using the
> > > > > > > DefaultReplicationPolicy replication policy class, messages
> > > > > > > originating from the source cluster get replicated as expected to the
> > > > > > > cluster-prefixed topic in the target cluster. Consumer group offsets
> > > > > > > from the source to the target cluster are replicated likewise.
> > > > > > > However, once the consumer group migrates from the source to the
> > > > > > > target cluster, its offsets are not replicated from the target back to
> > > > > > > the source cluster.
> > > > > > > For an active/active setup, I'd want consumer group offsets for topic
> > > > > > > <source-cluster-alias>.<topic-name> in the target cluster to be
> > > > > > > replicated back to <topic-name> in the source cluster. This would
> > > > > > > allow consumers to fail over and fail back between clusters with
> > > > > > > minimal duplicate message consumption.
> > > > > > >
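The mapping Jeroen asks for, from `<source-cluster-alias>.<topic-name>` back to `<topic-name>`, is the same renaming Greg links to in MirrorCheckpointTask (renameTopicPartition). Below is a simplified, hypothetical model of that rule. It assumes the default `.` separator and at most one prefix, and the names are ours. It is not the Kafka implementation, which works on TopicPartition objects through the configured ReplicationPolicy.

```java
// Simplified, hypothetical model of the checkpoint renaming rule discussed in the
// thread (renameTopicPartition in MirrorCheckpointTask), assuming the default "."
// separator and at most one prefix. Not the Kafka implementation.
final class CheckpointRenameSketch {

    // Renames an upstream "topic-partition" for a checkpoint flow sourceAlias -> targetAlias.
    static String rename(String upstreamPartition, String sourceAlias, String targetAlias) {
        String targetPrefix = targetAlias + ".";
        if (upstreamPartition.startsWith(targetPrefix)) {
            // Topic originally came from the target cluster: "a.replicate-me-0" -> "replicate-me-0".
            return upstreamPartition.substring(targetPrefix.length());
        }
        // Otherwise add the source prefix: "replicate-me-0" -> "b.replicate-me-0".
        return sourceAlias + "." + upstreamPartition;
    }

    public static void main(String[] args) {
        // Flow A -> B: the usual direction, a prefix is added.
        System.out.println(rename("replicate-me-0", "a", "b"));   // a.replicate-me-0
        // Flow B -> A: the prefixed topic maps back to the original name on A.
        System.out.println(rename("a.replicate-me-0", "b", "a")); // replicate-me-0
    }
}
```

In the B->A flow, `a.replicate-me-0` maps to `replicate-me-0`, which is the direction Jeroen wants. As the thread works out, the rename alone is not enough. No offset sync is produced for `a.replicate-me-0`, because the B->A source connector does not mirror that topic back to A.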
> > > > > > > To clarify my setup a bit: I'm running two single-broker Kafka
> > > > > > > clusters in Docker (cluster A and B), along with a single Connect
> > > > > > > instance on which I've provisioned four source connectors:
> > > > > > > - A MirrorSourceConnector replicating topics from cluster A to cluster B
> > > > > > > - A MirrorSourceConnector replicating topics from cluster B to cluster A
> > > > > > > - A MirrorCheckpointConnector translating & replicating offsets from
> > > > > > >   cluster A to cluster B
> > > > > > > - A MirrorCheckpointConnector translating & replicating offsets from
> > > > > > >   cluster B to cluster A
> > > > > > >
> > > > > > > I'm not sure whether this is by design, or whether I'm missing
> > > > > > > something. I've seen a similar question posted to KAFKA-9076 [1]
> > > > > > > without a resolution.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jeroen
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908
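For the four-connector setup above, it helps to spell out where each flow's offset-syncs topic lives. This sketch assumes the naming reported in this thread (`mm2-offset-syncs.<target alias>.internal`). It models `offset-syncs.topic.location` as a choice between the source cluster and the target cluster. The types are illustrative, not Kafka classes. With A->B on `source` and B->A on `target`, both syncs topics end up on cluster A, which matches what Jeroen observed.

```java
import java.util.List;

// Illustrative model of where each flow's offset-syncs topic lives, assuming the
// "mm2-offset-syncs.<target alias>.internal" naming reported in this thread.
final class SyncsTopicPlacementSketch {
    enum Location { SOURCE, TARGET }   // stands in for offset-syncs.topic.location

    static final class Flow {
        final String source;
        final String target;
        final Location location;

        Flow(String source, String target, Location location) {
            this.source = source;
            this.target = target;
            this.location = location;
        }

        String syncsTopic() {
            return "mm2-offset-syncs." + target + ".internal";
        }

        String hostCluster() {
            return location == Location.SOURCE ? source : target;
        }
    }

    public static void main(String[] args) {
        // Jeroen's mix: A -> B with location=source, B -> A with location=target.
        List<Flow> flows = List.of(
                new Flow("a", "b", Location.SOURCE),
                new Flow("b", "a", Location.TARGET));
        for (Flow f : flows) {
            System.out.println(f.source + "->" + f.target + ": " + f.syncsTopic()
                    + " on cluster " + f.hostCluster());
        }
        // prints:
        // a->b: mm2-offset-syncs.b.internal on cluster a
        // b->a: mm2-offset-syncs.a.internal on cluster a
    }
}
```

Switching the B->A flow to `SOURCE` would move `mm2-offset-syncs.a.internal` to cluster B, so each cluster hosts one syncs topic. That corresponds to the "same location for every connector" arrangement Greg describes.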