C0urante commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1244478202


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,31 @@ private void loadInitialConsumerGroups()
 
     List<String> findConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filteredGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filteredGroups) {
+            Set<String> consumedTopics = 
listConsumerGroupOffsets(group).keySet().stream()
+                    .map(TopicPartition::topic)
+                    .filter(this::shouldReplicateByTopicFilter)

Review Comment:
   I don't believe MM2 ever supported that. From 
[KIP-382](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-InternalTopics),
 regarding checkpoints (emphasis mine):
   
   > The connector will periodically query the source cluster for all committed 
offsets from all consumer groups, **filter for those topics being replicated**, 
and emit a message to a topic
   
   IIUC, MM2 also performs offset syncing based on the contents of the offset 
syncs topic, which is only populated by the source connector (i.e., the 
connector that replicates topics).
   
   @blacktooth have you experienced otherwise?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to