Hey Jeroen,

Thanks for sharing your prototype! It is very interesting!

> I couldn't reproduce your hypothesis.

I think my hypothesis was for another setup which didn't involve code
changes, and instead relied on A->B->A round trip replication to
produce the "backwards" offset syncs.
I believe this would replicate data from "replicate-me-0" to
"b.a.replicate-me-0", and then possibly take the offsets intended for
"b.a.replicate-me-0" and apply them to "replicate-me-0" creating the
infinite cycle.
I would not expect your implementation to suffer from this failure
mode, because it's using the offset in "replicate-me-0" as the
downstream offset, not the offset of "b.a.replicate-me-0".
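For illustration, here's a rough simulation of the loop I have in mind (the offsets, the fixed gap k, and the translation rule are all made up for the sketch; this is not MM2's actual OffsetSyncStore logic):

```python
# Rough simulation of the A->B->A feedback loop. Offsets and the
# translation rule are invented for illustration only.

def translate(group_offset, sync):
    """Translate a consumer offset with a single (upstream, downstream)
    offset sync, assuming a fixed gap between the two logs."""
    upstream, downstream = sync
    return downstream + (group_offset - upstream)

# Suppose replicate-me-0's offset n replicated to b.a.replicate-me-0's
# offset n + k, because k extra records landed on the round-tripped topic.
k = 5
round_trip_sync = (0, k)  # upstream offset 0 -> downstream offset k

offset = 10
history = []
for _ in range(3):
    # The offsets intended for "b.a.replicate-me-0" are (incorrectly)
    # applied back to "replicate-me-0", so each pass adds k again.
    offset = translate(offset, round_trip_sync)
    history.append(offset)

print(history)  # offsets keep advancing: [15, 20, 25]
```

Each application of the misdirected sync moves the group offset forward by k, which is the unbounded advance I'd expect once a record is re-delivered.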

With your prototype, do you experience "collisions" in the
offset-syncs topic? Since you're sharing a single offset-syncs topic
between both replication flows, I would expect offsets for topics with
the same names on both clusters to conflict, and cause the translation
to happen using the opposite topic's offsets.
This would also be visible in the state of the OffsetSyncStore here
[1]; you can compare the normal A->B behavior before and after
starting the B->A source connector to see whether the concurrent flows
cause more syncs to be cleared, or the wrong syncs to be present.
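As a toy model of the collision I mean (hypothetical offsets, and the real OffsetSyncStore keeps a history of syncs per topic-partition rather than a single latest entry):

```python
# Toy model of syncs from two flows colliding in one shared
# offset-syncs topic. A naive store keyed only by topic-partition name
# lets the flows clobber each other's entries.

store = {}

def apply_sync(topic_partition, upstream, downstream):
    store[topic_partition] = (upstream, downstream)

# A -> B flow: "replicate-me-0" on A replicated to offset 100 on B.
apply_sync("replicate-me-0", 50, 100)

# B -> A flow, writing to the same shared topic: "replicate-me-0" on B
# replicated to offset 7 on A. Same key, so it overwrites the A -> B sync.
apply_sync("replicate-me-0", 50, 7)

# A -> B translation now sees the B -> A flow's offsets.
print(store["replicate-me-0"])  # (50, 7), not (50, 100)
```

Since the syncs carry no cluster information beyond the topic name, the translation for one flow can end up using the opposite topic's offsets, which is the conflict I'd expect you to see.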

I think it is normal for every MM2 connector to have the same
offset-syncs.topic.location to avoid these sorts of conflicts, so that
each syncs topic is only used by one of the MM2 replication flows.
I think that turning on bidirectional offset syncs will probably
require a second producer in the MirrorSourceTask to contact the
opposite cluster, or a second admin client in the
MirrorCheckpointTask.
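For reference, that separation is what you get from a config along these lines (a sketch of a dedicated-mode mm2.properties; the per-flow override syntax and aliases here are examples, and your individual-connector setup would set the equivalent connector configs instead):

```properties
# Sketch of a dedicated MM2 config; cluster aliases are examples.
clusters = a, b
a->b.enabled = true
b->a.enabled = true

# With location=source, the a->b flow uses mm2-offset-syncs.b.internal
# on cluster a, and the b->a flow uses mm2-offset-syncs.a.internal on
# cluster b: two distinct topics, one per replication flow.
a->b.offset-syncs.topic.location = source
b->a.offset-syncs.topic.location = source
```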

> Do you think it'd be worthwhile proceeding with this?

This is certainly a capability that MM2 is missing right now, and
seems like it would be a natural component of failing consumers back
and forth. If you see value in it, and are interested in driving the
feature, you can open a KIP [2] to discuss the interface and design
with the rest of the community.

Thanks!
Greg

[1] 
https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L194
[2] 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

On Thu, Jan 11, 2024 at 9:27 AM Jeroen Schutrup
<jer...@cloudflare.com.invalid> wrote:
>
> I see, makes complete sense to me. I built a custom version [1] based on
> Kafka 3.5.1 with bidirectional offset replication enabled so I could do
> some more testing. Offset translation back upstream works well; I think
> because of the reason Ryanne pointed out, both topics contain identical
> data. I tested this by truncating the upstream topic before starting
> replication (so the downstream/upstream topics have different offsets).
> Truncating the upstream topic while replication is running doesn't
> result in any weirdness either.
>
> > Before starting the replication, insert a few records into
> `a.replicate-me` to force replicate-me-0's offset n to replicate to
> a.replicate-me-0's offset n+k.
> I couldn't reproduce your hypothesis. After doing the above and then
> starting replication I didn't see any offset replication loops. Once I
> started producing data into the upstream topic and subscribed a
> console-consumer to the downstream topic, offsets were translated and
> replicated correctly back upstream. My guess is the CheckpointConnector can
> account for this surplus of messages, as the actual log offsets of the
> downstream topic are written to the offset-sync topic.
>
> As this kind of active/active replication would be very beneficial to us
> for reasons stated in my previous message, we'd love to help out building
> this kind of offset replication into the Mirror connectors. I understand
> this is not something that should be enabled by default, but having it
> behind a configuration toggle could help users desiring a similar kind of
> active/active setup and who understand the restrictions. Do you think it'd
> be worthwhile proceeding with this?
>
> [1]
> https://github.com/jeroen92/kafka/commit/1a27696ec6777c230f100cf9887368c431ebe0f8
>
> On Thu, Jan 11, 2024 at 1:06 AM Greg Harris <greg.har...@aiven.io.invalid>
> wrote:
>
> > Hi Jeroen,
> >
> > I'm glad you're experimenting with MM2, and I hope we can give you
> > some more context to explain what you're seeing.
> >
> > > I wrote a small program to produce these offset syncs for the prefixed
> > > topic, and this successfully triggers the Checkpoint connector to start
> > > replicating the consumer offsets back to the primary cluster.
> >
> > This is interesting, and I wouldn't have expected it to work.
> >
> > To rewind, each flow Source->Target has a MirrorSourceConnector, an
> > Offset Syncs Topic, and a MirrorCheckpointConnector. With both
> > directions enabled, there are two separate flows each with Source,
> > Syncs topic, and Checkpoint.
> > With offset-syncs.topic.location=source, the
> > mm2-offset-syncs.b.internal on the A cluster is used for the A -> B
> > replication flow. It contains topic names from cluster A, and the
> > corresponding offsets those records were written to on the B cluster.
> > When translation is performed, the consumer groups from A are
> > replicated to the B cluster, and the replication mapping (cluster
> > prefix) is added.
> > Using your syncs topic as an example,
> > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > downstreamOffset=28} will be used to write offsets for
> > "a.replicate-me-0" for the equivalent group on the B cluster.
> >
> > When your artificial sync OffsetSync{topicPartition=a.replicate-me-0,
> > upstreamOffset=29, downstreamOffset=29} is processed, it should be
> > used to write offsets for "a.a.replicate-me-0" but it actually writes
> > offsets to "replicate-me-0" due to this function that I've never
> > encountered before: [1].
> > I think you could get those sorts of syncs into the syncs-topic if you
> > had A->B configured with offset-syncs.topic.location=source, and B->A
> > with offset-syncs.topic.location=target, and configured the topic
> > filter to do A -> B -> A round trip replication.
> >
> > This appears to work as expected if there are no failures or restarts,
> > but as soon as a record is re-delivered in either flow, I think the
> > offsets should end up constantly advancing in an infinite loop. Maybe
> > you can try that: Before starting the replication, insert a few
> > records into `a.replicate-me` to force replicate-me-0's offset n to
> > replicate to a.replicate-me-0's offset n+k.
> >
> > Ryanne, do you recall the purpose of the renameTopicPartition
> > function? To me it looks like it could only be harmful, as it renames
> > checkpoints to target topics that MirrorMaker2 isn't writing. It also
> > looks like it isn't active in a typical MM2 setup.
> >
> > Thanks!
> > Greg
> >
> > [1]:
> > https://github.com/apache/kafka/blob/13a83d58f897de2f55d8d3342ffb058b230a9183/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L257-L267
> >
> > On Tue, Jan 9, 2024 at 5:54 AM Jeroen Schutrup
> > <jer...@cloudflare.com.invalid> wrote:
> > >
> > > Thank you both for your swift responses!
> > >
> > > Ryanne, the MirrorConnectorsIntegrationBaseTest only tests offset
> > > replication in cases where the producer has migrated to the secondary
> > > cluster as well and starts feeding messages into the non-prefixed topic,
> > > which are subsequently consumed by the consumer on the secondary cluster.
> > > After the fallback, it asserts that the consumer offsets on the
> > > non-prefixed topic in the secondary cluster are translated and replicated
> > > to the consumer offsets of the prefixed topic in the primary cluster.
> > > In my example, the producer keeps producing in the primary cluster,
> > > whereas only the consumer fails over to the secondary cluster and, after
> > > some time, fails back to the primary cluster. This consumer will then
> > > consume messages from the prefixed topic in the secondary cluster, and
> > > I'd like to have those offsets replicated back to the non-prefixed topic
> > > in the primary cluster. I can provide an illustration if that helps to
> > > clarify this use case.
> > >
> > > To add some context on why I'd like to have this: it retains loose
> > > coupling between producers and consumers, so we're able to test failovers
> > > for individual applications without needing all producers/consumers to
> > > fail over and fail back at once.
> > >
> > > Digging through the Connect debug logs, I found that the offset-syncs of
> > > the prefixed topic not being pushed to mm2-offset-syncs.b.internal is
> > > likely the reason the checkpoint connector doesn't replicate consumer
> > > offsets:
> > > DEBUG translateDownstream(replication,a.replicate-me-0,25): Skipped
> > > (offset sync not found) (org.apache.kafka.connect.mirror.OffsetSyncStore)
> > >
> > > I wrote a small program to produce these offset syncs for the prefixed
> > > topic, and this successfully triggers the Checkpoint connector to start
> > > replicating the consumer offsets back to the primary cluster.
> > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=28,
> > > downstreamOffset=28}
> > > OffsetSync{topicPartition=replicate-me-0, upstreamOffset=29,
> > > downstreamOffset=29}
> > > OffsetSync{topicPartition=a.replicate-me-0, upstreamOffset=29,
> > > downstreamOffset=29} <-- the artificially generated offset-sync
> > >
> > > At this point, whether this is a wise thing to do and whether it would
> > > have any negative side effects goes a bit beyond my understanding of the
> > > MM2 internals. I'd need to spend some more time in the MM2 source, though
> > > I welcome any feedback on this hack :-)
> > >
> > > On the two complications you're mentioning, Greg: the second one is
> > > something we should figure out regardless, as any given consumer group
> > > shouldn't be active on both the primary and secondary cluster anyway,
> > > since that would already block MM2 from replicating its offsets from the
> > > primary to the cluster-prefixed topic on the secondary cluster. On the
> > > first point, I think it would be good practice to only allow MM2 to
> > > produce to any cluster-prefixed topic by using topic ACLs. In other
> > > words, the only application producing to a cluster-prefixed (or
> > > downstream) topic would be MirrorMaker, and I think that prevents this
> > > kind of message 'drift'. In case a producer has to fail over, it starts
> > > producing to the non-prefixed topic on the secondary cluster, whose
> > > offsets are subject to a different Source/Checkpoint connector
> > > replication stream.
> > >
> > > On Mon, Jan 8, 2024 at 9:12 PM Ryanne Dolan <ryannedo...@gmail.com>
> > wrote:
> > >
> > > > Jeroen, MirrorClient will correctly translate offsets for both failover
> > > > and failback, exactly as you describe. It's possible to automate
> > > > failover and failback using that logic. The integration tests
> > > > automatically fail over and fail back, for example. I've seen it done
> > > > two ways: during startup within the consumer itself, or in an external
> > > > tool which writes offsets directly. In either case MirrorClient will
> > > > give you the correct offsets to resume from.
> > > >
> > > > MirrorCheckpointConnector will automatically write offsets, but only
> > > > under certain conditions, to avoid accidentally overwriting offsets.
> > > > I'm not sure whether you can fail over and fail back using just the
> > > > automatic behavior. My guess is it works, but you are tripping over one
> > > > of the safety checks. You might try deleting the consumer group on the
> > > > source cluster prior to failback.
> > > >
> > > > Ryanne
> > > >
> > > > On Mon, Jan 8, 2024, 9:10 AM Jeroen Schutrup
> > > > <jer...@cloudflare.com.invalid> wrote:
> > > >
> > > > > Hi all,
> > > > > I'm exploring using the MirrorSourceConnector and
> > > > > MirrorCheckpointConnector on Kafka Connect to set up active/active
> > > > > replication between two Kafka clusters. Using the
> > > > > DefaultReplicationPolicy replication policy class, messages
> > > > > originating from the source cluster get replicated as expected to the
> > > > > cluster-prefixed topic in the target cluster. Consumer group offsets
> > > > > from the source to target cluster are replicated likewise. However,
> > > > > once the consumer group migrates from the source to the target
> > > > > cluster, its offsets are not replicated from the target back to the
> > > > > source cluster.
> > > > > For an active/active setup I'd want consumer group offsets for topic
> > > > > <source-cluster-alias>.<topic-name> in the target cluster to be
> > > > > replicated back to <topic-name> in the source cluster. This would
> > > > > allow consumers to fail over & fail back between clusters with
> > > > > minimal duplicate message consumption.
> > > > >
> > > > > To clarify my setup a bit: I'm running two single-broker Kafka
> > > > > clusters in Docker (cluster A & B), along with a single Connect
> > > > > instance on which I've provisioned four source connectors:
> > > > > - A MirrorSourceConnector replicating topics from cluster A to cluster B
> > > > > - A MirrorSourceConnector replicating topics from cluster B to cluster A
> > > > > - A MirrorCheckpointConnector translating & replicating offsets from
> > > > > cluster A to cluster B
> > > > > - A MirrorCheckpointConnector translating & replicating offsets from
> > > > > cluster B to cluster A
> > > > >
> > > > > I'm not sure whether this is by design, or maybe I'm missing
> > > > > something. I've seen a similar question posted to KAFKA-9076 [1]
> > > > > without a resolution.
> > > > >
> > > > > Regards,
> > > > > Jeroen
> > > > >
> > > > > [1]
> > > > > https://issues.apache.org/jira/browse/KAFKA-9076?focusedCommentId=17268908&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17268908
> > > > >
> > > >
> >
