dajac commented on code in PR #14327:
URL: https://github.com/apache/kafka/pull/14327#discussion_r1469244568


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1040,6 +1051,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
             .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
             .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))

Review Comment:
   I think that we are missing a few fundamental things in `consumerGroupHeartbeat`.
   Let me explain. `consumerGroupHeartbeat` is structured in three parts:
   1) We update the member and its subscriptions.
   2) We compute the new target assignment if needed.
   3) We reconcile the member.
   
   In 1), we need to update the member's subscription as you do here. However,
   we also need to verify it before storing it, and we need to update the
   subscription metadata if the regex has changed. See L1080. In 2), we need to
   change the logic to include the topics matching the regex. See L1115.
   Step 3) is fine as it is.
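
   To illustrate the verification in step 1), here is a minimal sketch, assuming that "verify" means checking that the regex compiles before it is stored. `RegexValidator` and its use of `IllegalArgumentException` are hypothetical; the coordinator would presumably map the failure to one of its own error types.

```java
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical helper, not part of GroupMetadataManager.
final class RegexValidator {
    private RegexValidator() { }

    /**
     * Returns the compiled pattern, or empty when the regex is null
     * (null meaning "unchanged since the last heartbeat").
     * Rejects a regex that does not compile so that it is never stored.
     */
    static Optional<Pattern> validate(String subscribedTopicRegex) {
        if (subscribedTopicRegex == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Pattern.compile(subscribedTopicRegex));
        } catch (PatternSyntaxException e) {
            // Assumption: a real implementation would use a Kafka-specific exception.
            throw new IllegalArgumentException(
                "Invalid topic regex '" + subscribedTopicRegex + "': " + e.getDescription(), e);
        }
    }
}
```

   Returning the compiled `Pattern` lets the caller reuse it rather than compiling the same string again later.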
   
   We also need a mechanism to periodically refresh the regexes in order to
   catch newly created or deleted topics. What was your plan for this? I thought
   that we could piggyback on the mechanism that refreshes the subscription
   metadata (L1076) to also refresh the regexes.
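
   A rough sketch of such a deadline-based refresh, under the assumption that the regexes are re-resolved against the current topic names once a refresh interval has elapsed. `RegexRefresher`, `maybeRefresh`, and `topicsFor` are illustrative names, not existing coordinator APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical sketch; all names are illustrative.
final class RegexRefresher {
    private final long refreshIntervalMs;
    // Long.MIN_VALUE forces a refresh on the first call.
    private long nextRefreshMs = Long.MIN_VALUE;
    private final Map<String, Set<String>> topicsByRegex = new HashMap<>();

    RegexRefresher(long refreshIntervalMs) {
        this.refreshIntervalMs = refreshIntervalMs;
    }

    /**
     * Re-resolves every regex against the given topic names if the refresh
     * deadline has passed. Returns true if a refresh happened.
     */
    boolean maybeRefresh(long nowMs, Set<String> regexes, Set<String> topicNames) {
        if (nowMs < nextRefreshMs) {
            return false;
        }
        topicsByRegex.clear();
        for (String regex : regexes) {
            Pattern pattern = Pattern.compile(regex);
            Set<String> matches = new TreeSet<>();
            for (String topicName : topicNames) {
                if (pattern.matcher(topicName).matches()) {
                    matches.add(topicName);
                }
            }
            topicsByRegex.put(regex, matches);
        }
        nextRefreshMs = nowMs + refreshIntervalMs;
        return true;
    }

    /** Topics resolved for the regex at the last refresh, or an empty set. */
    Set<String> topicsFor(String regex) {
        return topicsByRegex.getOrDefault(regex, Collections.emptySet());
    }
}
```

   Between refreshes the resolved topics can be stale, so a topic created just after a refresh is only picked up once the next deadline passes.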
   
   I think that we also need to store the resolved regular expressions somehow,
   that is, a mapping from the regex (as a string) to the matching topics,
   because we need this for step 2). For this, I was considering whether we
   could just use an LRU cache. What do you think?
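
   For reference, a minimal sketch of what such an LRU cache could look like using only the JDK, assuming a plain in-memory map from the regex string to its matching topic names. `ResolvedRegexCache` is a hypothetical name, and the capacity is an arbitrary parameter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: regex string -> names of the topics it matched.
final class ResolvedRegexCache extends LinkedHashMap<String, Set<String>> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    ResolvedRegexCache(int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Set<String>> eldest) {
        // Evict the least recently used regex once the capacity is exceeded.
        return size() > maxEntries;
    }
}
```

   An evicted regex would simply be resolved again on its next use, so the cache bounds memory at the cost of occasional recomputation.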



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -35,6 +35,8 @@
       "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
     { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
       "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },

Review Comment:
   We must bump the API to version 1 and use versions `1+` for this field in
   order to keep it backward compatible.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1573,6 +1589,33 @@ private void updateGroupsByTopics(
                 }
             });
         }
+        if (!oldSubscribedTopicRegex.isEmpty()) {
+            oldSubscribedTopicRegex.forEach(regex -> {
+                groupsByRegex.computeIfPresent(regex, (__, groupIds) -> {
+                    groupIds.remove(groupId);
+                    return groupIds.isEmpty() ? null : groupIds;
+                });
+                Pattern pattern = Pattern.compile(regex);
+                for (String topicName : metadataImage.topics().topicsByName().keySet()) {
+                    if (pattern.matcher(topicName).matches()) {
+                        unsubscribeGroupFromTopic(groupId, topicName);
+                    }
+                }
+            });
+        }
+        if (!newSubscribedTopicRegex.isEmpty()) {
+            newSubscribedTopicRegex.forEach(regex -> {
+                groupsByRegex
+                        .computeIfAbsent(regex, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                        .add(groupId);
+                Pattern pattern = Pattern.compile(regex);
+                for (String topicName : metadataImage.topics().topicsByName().keySet()) {
+                    if (pattern.matcher(topicName).matches()) {
+                        subscribeGroupToTopic(groupId, topicName);
+                    }
+                }

Review Comment:
   I am not sure about this for two reasons: (1) computing the list of matching
   topics is quite expensive, so this may not be the best place to do it; and
   (2) it would not catch later topic changes, because the matching is applied
   only when the regex is stored. I think that we need to discuss the
   high-level approach. Take a look at my previous comment.


