Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-07 Thread via GitHub


mjsax commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2155258201

   Thanks for the PR @Phuc-Hong-Tran -- merged to `trunk` and cherry-picked to 
`3.8` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-07 Thread via GitHub


mjsax merged PR #15869:
URL: https://github.com/apache/kafka/pull/15869





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-07 Thread via GitHub


lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2154913514

   Test failures all unrelated. 
   
   > New failing - 8
   > Build / JDK 8 and Scala 2.12 / "testUngracefulRemoteCloseDuringHandshakeWrite(Args).args=tlsProtocol=TLSv1.2, useInlinePem=false" – org.apache.kafka.common.network.SslTransportLayerTest
   > Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
   > Build / JDK 8 and Scala 2.12 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
   > Build / JDK 8 and Scala 2.12 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
   > Build / JDK 21 and Scala 2.13 / "testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft" – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
   > Build / JDK 21 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
   > Build / JDK 21 and Scala 2.13 / shouldRestoreState() – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
   > Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
   > 
   
   > Existing failures - 5
   > Build / JDK 8 and Scala 2.12 / testDynamicBrokerConfigUpdateUsingKraft [2] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.admin.ConfigCommandIntegrationTest
   > Build / JDK 21 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [2] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   > Build / JDK 21 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [5] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   > Build / JDK 21 and Scala 2.13 / testDescribeQuorumStatusSuccessful [1] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   > Build / JDK 21 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [2] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-06 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629736401


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +466,44 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+        doReturn(Collections.singleton(topicName)).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+        clearInvocations(subscriptions);
+
+        consumer.poll(Duration.ZERO);

Review Comment:
   I've fixed this in the most recent commit






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-06 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629681404


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),

Review Comment:
   I understand, will include the change in the next commit






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-06-06 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629677489


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),

Review Comment:
   This PR got merged so let's get the latest changes to get rid of the 
deprecated assignor here






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-31 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622585236


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +466,44 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+        doReturn(Collections.singleton(topicName)).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+        clearInvocations(subscriptions);
+
+        consumer.poll(Duration.ZERO);

Review Comment:
   ```suggestion
   when(subscriptions.hasPatternSubscription()).thenReturn(true);
   consumer.poll(Duration.ZERO);
   ```
   
   Let's ensure that the poll knows it has a pattern subscription, so that we 
know nothing else is blocking the way to evaluating the regex on ln 1976 of the 
AsyncConsumer; only the metadata version check we're testing blocks it. This 
is what will make sure that if we lose the check by mistake, the test will 
actually fail.
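
As a rough illustration of the point above, the sketch below models the two guards in front of the regex evaluation as plain boolean functions. The class and method names are hypothetical and are not taken from the Kafka code base; the only outside fact it relies on is that an unstubbed Mockito mock returns `false` for boolean methods. It suggests why a "regex must not be evaluated" test can only catch a dropped version check when `hasPatternSubscription()` is stubbed to return `true`.

```java
// Hypothetical model of the two guards in front of the regex evaluation;
// the names are illustrative and not taken from the Kafka code base.
class GuardIsolationDemo {
    // Intended logic: evaluate only when metadata changed AND a pattern subscription exists.
    static boolean evaluatesWithVersionCheck(boolean metadataChanged, boolean hasPatternSubscription) {
        return metadataChanged && hasPatternSubscription;
    }

    // Regression: the metadata version check was dropped by mistake.
    static boolean evaluatesWithoutVersionCheck(boolean metadataChanged, boolean hasPatternSubscription) {
        return hasPatternSubscription;
    }

    // A "regex must not be evaluated" assertion can tell the two versions apart
    // only if they behave differently for the inputs the test provides.
    static boolean testDetectsRegression(boolean hasPatternSubscription) {
        boolean metadataChanged = false; // the scenario under test: metadata did not change
        return evaluatesWithVersionCheck(metadataChanged, hasPatternSubscription)
            != evaluatesWithoutVersionCheck(metadataChanged, hasPatternSubscription);
    }

    public static void main(String[] args) {
        // An unstubbed mock would answer false here, hiding the regression.
        System.out.println(testDetectsRegression(false)); // prints false
        // Stubbing the guard to true leaves the version check as the only blocker.
        System.out.println(testDetectsRegression(true));  // prints true
    }
}
```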






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-31 Thread via GitHub


lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2142480168

   Thanks for the changes @Phuc-Hong-Tran, could you please rebase to pick up the 
latest changes from trunk? Thanks!





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-31 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622568609


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +466,44 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {

Review Comment:
   Maybe clearer to rename this so it shows that it's about subscription regex 
evaluation on poll? What about 
`testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges`?






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-31 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622564165


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1467,12 +1471,11 @@ public void assign(Collection<TopicPartition> partitions) {
     }
 
     /**
-     * TODO: remove this when we implement the KIP-848 protocol.
-     *
      *
-     * The contents of this method are shamelessly stolen from
-     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
-     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * This function evaluate the regex that the consumer subscribed to
+     * against the list of topic names from metadata and update

Review Comment:
   ```suggestion
* against the list of topic names from metadata, and updates
   ```






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-31 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622562316


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1467,12 +1471,11 @@ public void assign(Collection<TopicPartition> partitions) {
     }
 
     /**
-     * TODO: remove this when we implement the KIP-848 protocol.
-     *
      *
-     * The contents of this method are shamelessly stolen from
-     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
-     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * This function evaluate the regex that the consumer subscribed to

Review Comment:
   evaluates






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-31 Thread via GitHub


Phuc-Hong-Tran commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2142441742

   @lianetm, I've pushed the required changes. Could you take a look when you 
have time? Thanks in advance.





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-27 Thread via GitHub


lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2134005505

   Hey @Phuc-Hong-Tran, any update on this one? Let me know if you have any 
questions. 
   
   I also noticed that we should update the PR description to remove the 
reference to the old var name that was renamed, and I would also suggest we 
state what is done (not only how it's done). It should be helpful to explain 
that these are changes to avoid evaluating the subscription regex on every poll 
if metadata hasn't changed, identifying changes based on a metadata version 
snapshot. Thanks! 
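
A minimal sketch of the idea described above, assuming a simplified stand-in for the consumer metadata: the regex is re-evaluated on poll only when the metadata version has moved past the last snapshot. `FakeMetadata`, `SnapshotGatedMatcher`, and their methods are hypothetical names chosen for illustration; only `updateVersion()` and the `metadataVersionSnapshot` field mirror names that appear in the PR diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-in for consumer metadata: a topic list plus a version counter.
class FakeMetadata {
    private int updateVersion = 0;
    private final List<String> topics = new ArrayList<>();

    int updateVersion() { return updateVersion; }
    List<String> topics() { return topics; }

    // Simulates a metadata refresh that discovers a topic and bumps the version.
    void addTopic(String topic) {
        topics.add(topic);
        updateVersion++;
    }
}

// Hypothetical poller that re-evaluates the regex only when the version moved.
class SnapshotGatedMatcher {
    private final Pattern pattern;
    private final FakeMetadata metadata;
    private int metadataVersionSnapshot = -1; // below any real version, so the first poll evaluates
    private int evaluations = 0;
    private List<String> matched = new ArrayList<>();

    SnapshotGatedMatcher(Pattern pattern, FakeMetadata metadata) {
        this.pattern = pattern;
        this.metadata = metadata;
    }

    void poll() {
        int current = metadata.updateVersion();
        if (metadataVersionSnapshot < current) {
            metadataVersionSnapshot = current;
            evaluations++;
            List<String> result = new ArrayList<>();
            for (String topic : metadata.topics()) {
                if (pattern.matcher(topic).matches()) {
                    result.add(topic);
                }
            }
            matched = result;
        }
    }

    int evaluations() { return evaluations; }
    List<String> matched() { return matched; }
}

class SnapshotGateDemo {
    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata();
        metadata.addTopic("foo");
        SnapshotGatedMatcher matcher = new SnapshotGatedMatcher(Pattern.compile("f.*"), metadata);

        matcher.poll();
        matcher.poll();
        matcher.poll();
        System.out.println(matcher.evaluations()); // prints 1: repeated polls skip the regex

        metadata.addTopic("fizz");
        matcher.poll();
        System.out.println(matcher.evaluations() + " " + matcher.matched()); // prints 2 [foo, fizz]
    }
}
```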





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2115608996

   Thanks for the updates @Phuc-Hong-Tran! Left some comments mainly to improve 
testing, but looking good overall.  





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603634591


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+
+        HashSet<String> topics = new HashSet<>();
+        topics.add(topicName);
+        doReturn(topics).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+
+        consumer.poll(Duration.ZERO);
+        clearInvocations(subscriptions);
+        verify(subscriptions, never()).matchesSubscribedPattern(topicName);

Review Comment:
   I would say that we should complete the story here to really know that our 
fix is kicking in. Up to this point we tested that we don't evaluate the regex 
on poll, but we should make sure it's because of the logic we're adding to skip 
it if metadata did not change. So what about extending the test to mock a 
metadata change, poll, and see the regex is indeed evaluated in that case? 
   
   ```suggestion
   when(metadata.updateVersion()).thenReturn(2);
   when(subscriptions.hasPatternSubscription()).thenReturn(true);
   consumer.poll(Duration.ZERO);
   verify(subscriptions).matchesSubscribedPattern(topicName);
   ```



Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603624271


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));

Review Comment:
   What about `Collections.singletonMap(tp, new OffsetAndMetadata(1));`? Seems 
simpler.






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603605365


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+
+        HashSet<String> topics = new HashSet<>();
+        topics.add(topicName);
+        doReturn(topics).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+
+        consumer.poll(Duration.ZERO);
+        clearInvocations(subscriptions);

Review Comment:
   This call should be made before we poll. Otherwise the next 
`verify(subscriptions, never())` succeeds only because the invocations were just 
cleared, so it does not really test the logic.
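
The ordering problem can be reproduced without Mockito. The sketch below uses a hand-rolled `CallRecorder` (a hypothetical class, standing in for a mock) whose `clear()` plays the role of `clearInvocations(...)` and whose `neverCalled(...)` plays the role of `verify(..., never())`. The code under test is deliberately buggy: it always evaluates the pattern on poll.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hand-rolled recorder standing in for a mock; this is not Mockito.
class CallRecorder {
    private final List<String> calls = new ArrayList<>();

    void record(String call) { calls.add(call); }
    void clear() { calls.clear(); }                                     // role of clearInvocations(...)
    boolean neverCalled(String call) { return !calls.contains(call); } // role of verify(..., never())
}

class ClearOrderingDemo {
    static final String CALL = "matchesSubscribedPattern";

    // Deliberately buggy code under test: it always evaluates the pattern on poll.
    static void buggyPoll(CallRecorder recorder) {
        recorder.record(CALL);
    }

    // Clearing after the action erases the evidence, so the check passes vacuously.
    static boolean checkWithClearAfterPoll() {
        CallRecorder recorder = new CallRecorder();
        recorder.record(CALL); // call made earlier, during subscribe
        buggyPoll(recorder);
        recorder.clear();
        return recorder.neverCalled(CALL);
    }

    // Clearing before the action keeps only the calls made by the action itself.
    static boolean checkWithClearBeforePoll() {
        CallRecorder recorder = new CallRecorder();
        recorder.record(CALL); // call made earlier, during subscribe
        recorder.clear();
        buggyPoll(recorder);
        return recorder.neverCalled(CALL);
    }

    public static void main(String[] args) {
        System.out.println(checkWithClearAfterPoll());  // prints true: the bug goes unnoticed
        System.out.println(checkWithClearBeforePoll()); // prints false: the bug is caught
    }
}
```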






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603594084


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map<TopicPartition, OffsetAndMetadata> offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+
+        HashSet<String> topics = new HashSet<>();
+        topics.add(topicName);
+        doReturn(topics).when(cluster).topics();

Review Comment:
   we could simplify this to a single 
`doReturn(Collections.singleton(topicName)).when(cluster).topics();`
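
For reference, a small standalone sketch of why the one-liner is a safe replacement in a test stub: `Collections.singleton` and `Collections.singletonMap` from the JDK return immutable collections that compare equal to their hand-built `HashSet` and `HashMap` counterparts. The class name, method names, and the `String`/`Long` element types are illustrative choices, not the types used in the Kafka test.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative comparison; names and element types are assumptions for this sketch.
class SingletonCollectionsDemo {
    // Three-statement version, as in the reviewed test.
    static Set<String> verboseTopics(String topicName) {
        HashSet<String> topics = new HashSet<>();
        topics.add(topicName);
        return topics;
    }

    // Single-expression version suggested in the review.
    static Set<String> conciseTopics(String topicName) {
        return Collections.singleton(topicName);
    }

    static Map<String, Long> verboseOffsets(String key, long offset) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put(key, offset);
        return offsets;
    }

    static Map<String, Long> conciseOffsets(String key, long offset) {
        return Collections.singletonMap(key, offset);
    }

    public static void main(String[] args) {
        // Set and Map equality is defined by contents, not by implementation class.
        System.out.println(verboseTopics("foo").equals(conciseTopics("foo")));           // prints true
        System.out.println(verboseOffsets("foo-3", 1L).equals(conciseOffsets("foo-3", 1L))); // prints true
    }
}
```

One difference to keep in mind is that the singleton collections are immutable, which is fine for a stubbed return value that the test never modifies.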






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603537376


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1482,7 +1486,7 @@ private void updatePatternSubscription(Cluster cluster) {
                 .collect(Collectors.toSet());
         if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
             applicationEventHandler.add(new SubscriptionChangeEvent());
-            metadata.requestUpdateForNewTopics();
+            this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();

Review Comment:
   Since we're completing the regex implementation, could you clean up the func 
doc? We should remove the TODO, and I would suggest we rephrase the 
description to simply state that this evaluates the regex against the list of 
topic names from metadata, and updates the subscription state with the matching 
topics. 






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603504298


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
 assertTrue(callbackExecuted.get());
 }
 
+@Test
+public void testCheckForNewTopicOnlyWhenMetadataChange() {
+SubscriptionState subscriptions = mock(SubscriptionState.class);
+Cluster cluster = mock(Cluster.class);
+
+consumer = newConsumer(
+mock(FetchBuffer.class),
+mock(ConsumerInterceptors.class),
+mock(ConsumerRebalanceListenerInvoker.class),
+subscriptions,
+singletonList(new RoundRobinAssignor()),

Review Comment:
   We should not have to deal with the old assignors in the new consumer. This 
made me notice a bug that forces you to provide one here. That needs to be 
fixed, so I filed https://issues.apache.org/jira/browse/KAFKA-16786 for it. 






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603361345


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
 }
 
 private void maybeUpdateSubscriptionMetadata() {
-if (subscriptions.hasPatternSubscription()) {
-updatePatternSubscription(metadata.fetch());
+if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+this.metadataVersionSnapshot = metadata.updateVersion();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}
 }
+updatePatternSubscription(metadata.fetch());

Review Comment:
   Done






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603357750


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
 }
 
 private void maybeUpdateSubscriptionMetadata() {
-if (subscriptions.hasPatternSubscription()) {
-updatePatternSubscription(metadata.fetch());
+if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+this.metadataVersionSnapshot = metadata.updateVersion();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}
 }
+updatePatternSubscription(metadata.fetch());

Review Comment:
   My bad, I'll fix it now






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603351190


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
 }
 
 private void maybeUpdateSubscriptionMetadata() {
-if (subscriptions.hasPatternSubscription()) {
-updatePatternSubscription(metadata.fetch());
+if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+this.metadataVersionSnapshot = metadata.updateVersion();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}
 }
+updatePatternSubscription(metadata.fetch());

Review Comment:
   I would say this duplicated call ended up here by mistake (and should be 
removed)? We only want to check/update the pattern if the metadata changed and 
`hasPatternSubscription` is true (done above on ln 1978).






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603351190


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
 }
 
 private void maybeUpdateSubscriptionMetadata() {
-if (subscriptions.hasPatternSubscription()) {
-updatePatternSubscription(metadata.fetch());
+if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+this.metadataVersionSnapshot = metadata.updateVersion();
+if (subscriptions.hasPatternSubscription()) {
+updatePatternSubscription(metadata.fetch());
+}
 }
+updatePatternSubscription(metadata.fetch());

Review Comment:
   I would say this duplicated call ended up here by mistake? We only want to 
check/update the pattern if the metadata changed and `hasPatternSubscription` 
is true (done above on ln 1978).






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-16 Thread via GitHub


Phuc-Hong-Tran commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2115102574

   @lianetm, I've added the required test and refactored according to your 
advice. Could you take a look when you have time? Thanks.





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-15 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1602533374


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1968,7 +1972,7 @@ SubscriptionState subscriptions() {
 }
 
 private void maybeUpdateSubscriptionMetadata() {
-if (subscriptions.hasPatternSubscription()) {
+if (subscriptions.hasPatternSubscription() && this.mirroredMetadataUpdateVersion < metadata.updateVersion()) {

Review Comment:
   I understand, will update it accordingly






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-08 Thread via GitHub


lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2101139225

   Hey @Phuc-Hong-Tran, thanks for the changes, left some comments. Could we 
add unit tests for this? They would definitely help validate the main goal of 
this patch, which is to ensure the regex is only evaluated once, and not on 
following calls to poll (when metadata remains unchanged). I would suggest 
playing with the mocks of the metadata object to mock values for 
`metadata.updateVersion()`, and verify calls to 
`subscription.matchesSubscribedPattern(topic)`. Thanks!
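
   As a rough illustration of what such a test would check, here is a 
dependency-free sketch that uses hand-written fakes in place of Mockito mocks. 
`FakeMetadata`, `FakeConsumer`, and `regexPasses` are hypothetical names 
introduced only for this example, and the poll logic is a simplified model of 
the guard, not the real consumer:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical fakes; the real test would mock ConsumerMetadata and
// SubscriptionState instead.
class RegexEvaluationCountSketch {

    // Stand-in for the metadata object: a version counter plus topic names.
    static class FakeMetadata {
        int updateVersion = 1;
        List<String> topics = Arrays.asList("orders-eu", "payments");
    }

    // Stand-in for the consumer's poll path with a pattern subscription.
    static class FakeConsumer {
        private final FakeMetadata metadata;
        private final Pattern pattern;
        private int metadataVersionSnapshot = 0; // assumed to start below any real version
        int regexPasses = 0;                     // how many times the topic list was re-evaluated

        FakeConsumer(FakeMetadata metadata, Pattern pattern) {
            this.metadata = metadata;
            this.pattern = pattern;
        }

        void poll() {
            // Re-evaluate the pattern only when the metadata version advanced.
            if (metadataVersionSnapshot < metadata.updateVersion) {
                metadataVersionSnapshot = metadata.updateVersion;
                for (String topic : metadata.topics) {
                    pattern.matcher(topic).matches();
                }
                regexPasses++;
            }
        }
    }

    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata();
        FakeConsumer consumer = new FakeConsumer(metadata, Pattern.compile("orders-.*"));
        consumer.poll();
        consumer.poll();          // same metadata version: no second pass
        metadata.updateVersion++; // simulate a metadata refresh
        consumer.poll();
        System.out.println("regex passes: " + consumer.regexPasses);
    }
}
```

   With Mockito, the counter would be replaced by stubbing 
`metadata.updateVersion()` and verifying the number of calls on the mocked 
subscription state.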





Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-08 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1594430619


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1968,7 +1972,7 @@ SubscriptionState subscriptions() {
 }
 
 private void maybeUpdateSubscriptionMetadata() {
-if (subscriptions.hasPatternSubscription()) {
+if (subscriptions.hasPatternSubscription() && this.mirroredMetadataUpdateVersion < metadata.updateVersion()) {

Review Comment:
   Shouldn't we also update `mirroredMetadataUpdateVersion` here when we know 
the metadata changed but there is no pattern subscription at that moment? We 
update it whenever a call to subscribe changes the subscription, but this path 
is triggered regularly from poll, so if we know the metadata changed, it is 
better to update it for consistency and to keep an accurate snapshot (even if 
the user makes no more calls to subscribe).
```suggestion
   if (this.metadataVersionSnapshot < metadata.updateVersion()) {
       this.metadataVersionSnapshot = metadata.updateVersion();
       if (subscriptions.hasPatternSubscription()) {
           updatePatternSubscription(metadata.fetch());
       }
   }
   ```
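
   To make the difference concrete, here is a small sketch comparing the two 
guards on plain integers. It is a simplified model: `combinedCheck` and 
`nestedCheck` are hypothetical helpers, and the combined variant is assumed to 
refresh the snapshot whenever it evaluates the pattern, which the real code 
may not do:

```java
// Hypothetical model of the two guard shapes; not the real consumer code.
class SnapshotGuardComparison {

    // Shape from the original diff: both conditions in one check.
    // Returns the snapshot value after the call.
    static int combinedCheck(int snapshot, int updateVersion, boolean hasPattern) {
        if (hasPattern && snapshot < updateVersion) {
            // The pattern would be evaluated here (simplifying assumption:
            // the snapshot is refreshed at the same time).
            return updateVersion;
        }
        return snapshot; // stays stale when there is no pattern subscription
    }

    // Shape from the suggestion: version check first, pattern check nested.
    static int nestedCheck(int snapshot, int updateVersion, boolean hasPattern) {
        if (snapshot < updateVersion) {
            snapshot = updateVersion; // always keep the snapshot accurate
            if (hasPattern) {
                // The pattern would be evaluated here.
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        // Metadata moved from version 3 to 5 while no pattern subscription exists.
        System.out.println(combinedCheck(3, 5, false)); // prints 3
        System.out.println(nestedCheck(3, 5, false));   // prints 5
    }
}
```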






Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]

2024-05-08 Thread via GitHub


lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1594393442


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -260,6 +260,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
 
 private final SubscriptionState subscriptions;
 private final ConsumerMetadata metadata;
+private int mirroredMetadataUpdateVersion;

Review Comment:
   Maybe `metadataVersionSnapshot` would better reflect what this variable is for?


