lianetm commented on code in PR #17651:
URL: https://github.com/apache/kafka/pull/17651#discussion_r1824701647


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -195,14 +196,23 @@ public void testConsumerHeartbeatRequestValidation() {
                 .setRebalanceTimeoutMs(5000)));
         assertEquals("TopicPartitions must be empty when (re-)joining.", 
ex.getMessage());
 
-        // SubscribedTopicNames must be present and empty in the first request 
(epoch == 0).
+        // SubscribedTopicNames can be present and non-empty in the first 
request (epoch == 0).

Review Comment:
   the comment is not really aligned with the code here right? this is ensuring 
SubscribedTopicNames or SubscribedTopicRegex must be present and non-empty...



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2406,6 +2411,17 @@ private boolean hasMemberSubscriptionChanged(
             if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
                 log.debug("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
                     groupId, memberId, updatedMember.subscribedTopicRegex());
+                // If the regular expression has changed, we compile it to 
ensure that
+                // its syntax is valid.
+                if (updatedMember.subscribedTopicRegex() != null) {
+                    try {
+                        Pattern.compile(updatedMember.subscribedTopicRegex());
+                    } catch (PatternSyntaxException ex) {
+                        throw new InvalidRegularExpression(
+                            String.format("SubscribedTopicRegex `%s` is not a 
valid regular expression: %s.",
+                                updatedMember.subscribedTopicRegex(), 
ex.getDescription()));
+                    }
+                }

Review Comment:
   would it be better maybe to encapsulate this in a kind of 
`throwIfInvalidRegularExpression`? (it aligns with the validation pattern we 
have in the mgr, and it would probably be the place to extend with caching 
changes) 



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -195,14 +196,23 @@ public void testConsumerHeartbeatRequestValidation() {
                 .setRebalanceTimeoutMs(5000)));
         assertEquals("TopicPartitions must be empty when (re-)joining.", 
ex.getMessage());
 
-        // SubscribedTopicNames must be present and empty in the first request 
(epoch == 0).
+        // SubscribedTopicNames can be present and non-empty in the first 
request (epoch == 0).
         ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId("foo")
                 .setMemberEpoch(0)
                 .setRebalanceTimeoutMs(5000)
                 .setTopicPartitions(Collections.emptyList())));
-        assertEquals("SubscribedTopicNames must be set in first request.", 
ex.getMessage());
+        assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set 
in first request.", ex.getMessage());
+
+        // SubscribedTopicRegex can be present and non-empty in the first 
request (epoch == 0).
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setTopicPartitions(Collections.emptyList())));
+        assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set 
in first request.", ex.getMessage());

Review Comment:
   this seems to be testing the same as above



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2406,6 +2411,17 @@ private boolean hasMemberSubscriptionChanged(
             if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
                 log.debug("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
                     groupId, memberId, updatedMember.subscribedTopicRegex());
+                // If the regular expression has changed, we compile it to 
ensure that
+                // its syntax is valid.
+                if (updatedMember.subscribedTopicRegex() != null) {
+                    try {
+                        Pattern.compile(updatedMember.subscribedTopicRegex());
+                    } catch (PatternSyntaxException ex) {
+                        throw new InvalidRegularExpression(

Review Comment:
   should we update the java doc for this func to show it `throws 
InvalidRegularExpression` now?



##########
core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala:
##########
@@ -166,6 +166,51 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {

Review Comment:
   should we cover also the negative case? expecting a HB response but with  
`Errors.INVALID_REGULAR_EXPRESSION`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to