Hey Alan,

I'm running into the same issue as you and I believe I've figured it out.

I noticed that consumer partitions on the destination cluster that have a
LOG-END-OFFSET=0 all exhibit this issue. It looks like Mirrormaker's offset
sync does not work correctly if the partition is empty. Instead of setting
the destination's offset to 0 for these empty partitions, it sets
destination's offset to the same numerical value as the source's offset.

If you now send a message to these partitions, making them non-empty, the
offset will still not update until the "correct" offset is greater than the
offset on the partition. This is likely why you are also seeing negative
lag on some partitions with LOG-END-OFFSET>0. The only workaround I've
found is to make these partitions non-empty, then delete the consumer group
on the destination cluster and let Mirrormaker resync it.

Hope this helps,
Frank

On 2021/03/15 21:59:03, Alan Ning <a...@gmail.com> wrote:
> I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics
from>
> one cluster to another while preserving through>
> `sync.group.offsets.enabled=true`. My source cluster is running Kafka
0.10,>
> while the target cluster is running 2.6.1.>
>
> While I can see data being replicated, the data on the replicated
Consumer>
> Group in the target cluster looks wrong. The lag values of the
replicated>
> Consumer Group are large negative values, and the LOG-END-OFFSET are
mostly>
> 0. I determined this information from kafka-consumer-groups.sh.>
>
> I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag
JMX>
> metrics in MM2 and the reported lag is zero for all partitions.>
>
> By using `sync.group.offsets.enabled=true`, I envisioned that MM2 will>
> automatically replicate and sync all Consumer Groups with a meaningful>
> offset in the target cluster. Am I misunderstanding how MM2 is supposed
to>
> work?>
>
> Here is my mm2.properties and the CG details.>
>
> # mm2.properties>
> ```>
> clusters = src, dst>
> src.bootstrap.servers = 10.0.0.1:9092>
> dst.bootstrap.servers = 10.0.0.2:9092>
> src->dst.enabled = true>
> src->dst.topics = compute.*>
> src->dst.offset.flush.timeout.ms=60000>
> src->dst.buffer.memory=10000>
> dst->src.enabled = true>
> dst->src.topics = .*>
> replication.factor=3>
> src->dst.sync.group.offsets.enabled = true>
> src->dst.emit.checkpoints.enabled = true>
> src->dst.consumer.auto.offset.reset=latest>
> consumer.auto.offset.reset = latest>
> auto.offset.reset = latest>
> replication.policy.class =>
> com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy>
> checkpoints.topic.replication.factor=3>
> heartbeats.topic.replication.factor=3>
> offset-syncs.topic.replication.factor=3>
> offset.storage.replication.factor=3>
> status.storage.replication.factor=3>
> config.storage.replication.factor=3>
> sync.topic.acls.enabled = false>
> sync.group.offsets.enabled = true>
> emit.checkpoints.enabled = true>
> tasks.max = 8>
> dst.producer.offset.flush.timeout.ms = 60000>
> dst.offset.flush.timeout.ms = 60000>
> ```>
>
> Consumer Group details>
> ```>
> GROUP                                         TOPIC>
>             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG>
> CONSUMER-ID     HOST            CLIENT-ID>
> kafka-group-Compute-Requests Compute-Requests 57         5305947
0>
>               -5305947        -               -               ->
> kafka-group-Compute-Requests Compute-Requests 20         5164205
0>
>               -5164205        -               -               ->
> kafka-group-Compute-Requests Compute-Requests 53         4208527
0>
>               -4208527        -               -               ->
> kafka-group-Compute-Requests Compute-Requests 82         5247928
0>
>               -5247928        -               -               ->
> kafka-group-Compute-Requests Compute-Requests 65         5574520
0>
>               -5574520        -               -               ->
> kafka-group-Compute-Requests Compute-Requests 11         5190708>
> 209             -5190499        -               -               ->
> ```>
>
> Thanks>
>
> ... Alan>
>

Reply via email to