dajac commented on code in PR #14327:
URL: https://github.com/apache/kafka/pull/14327#discussion_r1469244568
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##########
@@ -1040,6 +1051,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                 .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
                 .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
                 .maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+                .maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))

Review Comment:
I think that we are missing a few fundamental things in `consumerGroupHeartbeat`. Let me explain.

`consumerGroupHeartbeat` is structured in three parts:
1) We update the member and its subscriptions.
2) We compute the new target assignment if needed.
3) We reconcile the member.

In 1), we need to update the member's subscription as you do here. However, we also need to verify the regex before storing it, and we need to update the subscription metadata if the regex changed. See L1080.

In 2), we also need to change the logic to include the topics matching the regex. See L1115.

3) is fine as it is.

We also need a mechanism to periodically refresh the regexes in order to catch new or deleted topics. What was your plan for this? I thought that we could piggyback on the mechanism that refreshes the subscription metadata (L1076) to also refresh the regexes.

I think that we also need to store the resolved regular expressions somehow, i.e. a mapping from the regex (as a string) to the matching topics, because step 2) needs it. For this, I was considering whether we could simply use an LRU cache. What do you think?

########## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ##########
@@ -35,6 +35,8 @@
       "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise."
     },
     { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
       "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },

Review Comment:
We must bump the API version to 1 and declare this field with `"versions": "1+"` so that the change stays backward compatible.

########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##########
@@ -1573,6 +1589,33 @@ private void updateGroupsByTopics(
                 }
             });
         }
+        if (!oldSubscribedTopicRegex.isEmpty()) {
+            oldSubscribedTopicRegex.forEach(regex -> {
+                groupsByRegex.computeIfPresent(regex, (__, groupIds) -> {
+                    groupIds.remove(groupId);
+                    return groupIds.isEmpty() ? null : groupIds;
+                });
+                Pattern pattern = Pattern.compile(regex);
+                for (String topicName : metadataImage.topics().topicsByName().keySet()) {
+                    if (pattern.matcher(topicName).matches()) {
+                        unsubscribeGroupFromTopic(groupId, topicName);
+                    }
+                }
+            });
+        }
+        if (!newSubscribedTopicRegex.isEmpty()) {
+            newSubscribedTopicRegex.forEach(regex -> {
+                groupsByRegex
+                    .computeIfAbsent(regex, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                    .add(groupId);
+                Pattern pattern = Pattern.compile(regex);
+                for (String topicName : metadataImage.topics().topicsByName().keySet()) {
+                    if (pattern.matcher(topicName).matches()) {
+                        subscribeGroupToTopic(groupId, topicName);
+                    }
+                }

Review Comment:
I am not sure about this for two reasons: (1) computing the list of matching topics is quite expensive, so this may not be the best place to do it; and (2) it would not catch topic changes (new or deleted topics), because the matching is applied only when the regex is stored. I think that we need to discuss the high-level approach.
Take a look at my previous comment.
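To make the LRU cache idea from the first review comment (the one on `consumerGroupHeartbeat`) more concrete, here is a minimal sketch in plain Java. It is not existing Kafka code. The class `ResolvedRegexCache`, its methods, and the use of a `long metadataVersion` to decide when a cached resolution is stale are all hypothetical assumptions. The sketch verifies a regex by compiling it (step 1), caches the regex-to-topics mapping that step 2 needs, and re-runs the expensive scan over all topic names only on a cache miss or when it is asked for with a different metadata version. A periodic subscription metadata refresh could plug into that last point to catch new or deleted topics. Like the diff above, it uses `java.util.regex.Pattern` with full matching (`matches()`).

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical helper, not part of Kafka: an LRU cache mapping a subscribed
// topic regex (as a string) to the topic names it matched in a given
// metadata snapshot.
final class ResolvedRegexCache {

    // One resolution: the metadata version it was computed against plus the
    // matching topic names.
    private static final class Resolution {
        final long metadataVersion;
        final Set<String> topics;

        Resolution(long metadataVersion, Set<String> topics) {
            this.metadataVersion = metadataVersion;
            this.topics = topics;
        }
    }

    private final Map<String, Resolution> cache;

    ResolvedRegexCache(int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently
        // accessed, so removeEldestEntry evicts the LRU regex past the cap.
        this.cache = new LinkedHashMap<String, Resolution>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Resolution> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Step 1 (assumed): reject a regex that does not compile before storing it.
    static void validate(String regex) {
        try {
            Pattern.compile(regex);
        } catch (PatternSyntaxException e) {
            throw new IllegalArgumentException("Invalid subscribed topic regex: " + regex, e);
        }
    }

    // Step 2 (assumed): returns the topics fully matching the regex. The scan
    // over all topic names runs only on a cache miss or when the cached entry
    // was computed against a different metadata version.
    Set<String> resolve(String regex, long metadataVersion, Set<String> allTopics) {
        Resolution cached = cache.get(regex);
        if (cached != null && cached.metadataVersion == metadataVersion) {
            return cached.topics;
        }
        Pattern pattern = Pattern.compile(regex);
        Set<String> matched = new TreeSet<>();
        for (String topic : allTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        Set<String> result = Collections.unmodifiableSet(matched);
        cache.put(regex, new Resolution(metadataVersion, result));
        return result;
    }

    // containsKey does not count as an access, so the LRU order is unchanged.
    boolean isCached(String regex) {
        return cache.containsKey(regex);
    }

    int size() {
        return cache.size();
    }
}
```

A `LinkedHashMap` built with `accessOrder = true` plus an overridden `removeEldestEntry` is the standard JDK way to get LRU eviction. An evicted regex is simply re-resolved on its next lookup, so eviction only costs a recomputation. The sketch is not thread-safe and assumes calls are serialized.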