[ 
https://issues.apache.org/jira/browse/KAFKA-16798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848404#comment-17848404
 ] 

Greg Harris commented on KAFKA-16798:
-------------------------------------

Hi [~sektor.coder] thanks for the ticket!

The log message that you're looking at is in WorkerSourceTask: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L235]
It should be printed every offset.flush.interval.ms: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L75-L78]
 
That periodic operation is only for writing the task "source offsets", which 
are used to store the replication progress. It is completely separate from the 
consumer group sync feature that it sounds like you're trying to configure.


The analogous log message for that is this one in the MirrorCheckpointTask: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L387]
 

If you're not seeing that log message, you might look at the OffsetSyncStore: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L113-L154]
 and in MirrorCheckpointTask: 
[https://github.com/apache/kafka/blob/7de58f7359a6c255701107d0078d359047c31457/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java#L197-L224]
 for reasons why the offset translation/syncing might not be happening as you 
expect.

> Mirrormaker2 dedicated mode - sync.group.offsets.interval not working
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-16798
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16798
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.0
>            Reporter: Thanos Athanasopoulos
>            Priority: Major
>
> Single instance MirrorMaker2 in dedicated mode, active passive replication 
> logic.
> sync.group.offsets.interval.seconds=2 configuration is enabled and active
> {noformat}
> [root@xxxxx-xxxxxxxxx ~]# docker logs cc-mm 2>&1 -f | grep -i 
> "auto.commit.interval\|checkpoint.interval\|consumer.commit.interval\|sync.topics.interval\|sync.group.offsets.interval\|offset-syncs.interval.seconds
>                                  "
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
>         auto.commit.interval.ms = 5000
>         sync.group.offsets.interval.seconds = 2
>         sync.group.offsets.interval.seconds = 2
>         auto.commit.interval.ms = 5000
> {noformat}
> but is not working, the commit of offsets happens *always every 60 seconds* 
> as you can see in the logs
> {noformat}
> [2024-05-20 09:32:44,847] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,852] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,875] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,881] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:32:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,850] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,854] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,878] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,883] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:33:44,895] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,852] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,857] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,880] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,886] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:34:44,897] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,855] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,860] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,883] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,888] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:35:44,901] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,860] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,863] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,886] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,893] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:36:44,904] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,863] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 21 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,866] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,888] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,895] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:37:44,906] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,866] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,869] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,890] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,898] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 2 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:38:44,910] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,869] INFO [MirrorSourceConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 20 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,872] INFO [MirrorSourceConnector|task-1|offsets] 
> WorkerSourceTask{id=MirrorSourceConnector-1} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,892] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,900] INFO [MirrorCheckpointConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets for 1 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> [2024-05-20 09:39:44,915] INFO [MirrorHeartbeatConnector|task-0|offsets] 
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 12 
> acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
> {noformat}
>  
> MM2 Configuration:
>  
> {noformat}
> clusters = cl01,cl02
> cl01.bootstrap.servers = 
> cl01-kafka-01:9092,cl01-kafka-02:9092,cl01-kafka-03:9092
> cl02.bootstrap.servers = 
> cl02-kafka-01:9092,cl02-kafka-02:9092,cl02-kafka-03:9092
> ##############################
> ### First cluster config
> ##############################
> # source.cluster.alias = cl01
> # target.cluster.alias = cl02
> cl01->cl02.enabled=true
> cl01->cl02.enable.auto.commit=true
> cl01->cl02.sync.group.offsets.enabled=true
> cl01->cl02.refresh.groups.enabled=true
> cl01->cl02.refresh.topics.enabled=true
> cl01->cl02.sync.topics.enabled=true
> cl01->cl02.sync.topic.acls.enabled=false
> cl01->cl02.sync.acls.enabled=false
> cl01->cl02.emit.heartbeats.enabled=true
> cl01->cl02.emit.checkpoints.enabled=true
> cl01->cl02.sync.topic.configs.enabled=true
> cl01->cl02.emit.offset-syncs.enabled=true
> cl01.consumer.auto.offset.reset=earliest
> ##############################
> ### Second cluster (DR) config
> ##############################
> # source.cluster.alias = cl01
> # target.cluster.alias = cl02
> cl02->cl01.enabled=false
> cl02->cl01.enable.auto.commit=false
> cl02->cl01.emit.offset-syncs.enabled=false
> cl02->cl01.sync.group.offsets.enabled=false
> cl02->cl01.sync.topic.acls.enabled=false
> cl02->cl01.sync.acls.enabled=false
> cl02->cl01.refresh.groups.enabled=true
> cl02->cl01.refresh.topics.enabled=true
> cl02->cl01.sync.topics.enabled=true
> cl02->cl01.emit.heartbeats.enabled=true
> cl02->cl01.emit.checkpoints.enabled=true
> cl02->cl01.sync.topic.configs.enabled=true
> #cl02.consumer.auto.offset.reset=earliest
> ##############################
> ### CC Configs
> ##############################
> group.id=crosscluster-consumer-group
> client.id=crosscluster-clientgroups=.*
> #groups='.*cc-consumer-group.*'
> offset-syncs.topic.whitelist='cl.*\.test-topic'
> #topics='cl.*\.test-topic'
> topics.blacklist='[.*[\-\.]internal, .*\.replica, __.*]'
> key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter = 
> org.apache.kafka.connect.converters.ByteArrayConverteremit.heartbeats.interval.seconds=5
> emit.checkpoints.interval.seconds=5
> emit.offset-syncs.interval.seconds=2
> auto.commit.interval.seconds=5
> checkpoint.interval.seconds=5
> consumer.commit.interval.seconds=5
> offset.flush.interval.seconds=5
> sync.topics.interval.seconds=2
> sync.group.offsets.interval.seconds=2
> sync.acls.interval.seconds=5
> refresh.groups.interval.seconds=5
> refresh.topics.interval.seconds=5
> max.poll.interval.seconds=5
> heartbeats.topic.retention.hours=1
> checkpoints.topic.retention.hours=1
> offset.syncs.topic.retention.hours=1
> errors.log.enable=true
> errors.log.include.messages=true
> dedicated.mode.enable.internal.rest=true
> cluster.producer.enable.idempotence=true
> allow.auto.create.topics=truetasks.max=48
> replication.factor=1
> config.storage.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> offset-syncs.topic.replication.factor=3
> heartbeats.topic.replication.factor=1
> checkpoints.topic.replication.factor=1
> offsets.storage.replication.factor=1
> {noformat}
>  
> I see that {color:#cccccc}*offset.flush.interval.seconds* is always 60000 
> whatever change I make in the MM2 properties file and II suspect that this is 
> the culprit but needs some more investigation.{color}
> {noformat}
> docker logs cc-mm 2>&1 -f | grep -i "offset.flush.interval"
>         offset.flush.interval.ms = 60000
>         offset.flush.interval.ms = 60000
>         offset.flush.interval.ms = 60000
>         offset.flush.interval.ms = 60000
> {noformat}
> {color:#cccccc} 
> The docker-compose service spec{color}
> {noformat}
>   cc-mm:
>     image: 'bitnami/kafka:3.7.0'
>     container_name: cc-mm
>     hostname: cc-mm
>     ports:
>       - "9912:8083"
>     command: /opt/bitnami/kafka/bin/connect-mirror-maker.sh 
> /opt/bitnami/kafka/config/mm2.properties
>     environment:
>       <<: *kafka-cc-env-common
>     volumes:
>       - ./files/configs-compose/jmx-exporter:/opt/jmx-exporter
>       - 
> ./files/configs-compose/mm2/single-cluster/cc-single-mm.properties:/opt/bitnami/kafka/config/mm2.properties
>       - 
> ./files/configs-compose/kafka/log4j.properties:/opt/bitnami/kafka/config/log4j.properties:ro
>     networks:
>       - kafka-1
>       - kafka-2
>     depends_on:
>       cl01-kafka-01:
>         condition: service_healthy
>       cl01-kafka-02:
>         condition: service_healthy
>       cl01-kafka-03:
>         condition: service_healthy
>       cl02-kafka-01:
>         condition: service_healthy
>       cl02-kafka-02:
>         condition: service_healthy
>       cl02-kafka-03:
>         condition: service_healthy
>         {noformat}
> {color:#cccccc} {color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to