C0urante commented on code in PR #13446:
URL: https://github.com/apache/kafka/pull/13446#discussion_r1149871568


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
 
     List<String> findConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filterGroups) {
+            Set<String> consumeTopics = listConsumerGroupOffsets(group).keySet().stream()
+                    .map(TopicPartition::topic)
+                    .filter(this::shouldReplicateByTopicFilter)
+                    .collect(Collectors.toSet());
+            // As long as a group consumes a topic configured in "topics", it needs to sync for this group.
+            // Although the topic in "topics.exclude" may be consumed in this group, this will be filtered out in the MirrorCheckpointTask.

Review Comment:
   This part refers to properties of the 
[DefaultTopicFilter](https://github.com/apache/kafka/blob/31440b00f3ed8de65f368d41d6cf2efb07ca4a5c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java),
 but users can specify a different `TopicFilter` if they'd like. We should 
probably refer to what the `TopicFilter` does instead of properties specific to 
one `TopicFilter` implementation:
   ```suggestion
               // Only perform checkpoints for groups that have offsets for at least one topic
               // that's accepted by the topic filter
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
 
     List<String> findConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()

Review Comment:
   Nit: "filtered" instead of "filter"
   
   ```suggestion
           List<String> filteredGroups = listConsumerGroups().stream()
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
 
     List<String> findConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filterGroups) {
+            Set<String> consumeTopics = listConsumerGroupOffsets(group).keySet().stream()
+                    .map(TopicPartition::topic)
+                    .filter(this::shouldReplicateByTopicFilter)
+                    .collect(Collectors.toSet());
+            // As long as a group consumes a topic configured in "topics", it needs to sync for this group.
+            // Although the topic in "topics.exclude" may be consumed in this group, this will be filtered out in the MirrorCheckpointTask.
+            if (consumeTopics.size() > 0) {
+                checkpointGroups.add(group);
+            } else {
+                irrelevantGroups.add(group);
+            }
+        }
+
+        log.debug("The irrelevant synchronous group list after topic filtering is: {}", irrelevantGroups);

Review Comment:
   We could make this a bit clearer for users:
   ```suggestion
           log.debug(
                   "Ignoring the following groups which do not have any offsets for topics that are accepted by the topic filter: {}",
                   irrelevantGroups
           );
   ```
   
   



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -123,13 +127,21 @@ public void testFindConsumerGroups() throws Exception {
         Collection<ConsumerGroupListing> groups = Arrays.asList(
                 new ConsumerGroupListing("g1", true),
                 new ConsumerGroupListing("g2", false));
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
         doReturn(groups).when(connector).listConsumerGroups();
-        doReturn(true).when(connector).shouldReplicate(anyString());
+        doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
+        doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
+        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
         List<String> groupFound = connector.findConsumerGroups();
 
         Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
         assertEquals(expectedGroups, new HashSet<>(groupFound),
                 "Expected groups are not the same as findConsumerGroups");
+
+        doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());

Review Comment:
   This test only covers all-or-nothing scenarios, where either every consumer
group is checkpointed or none are. Can we add coverage for a mixed case: two
consumer groups with committed offsets for different sets of topics, where one
group is selected for checkpointing because the topic filter accepts at least
one of its topics, and the other is skipped because the filter accepts none of
its topics?
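
To make the requested mixed case concrete, here is a minimal, self-contained sketch of the selection rule using plain Java collections. It is not the connector code or the Mockito-based test itself: `GroupSelectionSketch`, `selectGroups`, and the topic names `t2` and `t3` are hypothetical, and a `Predicate<String>` stands in for whatever topic filter the user has configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class GroupSelectionSketch {

    // Hypothetical stand-in for the topic-filtering step of findConsumerGroups():
    // keep a group only if at least one topic it has committed offsets for
    // is accepted by the topic filter.
    static List<String> selectGroups(Map<String, Set<String>> topicsByGroup,
                                     Predicate<String> topicFilter) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : topicsByGroup.entrySet()) {
            if (entry.getValue().stream().anyMatch(topicFilter)) {
                selected.add(entry.getKey());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // g1 has offsets for t1 and t2; g2 only has offsets for t3.
        Map<String, Set<String>> topicsByGroup = new LinkedHashMap<>();
        topicsByGroup.put("g1", new HashSet<>(Arrays.asList("t1", "t2")));
        topicsByGroup.put("g2", new HashSet<>(Arrays.asList("t3")));

        // Assumed filter for illustration: accepts only t1.
        Predicate<String> acceptsOnlyT1 = "t1"::equals;

        // g1 is selected because t1 is accepted; g2 is skipped.
        System.out.println(selectGroups(topicsByGroup, acceptsOnlyT1)); // prints [g1]
    }
}
```

In the real test, the same shape would presumably come from stubbing `listConsumerGroupOffsets` to return different offsets per group and stubbing `shouldReplicateByTopicFilter` per topic rather than with `anyString()`.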



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java:
##########
@@ -123,13 +127,21 @@ public void testFindConsumerGroups() throws Exception {
         Collection<ConsumerGroupListing> groups = Arrays.asList(
                 new ConsumerGroupListing("g1", true),
                 new ConsumerGroupListing("g2", false));
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
         doReturn(groups).when(connector).listConsumerGroups();
-        doReturn(true).when(connector).shouldReplicate(anyString());
+        doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
+        doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
+        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
         List<String> groupFound = connector.findConsumerGroups();
 
         Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
         assertEquals(expectedGroups, new HashSet<>(groupFound),
                 "Expected groups are not the same as findConsumerGroups");
+
+        doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());
+        List<String> topicFilterGroupFound = connector.findConsumerGroups();
+        assertTrue(topicFilterGroupFound.size() == 0);

Review Comment:
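   Nit: we can improve the failure message by comparing against the expected
list, so that a failure reports the groups that were actually found instead of
just a failed boolean check: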
   ```suggestion
           assertEquals(Collections.emptyList(), topicFilterGroupFound);
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -150,10 +156,30 @@ private void loadInitialConsumerGroups()
 
     List<String> findConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filterGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filterGroups) {
+            Set<String> consumeTopics = listConsumerGroupOffsets(group).keySet().stream()

Review Comment:
   Nit: "consumed" instead of "consume"
   ```suggestion
               Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream()
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -178,6 +178,7 @@ private List<SourceRecord> sourceRecordsForGroup(String group) throws Interrupte
         }
     }
 
+    // There may be a group that consumes the topic in "topics.exclude", so it also needs to be filtered

Review Comment:
   Isn't this fairly redundant? We already filter the results for 
`shouldCheckpointTopic`, and I don't really see how the PR here makes that any 
more/less likely.
   
   Also, if we really need a comment here, it shouldn't refer to 
`topics.exclude` (since that property is specific to the `DefaultTopicFilter`) 
and should instead just refer to whether or not the user-configured topic 
filter accepts the topic(s).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
