bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r419913034



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of current consumer group is:
+                // (1) idle: because the consumer at target is not actively consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at source and never exist at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                } else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    newConsumerGroup.add(group);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {
+                long latestDownstreamOffset = entry.getValue().offset();
+                TopicPartition topicPartition = entry.getKey();
+                if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+                    log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString());
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
+                        + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
+                    continue;
+                }

Review comment:
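   I would like to propose the following change so that consumer group changes on the source side are also synced. It iterates over the converted upstream offsets rather than over the offsets already committed at the target:
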
   ```suggestion
               for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
                   TopicPartition topicPartition = convertedEntry.getKey();
                   for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
                       // compare by value: == on TopicPartition would compare object references
                       if (idleEntry.getKey().equals(topicPartition)) {
                           long latestDownstreamOffset = idleEntry.getValue().offset();
                           // if translated offset from upstream is smaller than the current consumer offset
                           // in the target, skip updating the offset for that partition
                           long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
                           if (latestDownstreamOffset >= convertedOffset) {
                               log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                                   + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                               continue;
                           }
                       }
                   }
   ```
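
   To make the intent concrete, here is a minimal, self-contained sketch of the same idea. It is not the PR's code: `OffsetSyncSketch` and `offsetsToSync` are hypothetical names, string keys stand in for `TopicPartition`, and plain `Long` offsets stand in for `OffsetAndMetadata`. It also assumes that a partition with a translated upstream offset but no committed offset at the target should be synced, which the suggestion above does not spell out. The downstream offsets are looked up in a map, which compares keys by value and avoids the nested loop.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class OffsetSyncSketch {

       // Hypothetical helper, not part of MirrorCheckpointTask.
       // Keys are "topic-partition" strings standing in for TopicPartition;
       // values are raw offsets standing in for OffsetAndMetadata.
       public static Map<String, Long> offsetsToSync(Map<String, Long> convertedUpstream,
                                                     Map<String, Long> downstream) {
           Map<String, Long> result = new HashMap<>();
           for (Map.Entry<String, Long> converted : convertedUpstream.entrySet()) {
               // Map lookup uses equals()/hashCode(), so keys are matched by value.
               Long latestDownstream = downstream.get(converted.getKey());
               // Skip only when the target is already at or past the translated offset.
               if (latestDownstream != null && latestDownstream >= converted.getValue()) {
                   continue;
               }
               // Assumption: partitions unknown at the target are synced as well.
               result.put(converted.getKey(), converted.getValue());
           }
           return result;
       }

       public static void main(String[] args) {
           Map<String, Long> upstream = new HashMap<>();
           upstream.put("topic-0", 10L); // target is behind: sync
           upstream.put("topic-1", 5L);  // target is already at 5: skip
           upstream.put("topic-2", 7L);  // no offset at target yet: sync (assumption)

           Map<String, Long> downstream = new HashMap<>();
           downstream.put("topic-0", 4L);
           downstream.put("topic-1", 5L);

           System.out.println(offsetsToSync(upstream, downstream));
       }
   }
   ```

   Driving the loop from the upstream side is what lets partitions that the group has only recently started consuming at the source show up in the result.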




