bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r416436144



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                if (consumerGroupsDesc.get(group) == null) {
+                    // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                    // group created at source cluster and its offsets should be sync-ed to target
+                    newConsumerGroup.add(group);
+                    continue;
+                }
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                // sync offset to the target cluster only if the state of current consumer group is idle or dead
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
       @thspinto I think I am running into the same issue you pointed out
here. The source consumer group is active and consumes one topic, while the
target consumer group is idle and has offsets for a different topic. The code
only looks at the topic partitions that already exist in the target group's
offsets, and adds them only when the target offset is less than the source
offset. It ignores the other topics consumed at the source.
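
To make the scenario concrete, here is a minimal, self-contained sketch of one possible alternative: iterate over the converted upstream offsets rather than the target group's existing offsets, so that partitions known only at the source are still synced. This is not the code in the PR. The class `OffsetSyncSketch`, the method `offsetsToSync`, and the use of plain `String` keys and `Long` offsets in place of `TopicPartition` and `OffsetAndMetadata` are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetSyncSketch {

    // Hypothetical helper, not part of MirrorCheckpointTask.
    // Keys are "topic-partition" strings standing in for TopicPartition;
    // values are plain offsets standing in for OffsetAndMetadata.
    static Map<String, Long> offsetsToSync(Map<String, Long> targetOffsets,
                                           Map<String, Long> convertedUpstreamOffsets) {
        Map<String, Long> toSync = new HashMap<>();
        // Drive the loop from the upstream (source) side so that partitions
        // missing at the target are not skipped.
        for (Map.Entry<String, Long> upstream : convertedUpstreamOffsets.entrySet()) {
            Long targetOffset = targetOffsets.get(upstream.getKey());
            // Sync when the target has no offset for this partition yet,
            // or when the target offset is behind the converted upstream offset.
            if (targetOffset == null || targetOffset < upstream.getValue()) {
                toSync.put(upstream.getKey(), upstream.getValue());
            }
        }
        return toSync;
    }

    public static void main(String[] args) {
        // Idle target group has offsets only for topicA.
        Map<String, Long> target = new HashMap<>();
        target.put("topicA-0", 10L);

        // Active source group has checkpoints for topicA and topicB.
        Map<String, Long> upstream = new HashMap<>();
        upstream.put("topicA-0", 5L);
        upstream.put("topicB-0", 42L);

        // topicA-0 is skipped (target is ahead); topicB-0 is included
        // even though the target group has never seen it.
        System.out.println(offsetsToSync(target, upstream));
    }
}
```

With a loop driven by the target group's offsets, as in the quoted hunk, `topicB-0` would never be considered because it does not appear in the target's offset set.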



