bseenu commented on a change in pull request #7577: URL: https://github.com/apache/kafka/pull/7577#discussion_r419913034
########## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ########## @@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) { Checkpoint.unwrapGroup(record.sourcePartition()), System.currentTimeMillis() - record.timestamp()); } + + private void refreshIdleConsumerGroupOffset() { + Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient + .describeConsumerGroups(consumerGroups).describedGroups(); + + for (String group : consumerGroups) { + try { + ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get(); + ConsumerGroupState consumerGroupState = consumerGroupDesc.state(); + // sync offset to the target cluster only if the state of current consumer group is: + // (1) idle: because the consumer at target is not actively consuming the mirrored topic + // (2) dead: the new consumer that is recently created at source and never exist at target + if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) { + idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group) + .partitionsToOffsetAndMetadata().get().entrySet()); + } else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) { + newConsumerGroup.add(group); + } + } catch (InterruptedException | ExecutionException e) { + log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e); + } + } + } + + Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() { + Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>(); + + // first, sync offsets for the idle consumers at target + for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) { + String consumerGroupId = group.getKey(); + // for each idle consumer at target, read the checkpoints (converted upstream offset) + // from the pre-populated map + Map<TopicPartition, OffsetAndMetadata> 
convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId); + + if (convertedUpstreamOffset == null) continue; + + Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>(); + for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) { + long latestDownstreamOffset = entry.getValue().offset(); + TopicPartition topicPartition = entry.getKey(); + if (!convertedUpstreamOffset.containsKey(topicPartition)) { + log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString()); + continue; + } + + // if translated offset from upstream is smaller than the current consumer offset + // in the target, skip updating the offset for that partition + long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset(); + if (latestDownstreamOffset >= convertedOffset) { + log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for " + + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition); + continue; + } Review comment: I would like to propose the following changes to sync the consumer group changes made on the source side: ```suggestion for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) { TopicPartition topicPartition = convertedEntry.getKey(); for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) { if (idleEntry.getKey() == topicPartition) { long latestDownstreamOffset = idleEntry.getValue().offset(); // if translated offset from upstream is smaller than the current consumer offset // in the target, skip updating the offset for that partition long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset(); if (latestDownstreamOffset >= convertedOffset) { log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for " + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition); continue; } } } ``` 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org