C0urante commented on code in PR #13446: URL: https://github.com/apache/kafka/pull/13446#discussion_r1244478202
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -150,10 +156,31 @@ private void loadInitialConsumerGroups() List<String> findConsumerGroups() throws InterruptedException, ExecutionException { - return listConsumerGroups().stream() + List<String> filteredGroups = listConsumerGroups().stream() .map(ConsumerGroupListing::groupId) - .filter(this::shouldReplicate) + .filter(this::shouldReplicateByGroupFilter) .collect(Collectors.toList()); + + List<String> checkpointGroups = new LinkedList<>(); + List<String> irrelevantGroups = new LinkedList<>(); + + for (String group : filteredGroups) { + Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream() + .map(TopicPartition::topic) + .filter(this::shouldReplicateByTopicFilter) Review Comment: I don't believe MM2 ever supported that. From [KIP-382](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-InternalTopics), regarding checkpoints (emphasis mine): > The connector will periodically query the source cluster for all committed offsets from all consumer groups, **filter for those topics being replicated**, and emit a message to a topic IIUC, MM2 also performs offset syncing based on the contents of the offset syncs topic, which is only populated by the source connector (i.e., the connector that replicates topics). @blacktooth have you experienced otherwise? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org