C0urante commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1149871568


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
     List<String> findConsumerGroups() throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filterGroups) {
+            Set<String> consumeTopics = listConsumerGroupOffsets(group).keySet().stream()
+                    .map(TopicPartition::topic)
+                    .filter(this::shouldReplicateByTopicFilter)
+                    .collect(Collectors.toSet());
+            // As long as a group consumes a topic configured in "topics", it needs to sync for this group.
+            // Although the topic in "topics.exclude" may be consumed in this group, this will be filtered out in the MirrorCheckpointTask.

Review Comment:
   This part refers to properties of the [DefaultTopicFilter](https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java), but users can specify a different `TopicFilter` if they'd like.
   We should probably refer to what the `TopicFilter` does instead of properties specific to one `TopicFilter` implementation:
   ```suggestion
               // Only perform checkpoints for groups that have offsets for at least one topic
               // that's accepted by the topic filter
   ```


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
     List<String> findConsumerGroups() throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()

Review Comment:
   Nit: "filtered" instead of "filter"
   ```suggestion
           List<String> filteredGroups = listConsumerGroups().stream()
   ```


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
     List<String> findConsumerGroups() throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filterGroups) {
+            Set<String> consumeTopics = listConsumerGroupOffsets(group).keySet().stream()
+                    .map(TopicPartition::topic)
+                    .filter(this::shouldReplicateByTopicFilter)
+                    .collect(Collectors.toSet());
+            // As long as a group consumes a topic configured in "topics", it needs to sync for this group.
+            // Although the topic in "topics.exclude" may be consumed in this group, this will be filtered out in the MirrorCheckpointTask.
+            if (consumeTopics.size() > 0) {
+                checkpointGroups.add(group);
+            } else {
+                irrelevantGroups.add(group);
+            }
+        }
+
+        log.debug("The irrelevant synchronous group list after topic filtering is: {}", irrelevantGroups);

Review Comment:
   We could make this a bit clearer for users:
   ```suggestion
           log.debug(
                   "Ignoring the following groups which do not have any offsets for topics that are accepted by the topic filter: {}",
                   irrelevantGroups
           );
   ```


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -123,13 +127,21 @@ public void testFindConsumerGroups() throws Exception {
         Collection<ConsumerGroupListing> groups = Arrays.asList(
                 new ConsumerGroupListing("g1", true),
                 new ConsumerGroupListing("g2", false));
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
         doReturn(groups).when(connector).listConsumerGroups();
-        doReturn(true).when(connector).shouldReplicate(anyString());
+        doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
+        doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
+        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
         List<String> groupFound = connector.findConsumerGroups();
         Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
         assertEquals(expectedGroups, new HashSet<>(groupFound), "Expected groups are not the same as findConsumerGroups");
+
+        doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());

Review Comment:
   This test covers all-or-nothing scenarios where either every consumer group is checkpointed or none is.
   Can we add coverage for the scenario where there are two consumer groups which each have committed offsets for a different set of topics, and only one of them is selected for checkpointing because at least one of the topics it has offsets for is accepted by the topic filter but none of the topics for the other consumer group are?


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -123,13 +127,21 @@ public void testFindConsumerGroups() throws Exception {
         Collection<ConsumerGroupListing> groups = Arrays.asList(
                 new ConsumerGroupListing("g1", true),
                 new ConsumerGroupListing("g2", false));
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
         doReturn(groups).when(connector).listConsumerGroups();
-        doReturn(true).when(connector).shouldReplicate(anyString());
+        doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
+        doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
+        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
         List<String> groupFound = connector.findConsumerGroups();
         Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
         assertEquals(expectedGroups, new HashSet<>(groupFound), "Expected groups are not the same as findConsumerGroups");
+
+        doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());
+        List<String> topicFilterGroupFound = connector.findConsumerGroups();
+        assertTrue(topicFilterGroupFound.size() == 0);

Review Comment:
   Nit: we can improve failure messages by making a more general comparison here:
   ```suggestion
           assertEquals(Collections.emptyList(), topicFilterGroupFound);
   ```


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
     List<String> findConsumerGroups() throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filterGroups) {
+            Set<String> consumeTopics = listConsumerGroupOffsets(group).keySet().stream()

Review Comment:
   Nit: "consumed" instead of "consume"
   ```suggestion
               Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream()
   ```


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -178,6 +178,7 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte
         }
     }
+    // There may be a group that consumes the topic in "topics.exclude", so it also needs to be filtered

Review Comment:
   Isn't this fairly redundant? We already filter the results for `shouldCheckpointTopic`, and I don't really see how the PR here makes that any more/less likely.

   Also, if we really need a comment here, it shouldn't refer to `topics.exclude` (since that property is specific to the `DefaultTopicFilter`) and should instead just refer to whether or not the user-configured topic filter accepts the topic(s).
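
For illustration, the mixed scenario requested in the test-coverage comment (two groups, only one of which has offsets for an accepted topic) can be sketched without the connector or Mockito. This is only a minimal standalone sketch: `CheckpointGroupSelection`, `selectGroups`, and the group and topic names are hypothetical and are not part of the PR or of MirrorMaker 2. The real code would obtain topics through `listConsumerGroupOffsets` and apply the user-configured `TopicFilter`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the selection logic under review;
// this is NOT the real MirrorCheckpointConnector.
final class CheckpointGroupSelection {

    // Returns the groups accepted by groupFilter that have committed offsets
    // for at least one topic accepted by topicFilter.
    static List<String> selectGroups(
            Map<String, Set<String>> topicsByGroup,
            Predicate<String> groupFilter,
            Predicate<String> topicFilter) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : topicsByGroup.entrySet()) {
            if (!groupFilter.test(entry.getKey())) {
                continue; // rejected by the group filter
            }
            if (entry.getValue().stream().anyMatch(topicFilter)) {
                selected.add(entry.getKey());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // g1 has offsets for t1 and t2; g2 only has offsets for t3.
        Map<String, Set<String>> topicsByGroup = new LinkedHashMap<>();
        topicsByGroup.put("g1", Set.of("t1", "t2"));
        topicsByGroup.put("g2", Set.of("t3"));

        // Assumed topic filter that accepts only t1.
        Predicate<String> topicFilter = Set.of("t1")::contains;

        List<String> groups = selectGroups(topicsByGroup, g -> true, topicFilter);
        System.out.println(groups); // prints [g1]
    }
}
```

In the actual test, the same shape would presumably be expressed by stubbing `listConsumerGroupOffsets` per group and `shouldReplicateByTopicFilter` per topic, then asserting that only the first group is returned.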