Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
mjsax commented on PR #15869: URL: https://github.com/apache/kafka/pull/15869#issuecomment-2155258201 Thanks for the PR @Phuc-Hong-Tran -- merged to `trunk` and cherry-picked to `3.8` branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
mjsax merged PR #15869:
URL: https://github.com/apache/kafka/pull/15869
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2154913514

Test failures all unrelated.

> New failing - 8
> Build / JDK 8 and Scala 2.12 / "testUngracefulRemoteCloseDuringHandshakeWrite(Args).args=tlsProtocol=TLSv1.2, useInlinePem=false" – org.apache.kafka.common.network.SslTransportLayerTest
> Build / JDK 8 and Scala 2.12 / testOffsetTranslationBehindReplicationFlow() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest
> Build / JDK 8 and Scala 2.12 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
> Build / JDK 8 and Scala 2.12 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
> Build / JDK 21 and Scala 2.13 / "testNoConsumeWithDescribeAclViaAssign(String).quorum=kraft" – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
> Build / JDK 21 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest
> Build / JDK 21 and Scala 2.13 / shouldRestoreState() – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
> Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
>
> Existing failures - 5
> Build / JDK 8 and Scala 2.12 / testDynamicBrokerConfigUpdateUsingKraft [2] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.admin.ConfigCommandIntegrationTest
> Build / JDK 21 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [2] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
> Build / JDK 21 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [5] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
> Build / JDK 21 and Scala 2.13 / testDescribeQuorumStatusSuccessful [1] Type=Raft-Isolated, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
> Build / JDK 21 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [2] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629736401

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +466,44 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+        doReturn(Collections.singleton(topicName)).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+        clearInvocations(subscriptions);
+
+        consumer.poll(Duration.ZERO);

Review Comment:
I've fixed this in the most recent commit.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629681404

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),

Review Comment:
I understand, will include the change in the next commit.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1629677489

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),

Review Comment:
This PR got merged so let's get the latest changes to get rid of the deprecated assignor here
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622585236

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +466,44 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+        doReturn(Collections.singleton(topicName)).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+        clearInvocations(subscriptions);
+
+        consumer.poll(Duration.ZERO);

Review Comment:
```suggestion
        when(subscriptions.hasPatternSubscription()).thenReturn(true);
        consumer.poll(Duration.ZERO);
```
Let's ensure that the poll knows it has a pattern subscription, so that nothing else blocks the way to evaluating the regex on ln 1976 of the AsyncConsumer; only the metadata version check we're testing should block it. This is what will make sure that if we lose the check by mistake, the test will actually fail.
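Editorial sketch of the point made in the review comment above: when two guards protect the same action, a test for one guard only has teeth if the other guard is opened. The model below is a simplified stand-in, not Kafka code; `GuardedEvaluator`, `versionCheckEnabled`, and `regexEvaluations` are hypothetical names. Setting `versionCheckEnabled` to `false` models the version check being lost by mistake. A Mockito mock returns `false` for an unstubbed boolean method, which corresponds to passing `hasPatternSubscription = false` here.

```java
// Hypothetical model of the two nested guards; none of these names exist in Kafka.
class GuardedEvaluator {
    private final boolean versionCheckEnabled; // false models "the check was lost by mistake"
    private int versionSnapshot;               // last metadata version that was acted upon
    private int regexEvaluations = 0;          // how often the (simulated) regex evaluation ran

    GuardedEvaluator(boolean versionCheckEnabled, int initialVersion) {
        this.versionCheckEnabled = versionCheckEnabled;
        this.versionSnapshot = initialVersion;
    }

    // One simulated poll: guard 1 is the version check, guard 2 is the pattern-subscription check.
    void poll(int currentVersion, boolean hasPatternSubscription) {
        if (versionCheckEnabled) {
            if (versionSnapshot >= currentVersion) {
                return; // metadata unchanged: skip the regex evaluation
            }
            versionSnapshot = currentVersion;
        }
        if (hasPatternSubscription) {
            regexEvaluations++;
        }
    }

    int regexEvaluations() {
        return regexEvaluations;
    }
}
```

With `hasPatternSubscription = false`, the intact and the broken variant both report zero evaluations after a poll at an unchanged version, so a "never evaluated" assertion cannot tell them apart. With `hasPatternSubscription = true`, only the intact variant stays at zero.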
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2142480168

Thanks for the changes @Phuc-Hong-Tran, could you please rebase to get the latest trunk changes? Thanks!
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622568609

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +466,44 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {

Review Comment:
Maybe clearer to rename this so it shows that it's about subscription regex eval and poll? What about `testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges`?
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622564165

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1467,12 +1471,11 @@ public void assign(Collection partitions) {
     }
 
     /**
-     * TODO: remove this when we implement the KIP-848 protocol.
-     *
      *
-     * The contents of this method are shamelessly stolen from
-     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
-     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * This function evaluate the regex that the consumer subscribed to
+     * against the list of topic names from metadata and update

Review Comment:
```suggestion
     * against the list of topic names from metadata, and updates
```
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1622562316

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1467,12 +1471,11 @@ public void assign(Collection partitions) {
     }
 
     /**
-     * TODO: remove this when we implement the KIP-848 protocol.
-     *
      *
-     * The contents of this method are shamelessly stolen from
-     * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access
-     * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class?
+     *
+     * This function evaluate the regex that the consumer subscribed to

Review Comment:
evaluates
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2142441742

@lianetm, I've pushed the required changes. Could you have a look once you have time? Thanks in advance.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2134005505

Hey @Phuc-Hong-Tran, any update on this one? Let me know if you have any questions. I also noticed that we should update the PR description to remove the reference to the old var name that was renamed, and I would also suggest we state what is done (not only how it's done). It should be helpful to explain that these are changes to avoid evaluating the subscription regex on every poll if metadata hasn't changed, identifying changes based on a metadata version snapshot. Thanks!
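Editorial sketch of the idea summarized above (skip the subscription regex evaluation on poll unless the metadata version has advanced past a stored snapshot). This is a standalone illustration under stated assumptions, not the code from the PR: `SnapshotGatedMatcher`, `onPoll`, and `evaluations` are hypothetical names, and the metadata version is passed in as a plain `int` rather than read from Kafka's metadata object.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical stand-in for version-gated regex evaluation; not Kafka code.
class SnapshotGatedMatcher {
    private final Pattern pattern;
    private int versionSnapshot = -1;              // last metadata version we evaluated against
    private int evaluations = 0;                   // how many times the regex pass actually ran
    private Set<String> matched = new TreeSet<>(); // result of the most recent evaluation

    SnapshotGatedMatcher(Pattern pattern) {
        this.pattern = pattern;
    }

    // Called on every poll; re-evaluates the pattern only when the version has advanced.
    Set<String> onPoll(int currentVersion, Set<String> topics) {
        if (versionSnapshot < currentVersion) {
            versionSnapshot = currentVersion;
            evaluations++;
            Set<String> next = new TreeSet<>();
            for (String topic : topics) {
                if (pattern.matcher(topic).matches()) {
                    next.add(topic);
                }
            }
            matched = next;
        }
        return matched;
    }

    int evaluations() {
        return evaluations;
    }
}
```

Polling twice at the same version runs the regex pass once; polling again after the version increases runs it a second time and picks up any newly matching topic names.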
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2115608996

Thanks for the updates @Phuc-Hong-Tran! Left some comments mainly to improve testing, but looking good overall.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603634591

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+
+        HashSet topics = new HashSet<>();
+        topics.add(topicName);
+        doReturn(topics).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+
+        consumer.poll(Duration.ZERO);
+        clearInvocations(subscriptions);
+        verify(subscriptions, never()).matchesSubscribedPattern(topicName);

Review Comment:
I would say that we should complete the story here to really know that our fix is kicking in. Up to this point we tested that we don't evaluate the regex on poll, but we should make sure it's because of the logic we're adding to skip it if metadata did not change. So what about extending the test to mock a metadata change, poll, and see the regex is indeed evaluated in that case?
```suggestion
        when(metadata.updateVersion()).thenReturn(2);
        when(subscriptions.hasPatternSubscription()).thenReturn(true);
        consumer.poll(Duration.ZERO);
        verify(subscriptions).matchesSubscribedPattern(topicName);
```
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603624271

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));

Review Comment:
What about `Collections.singletonMap(tp, new OffsetAndMetadata(1));`? Seems simpler.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603605365

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+
+        HashSet topics = new HashSet<>();
+        topics.add(topicName);
+        doReturn(topics).when(cluster).topics();
+
+        consumer.subscribe(Pattern.compile("f*"));
+        verify(metadata).requestUpdateForNewTopics();
+        verify(subscriptions).matchesSubscribedPattern(topicName);
+
+        consumer.poll(Duration.ZERO);
+        clearInvocations(subscriptions);

Review Comment:
This call should be done before we poll. Otherwise the next `verify(subscriptions, never())` succeeds only because the invocations were just cleared, so it's not really testing the logic.
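Editorial sketch of the ordering problem described in the review comment above. To stay self-contained it uses a hand-rolled recorder instead of a mocking framework; `CallRecorder`, `ClearOrderDemo`, and `buggyPoll` are hypothetical names, and `clear()` plays the role that `clearInvocations` plays in the test under review. The action under test is deliberately buggy: it always makes the call the test wants to rule out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hand-rolled recorder standing in for a mock's invocation log.
class CallRecorder {
    private final List<String> calls = new ArrayList<>();

    void record(String call) {
        calls.add(call);
    }

    // Plays the role of clearing recorded invocations on a mock.
    void clear() {
        calls.clear();
    }

    boolean neverCalled(String call) {
        return !calls.contains(call);
    }
}

class ClearOrderDemo {
    private static final String CALL = "matchesSubscribedPattern";

    // Deliberately buggy action: it always performs the call the test wants to rule out.
    static void buggyPoll(CallRecorder recorder) {
        recorder.record(CALL);
    }

    // Clear AFTER the action: the evidence is wiped, so the check passes vacuously.
    static boolean checkWithClearAfter() {
        CallRecorder recorder = new CallRecorder();
        recorder.record(CALL); // call made during setup, e.g. while subscribing
        buggyPoll(recorder);
        recorder.clear();
        return recorder.neverCalled(CALL);
    }

    // Clear BEFORE the action: only calls made by the action itself remain visible.
    static boolean checkWithClearBefore() {
        CallRecorder recorder = new CallRecorder();
        recorder.record(CALL); // call made during setup, e.g. while subscribing
        recorder.clear();
        buggyPoll(recorder);
        return recorder.neverCalled(CALL);
    }
}
```

`checkWithClearAfter()` reports that the call never happened even though the buggy action made it, while `checkWithClearBefore()` catches the unwanted call.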
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603594084

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),
+            "group-id",
+            "client-id");
+
+        final String topicName = "foo";
+        final int partition = 3;
+        final TopicPartition tp = new TopicPartition(topicName, partition);
+        doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        Map offsets = mkMap(mkEntry(tp, new OffsetAndMetadata(1)));
+        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
+        doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        doReturn(cluster).when(metadata).fetch();
+
+        HashSet topics = new HashSet<>();
+        topics.add(topicName);
+        doReturn(topics).when(cluster).topics();

Review Comment:
We could simplify this to a single `doReturn(Collections.singleton(topicName)).when(cluster).topics();`
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603537376

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1482,7 +1486,7 @@ private void updatePatternSubscription(Cluster cluster) {
                 .collect(Collectors.toSet());
         if (subscriptions.subscribeFromPattern(topicsToSubscribe)) {
             applicationEventHandler.add(new SubscriptionChangeEvent());
-            metadata.requestUpdateForNewTopics();
+            this.metadataVersionSnapshot = metadata.requestUpdateForNewTopics();

Review Comment:
Since we're completing the regex implementation, could you clean up the func doc? We should remove the TODO, and I would suggest we rephrase the description to simply state that this evaluates the regex against the list of topic names from metadata, and updates the subscription state with the matching topics.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603504298

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -463,6 +467,42 @@ public void onPartitionsAssigned(final Collection partitions) {
         assertTrue(callbackExecuted.get());
     }
 
+    @Test
+    public void testCheckForNewTopicOnlyWhenMetadataChange() {
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        Cluster cluster = mock(Cluster.class);
+
+        consumer = newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
+            mock(ConsumerRebalanceListenerInvoker.class),
+            subscriptions,
+            singletonList(new RoundRobinAssignor()),

Review Comment:
We should not have to deal with the old assignors in the new consumer, so this made me notice we have a bug that makes you have to provide this. That needs to be fixed, so I filed https://issues.apache.org/jira/browse/KAFKA-16786 for it.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603361345

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
     }
 
     private void maybeUpdateSubscriptionMetadata() {
-        if (subscriptions.hasPatternSubscription()) {
-            updatePatternSubscription(metadata.fetch());
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            if (subscriptions.hasPatternSubscription()) {
+                updatePatternSubscription(metadata.fetch());
+            }
         }
+        updatePatternSubscription(metadata.fetch());

Review Comment:
Done
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603357750

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
     }
 
     private void maybeUpdateSubscriptionMetadata() {
-        if (subscriptions.hasPatternSubscription()) {
-            updatePatternSubscription(metadata.fetch());
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            if (subscriptions.hasPatternSubscription()) {
+                updatePatternSubscription(metadata.fetch());
+            }
         }
+        updatePatternSubscription(metadata.fetch());

Review Comment:
My bad, I'll fix it now.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1603351190

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1968,9 +1972,13 @@ SubscriptionState subscriptions() {
     }
 
     private void maybeUpdateSubscriptionMetadata() {
-        if (subscriptions.hasPatternSubscription()) {
-            updatePatternSubscription(metadata.fetch());
+        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
+            this.metadataVersionSnapshot = metadata.updateVersion();
+            if (subscriptions.hasPatternSubscription()) {
+                updatePatternSubscription(metadata.fetch());
+            }
         }
+        updatePatternSubscription(metadata.fetch());

Review Comment:
I would say this duplicated call ended up here by mistake (and should be removed)? We only want to check/update the pattern if the metadata changed and `hasPatternSubscription` is true, which is already done above on line 1978.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2115102574

@lianetm, I've added the required test and refactored according to your advice. Could you have a look when you have time? Thanks.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
Phuc-Hong-Tran commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1602533374

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1968,7 +1972,7 @@ SubscriptionState subscriptions() {
     }
 
     private void maybeUpdateSubscriptionMetadata() {
-        if (subscriptions.hasPatternSubscription()) {
+        if (subscriptions.hasPatternSubscription() && this.mirroredMetadataUpdateVersion < metadata.updateVersion()) {

Review Comment:
I understand, will update it accordingly.
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on PR #15869:
URL: https://github.com/apache/kafka/pull/15869#issuecomment-2101139225

Hey @Phuc-Hong-Tran, thanks for the changes, left some comments. Could we add unit tests for this? They would definitely help validate the main goal of this patch, which is to ensure the regex is only evaluated once, and not on subsequent calls to poll while the metadata remains unchanged. I would suggest using mocks of the metadata object to control the values returned by `metadata.updateVersion()`, and verifying the calls to `subscription.matchesSubscribedPattern(topic)`. Thanks!
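The shape of such a test can be sketched without Mockito by using hand-written fakes. Everything below (`PollRegexCountSketch`, `FakeMetadata`, `CountingSubscription`, `pollOnce`) is a hypothetical stand-in for illustration, not the test that was added to `AsyncKafkaConsumerTest`: the fake metadata exposes a version the test controls, and the fake subscription counts how often the pattern is evaluated.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// All classes here are hypothetical hand-written fakes, not Kafka or Mockito types.
final class PollRegexCountSketch {

    // Fake metadata whose version and topic list the test controls directly.
    static final class FakeMetadata {
        private int version;
        private final List<String> topics;

        FakeMetadata(int version, List<String> topics) {
            this.version = version;
            this.topics = topics;
        }

        int updateVersion() { return version; }
        List<String> topics() { return topics; }
        void bumpVersion() { version++; }
    }

    // Counts how often the pattern is evaluated against a topic name.
    static final class CountingSubscription {
        private final Pattern pattern;
        private int matchCalls;

        CountingSubscription(Pattern pattern) { this.pattern = pattern; }

        boolean matchesSubscribedPattern(String topic) {
            matchCalls++;
            return pattern.matcher(topic).matches();
        }

        int matchCalls() { return matchCalls; }
    }

    // Runs the guarded check once, the way a single poll would in this sketch,
    // and returns the (possibly updated) version snapshot.
    static int pollOnce(int snapshot, FakeMetadata metadata, CountingSubscription subscription) {
        if (snapshot < metadata.updateVersion()) {
            snapshot = metadata.updateVersion();
            for (String topic : metadata.topics()) {
                subscription.matchesSubscribedPattern(topic);
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata(1, Arrays.asList("orders-eu", "payments", "orders-us"));
        CountingSubscription subscription = new CountingSubscription(Pattern.compile("orders-.*"));

        int snapshot = 0;
        for (int i = 0; i < 3; i++) {
            snapshot = pollOnce(snapshot, metadata, subscription);
        }
        System.out.println(subscription.matchCalls()); // 3: one pass over three topics, then two no-op polls

        metadata.bumpVersion();
        snapshot = pollOnce(snapshot, metadata, subscription);
        System.out.println(subscription.matchCalls()); // 6: the version change triggers one more pass
    }
}
```

With a mocking library, the counter would be replaced by a verification of the number of calls on the mocked subscription.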
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1594430619

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1968,7 +1972,7 @@ SubscriptionState subscriptions() {
     }
 
     private void maybeUpdateSubscriptionMetadata() {
-        if (subscriptions.hasPatternSubscription()) {
+        if (subscriptions.hasPatternSubscription() && this.mirroredMetadataUpdateVersion < metadata.updateVersion()) {

Review Comment:
Shouldn't we update `mirroredMetadataUpdateVersion` here, when we know that the metadata changed but there is no pattern subscription at that moment? We update it whenever a call to subscribe changes the subscription, but this path is triggered regularly from poll, so if we know the metadata changed, it is better to update it for consistency and to keep an accurate snapshot (even if the user makes no more calls to subscribe).

```suggestion
        if (this.metadataVersionSnapshot < metadata.updateVersion()) {
            this.metadataVersionSnapshot = metadata.updateVersion();
            if (subscriptions.hasPatternSubscription()) {
                updatePatternSubscription(metadata.fetch());
            }
        }
```
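The effect of the suggested ordering can be seen in isolation with a small sketch. `SnapshotGuardSketch` and its counter are hypothetical; only the field name `metadataVersionSnapshot` and the shape of the two nested checks come from the suggestion above. The counter stands in for the call to `updatePatternSubscription(metadata.fetch())`.

```java
// Hypothetical stand-in for the consumer's poll-path check; not Kafka code.
final class SnapshotGuardSketch {
    private int metadataVersionSnapshot;
    private int regexEvaluations;

    SnapshotGuardSketch(int initialVersion) {
        this.metadataVersionSnapshot = initialVersion;
    }

    // Mirrors the shape of the suggested maybeUpdateSubscriptionMetadata():
    // the snapshot advances on every metadata change, while the regex is
    // only evaluated when a pattern subscription exists at that moment.
    void maybeUpdateSubscriptionMetadata(int currentMetadataVersion, boolean hasPatternSubscription) {
        if (metadataVersionSnapshot < currentMetadataVersion) {
            metadataVersionSnapshot = currentMetadataVersion;
            if (hasPatternSubscription) {
                regexEvaluations++; // stands in for updatePatternSubscription(metadata.fetch())
            }
        }
    }

    int metadataVersionSnapshot() { return metadataVersionSnapshot; }
    int regexEvaluations() { return regexEvaluations; }

    public static void main(String[] args) {
        SnapshotGuardSketch guard = new SnapshotGuardSketch(0);

        guard.maybeUpdateSubscriptionMetadata(1, false); // metadata changed, no pattern subscription
        System.out.println(guard.metadataVersionSnapshot()); // prints 1
        System.out.println(guard.regexEvaluations());        // prints 0

        guard.maybeUpdateSubscriptionMetadata(2, true);  // metadata changed, pattern subscription present
        guard.maybeUpdateSubscriptionMetadata(2, true);  // unchanged metadata: nothing to do
        System.out.println(guard.regexEvaluations());        // prints 1
    }
}
```

With the earlier combined condition (`hasPatternSubscription() && snapshot < updateVersion()`), the first call would have left the snapshot at 0, which is the stale-snapshot case the comment points out.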
Re: [PR] KAFKA-16493: Avoid unneeded subscription regex check if metadata version unchanged [kafka]
lianetm commented on code in PR #15869:
URL: https://github.com/apache/kafka/pull/15869#discussion_r1594393442

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -260,6 +260,7 @@ private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
     private final SubscriptionState subscriptions;
     private final ConsumerMetadata metadata;
+    private int mirroredMetadataUpdateVersion;

Review Comment:
Maybe `metadataVersionSnapshot` would better reflect what this variable is for?