bseenu commented on a change in pull request #7577: URL: https://github.com/apache/kafka/pull/7577#discussion_r416436144
########## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ########## @@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) { Checkpoint.unwrapGroup(record.sourcePartition()), System.currentTimeMillis() - record.timestamp()); } + + private void refreshIdleConsumerGroupOffset() { + Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient + .describeConsumerGroups(consumerGroups).describedGroups(); + + for (String group : consumerGroups) { + try { + if (consumerGroupsDesc.get(group) == null) { + // if consumerGroupsDesc does not contain this group, it should be the new consumer + // group created at source cluster and its offsets should be sync-ed to target + newConsumerGroup.add(group); + continue; + } + ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get(); + // sync offset to the target cluster only if the state of current consumer group is idle or dead + ConsumerGroupState consumerGroupState = consumerGroupDesc.state(); + if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) { + idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group) + .partitionsToOffsetAndMetadata().get().entrySet()); + } + } catch (InterruptedException | ExecutionException e) { + log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e); + } + } + } + + Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() { + Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>(); + + // first, sync offsets for the idle consumers at target + for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) { + String consumerGroupId = group.getKey(); + // for each idle consumer at target, read the checkpoints (converted upstream offset) + // from the pre-populated map 
+ Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId); + + if (convertedUpstreamOffset == null) continue; + + Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>(); + for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) { Review comment: @thspinto I think I am running into the same issue that you pointed out here. The source consumer group is active and consumes a different topic, while the target consumer group is idle and has a different topic. However, the code only checks the topics and partitions matching the target site, and adds them only when the target offset is less than the source offset; it ignores other topics at the source. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org