[ 
https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740296#comment-17740296
 ] 

Greg Harris commented on KAFKA-15144:
-------------------------------------

[~ecomar] Yes this was changed recently as part of KAFKA-12468 .

The Checkpoint logic (and consumer group sync) are much more conservative now, 
in order to prevent negative lag and data loss scenarios.
We decided that these data loss scenarios were significant enough that the 
default behavior should change, rather than being a configurable opt-out.
Unfortunately it comes with the side-effect that there are strictly fewer 
checkpoints emitted, and that the downstream consumer lag may be greater than 
the previous implementation, causing more reprocessing of records downstream.

To explain the odd behavior you're seeing in the logs:

> Checkpoint{consumerGroupId=edogroup, 
>topicPartition=source.vf-mirroring-test-edo-0, {*}upstreamOffset=30, 
>downstreamOffset=29{*}, metadata=}
> I'd expect the upstreamOffset to be always the same as the upstreamOffset

The reason that the downstream offset was translated as `29` is because at the 
time that checkpoint was emitted, the sync `28:28` had been present, but 
`30:30` was not.
The offset translation logic emits either the downstream offset (28) if the 
upstream offsets match (28 = 28), or downstream + 1 (29) if the consumer group 
is ahead of the sync (30 > 28).
Similarly when the checkpoint `227:203` is emitted, the sync `202:202` was 
present, but that was the latest sync, so it translated it as 202 + 1 = 203.
And for the title issue, `0:0` is present, so it translates any `offset > 0` to 
`0 + 1 = 1`.

See the logic for this here: 
https://github.com/apache/kafka/blob/98fbd8afc7f3ba806d742690536090936738f1e7/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L148-L166

> MM2 Checkpoint downstreamOffset stuck to 1
> ------------------------------------------
>
>                 Key: KAFKA-15144
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15144
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>            Reporter: Edoardo Comar
>            Assignee: Edoardo Comar
>            Priority: Major
>         Attachments: edo-connect-mirror-maker-sourcetarget.properties
>
>
> Steps to reproduce :
> 1.Start the source cluster
> 2.Start the target cluster
> 3.Start connect-mirror-maker.sh using a config like the attached
> 4.Create a topic in source cluster
> 5.produce a few messages
> 6.consume them all with autocommit enabled
>  
> 7. then dump the Checkpoint topic content e.g.
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> source.checkpoints.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}}
> {{{}Checkpoint{consumerGroupId=edogroup, 
> topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, 
> {*}downstreamOffset=1{*}, metadata={}}}}
>  
> the downstreamOffset remains at 1, while, in a fresh cluster pair like with 
> the source topic created while MM2 is running, 
> I'd expect the downstreamOffset to match the upstreamOffset.
> Note that dumping the offset sync topic, shows matching initial offsets
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> mm2-offset-syncs.source.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}}
> {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, 
> downstreamOffset=0{}}}}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to