[ https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17714097#comment-17714097 ]
Daniel Urban commented on KAFKA-14807:
--------------------------------------

[~fisher91] do you use MM2 dedicated mode, or use the Connectors directly in a Connect cluster?
If the latter, you can fix this by only passing source.consumer.auto.offset.reset=latest to MirrorSourceConnector, but not to the MirrorCheckpointConnector.
For MM2 dedicated mode, I'm not aware of any workarounds.

> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the
> pause of replication of consumer groups
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14807
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.4.0, 3.3.1, 3.3.2
>         Environment: centos7
>            Reporter: Zhaoli
>            Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we added this configuration to mm2 to
> avoid replicating the entire message history:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found that some of the consumer group offsets had stopped
> replicating.
> The common characteristic of these consumer groups is their EMPTY status,
> which means they had no active members at that moment. Offset replication for
> all the active consumer groups works as normal.
> After researching the source code, we found this is because the configuration
> above also affects the consumption of topic `mm2-offset-syncs`, so the
> map `offsetSyncs` doesn't hold all of the topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
> {code}
> The missing topic partitions cause replication of the EMPTY consumer groups
> to pause, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
>     } else {
>         return OptionalLong.empty();
>     }
> }{code}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
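
For illustration, the three outcomes of the quoted translateDownstream logic can be reproduced in isolation. This is only a minimal sketch, not the Kafka implementation: the classes OffsetSyncSketch, OffsetSync and Store are hypothetical stand-ins, partitions are keyed by a plain string instead of TopicPartition, and the topic names and offsets are made up. It assumes, as the report describes, that a consumer starting at `latest` never reads the older sync records, so some partitions are simply absent from the map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

public class OffsetSyncSketch {

    /** Hypothetical stand-in for an offset sync: one upstream/downstream offset pair. */
    static final class OffsetSync {
        final long upstreamOffset;
        final long downstreamOffset;

        OffsetSync(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    /** Hypothetical stand-in for the sync store; keys are "topic-partition" strings. */
    static final class Store {
        private final Map<String, OffsetSync> offsetSyncs = new HashMap<>();

        void record(String topicPartition, long upstream, long downstream) {
            offsetSyncs.put(topicPartition, new OffsetSync(upstream, downstream));
        }

        /** Same branching as the quoted method, on the simplified types. */
        OptionalLong translateDownstream(String topicPartition, long upstreamOffset) {
            Optional<OffsetSync> sync = Optional.ofNullable(offsetSyncs.get(topicPartition));
            if (!sync.isPresent()) {
                // No sync was ever read for this partition: nothing to translate
                return OptionalLong.empty();
            }
            if (sync.get().upstreamOffset > upstreamOffset) {
                // Offset is too far in the past to translate accurately
                return OptionalLong.of(-1L);
            }
            long upstreamStep = upstreamOffset - sync.get().upstreamOffset;
            return OptionalLong.of(sync.get().downstreamOffset + upstreamStep);
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        // Only the sync for "orders-0" was read; the older sync for "orders-1"
        // is assumed to have been skipped because consumption started at latest.
        store.record("orders-0", 100L, 40L);

        System.out.println(store.translateDownstream("orders-0", 105L)); // OptionalLong[45]
        System.out.println(store.translateDownstream("orders-0", 90L));  // OptionalLong[-1]
        System.out.println(store.translateDownstream("orders-1", 105L)); // OptionalLong.empty
    }
}
```

In this sketch the third call is the case from the report: with no entry for the partition, the translation comes back empty, so a committed offset on that partition cannot be mapped to the target cluster until a sync for it is read.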