[ 
https://issues.apache.org/jira/browse/KAFKA-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhaoli updated KAFKA-14807:
---------------------------
    Description: 
We use MirrorMaker2 to replicate messages and consumer group offsets from the 
Kafka cluster `source` to cluster `target`.
To reduce the load on the source cluster, we add this configuration to mm2 to 
avoid replicating the whole history messages:
{code:java}
source.consumer.auto.offset.reset=latest {code}
After that, we found part of the consumer group offsets had stopped replicating.
The common characteristic of these consumer groups is their EMPTY status, which 
means they have no active members at that moment. All the active consumer 
groups‘ offset replication work as normal.
After researching the source code, we found this is because the configuration 
above also affects the consumption of topic `mm2-offset-syncs`, therefore the 
map `offsetSyncs` doesn't hold the whole topic partitions:
{code:java}
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); 
{code}
And the lost topicPartitions lead to the pause of replication of the EMPTY 
consumer groups, which is not expected.
{code:java}
OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
upstreamOffset) {
    Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
    if (offsetSync.isPresent()) {
        if (offsetSync.get().upstreamOffset() > upstreamOffset) {
            // Offset is too far in the past to translate accurately
            return OptionalLong.of(-1L);
        }
        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
        return OptionalLong.of(offsetSync.get().downstreamOffset() + 
upstreamStep);
    } else {
        return OptionalLong.empty();
    }
}{code}
 

  was:
We use MirrorMaker2 to replicate messages and consumergroup offsets from kafka 
cluster `source` to cluster `target`.
In order to reduce the load on the source cluster, we add this configuration to 
mm2 to avoid replicate the whole history messages:
{code:java}
source.consumer.auto.offset.reset=latest {code}
After that, we found part of the consumergroup offsets have stopped replicating.

The common characteristic of these consumergroups is  their EMPTY status,which 
means they have no active members at that monent. All the active 
consumergroups‘ offset replication work as normal.

After researching the source code,we found this is because the configuration 
above also affect the consumption of topic `mm2-offset-syncs`, therefore the 
map `offsetSyncs` dosen't hold the whole topicPartitions:
{code:java}
private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); 
{code}
And the lost topicPartitions lead to the pause of replication of the EMPTY 
consumer groups, which is not expected.
{code:java}
OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
upstreamOffset) {
    Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
    if (offsetSync.isPresent()) {
        if (offsetSync.get().upstreamOffset() > upstreamOffset) {
            // Offset is too far in the past to translate accurately
            return OptionalLong.of(-1L);
        }
        long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
        return OptionalLong.of(offsetSync.get().downstreamOffset() + 
upstreamStep);
    } else {
        return OptionalLong.empty();
    }
}{code}
 


> MirrorMaker2 config source.consumer.auto.offset.reset=latest leading to the 
> pause of replication of consumer groups
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14807
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.4.0, 3.3.1, 3.3.2
>         Environment: centos7
>            Reporter: Zhaoli
>            Priority: Major
>
> We use MirrorMaker2 to replicate messages and consumer group offsets from the 
> Kafka cluster `source` to cluster `target`.
> To reduce the load on the source cluster, we add this configuration to mm2 to 
> avoid replicating the whole history messages:
> {code:java}
> source.consumer.auto.offset.reset=latest {code}
> After that, we found part of the consumer group offsets had stopped 
> replicating.
> The common characteristic of these consumer groups is their EMPTY status, 
> which means they have no active members at that moment. All the active 
> consumer groups‘ offset replication work as normal.
> After researching the source code, we found this is because the configuration 
> above also affects the consumption of topic `mm2-offset-syncs`, therefore the 
> map `offsetSyncs` doesn't hold the whole topic partitions:
> {code:java}
> private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>(); 
> {code}
> And the lost topicPartitions lead to the pause of replication of the EMPTY 
> consumer groups, which is not expected.
> {code:java}
> OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long 
> upstreamOffset) {
>     Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
>     if (offsetSync.isPresent()) {
>         if (offsetSync.get().upstreamOffset() > upstreamOffset) {
>             // Offset is too far in the past to translate accurately
>             return OptionalLong.of(-1L);
>         }
>         long upstreamStep = upstreamOffset - 
> offsetSync.get().upstreamOffset();
>         return OptionalLong.of(offsetSync.get().downstreamOffset() + 
> upstreamStep);
>     } else {
>         return OptionalLong.empty();
>     }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to