Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
lucasbru merged PR #14872: URL: https://github.com/apache/kafka/pull/14872 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410551672

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdEmpty() {
+        testGroupId("");
+    }
+
+    @Test
+    public void testGroupIdOnlyWhitespaces() {
+        testGroupId(" ");
+    }
+
+    private void testGroupId(final String groupId) {

Review Comment:
Done
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410551139

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {

Review Comment:
Done
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410550926

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -1370,7 +1395,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +
-                groupInstanceId.orElse("null"));
+                groupMetadata.orElseThrow(
+                    () -> new IllegalStateException("No group metadata found although a group ID was provided. This is a bug!")

Review Comment:
I think the most important thing is that the `FencedInstanceIdException` is thrown. I changed it to just log errors before `FencedInstanceIdException` is thrown.
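To illustrate the point of this comment, here is a minimal sketch of logging an inconsistency instead of throwing a second exception, so that the fenced exception always reaches the caller. It is not the code that was merged: `FencedExceptionSketch`, `FencedException`, `ERROR_LOG`, and the simplified `Optional<String>` parameter are hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class FencedExceptionSketch {
    // Hypothetical stand-in for Kafka's FencedInstanceIdException.
    static class FencedException extends RuntimeException {
        FencedException(final String message) {
            super(message);
        }
    }

    // Hypothetical in-memory "log" so the behaviour can be observed without a logging framework.
    static final List<String> ERROR_LOG = new ArrayList<>();

    // Assumption: the group instance ID is modelled as a plain Optional<String>.
    static void maybeThrowFenced(final boolean isFenced, final Optional<String> groupInstanceId) {
        if (!isFenced) {
            return;
        }
        if (!groupInstanceId.isPresent()) {
            // Record the inconsistency, but do not throw here: throwing would mask the fenced exception.
            ERROR_LOG.add("No group instance ID found although the consumer was fenced. This is a bug!");
        }
        throw new FencedException("Get fenced exception for group.instance.id "
            + groupInstanceId.orElse("null"));
    }
}
```

With this shape, a missing value is still reported, but the caller only ever sees the fenced exception.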
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872: URL: https://github.com/apache/kafka/pull/14872#discussion_r1410538266 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -672,12 +693,16 @@ public Map committed(final Set
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872: URL: https://github.com/apache/kafka/pull/14872#discussion_r1410527548 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -672,12 +693,16 @@ public Map committed(final Set
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410526919

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
             requestManagersSupplier);
     }
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();
+                return Optional.empty();

Review Comment:
Yes, I know, but without it the code won't compile, because the compiler requires a return statement in each branch. However, I think I have found a better solution now.
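The compiler issue discussed here can be sketched as follows. This is an illustration only, not the solution the author eventually chose (which is not shown in this thread): one common way to avoid the dummy `return` is to let the helper build the exception and to `throw` it at the call site. All names (`UnreachableReturnSketch`, `InvalidGroupId`, the method names, the message text) are hypothetical.

```java
import java.util.Optional;

final class UnreachableReturnSketch {
    // Hypothetical stand-in for Kafka's InvalidGroupIdException.
    static class InvalidGroupId extends RuntimeException {
        InvalidGroupId(final String message) {
            super(message);
        }
    }

    // Variant 1: the helper throws internally. The compiler cannot know that,
    // so the calling branch still needs a return statement that never executes.
    static void throwInvalidGroupId() {
        throw new InvalidGroupId("The configured group ID is invalid.");
    }

    static Optional<String> withDummyReturn(final String groupId) {
        if (groupId != null) {
            if (groupId.isEmpty()) {
                throwInvalidGroupId();
                return Optional.empty(); // never executed, but required to compile
            }
            return Optional.of(groupId);
        }
        return Optional.empty();
    }

    // Variant 2: the helper only creates the exception and the caller throws it.
    // The throw statement ends the branch, so no dummy return is needed.
    static InvalidGroupId invalidGroupId() {
        return new InvalidGroupId("The configured group ID is invalid.");
    }

    static Optional<String> withThrowAtCallSite(final String groupId) {
        if (groupId == null) {
            return Optional.empty();
        }
        if (groupId.isEmpty()) {
            throw invalidGroupId();
        }
        return Optional.of(groupId);
    }
}
```

Both variants behave the same at runtime; the second simply makes the control flow visible to the compiler and to the reader.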
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
lucasbru commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410466885

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
             requestManagersSupplier);
     }
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();
+                return Optional.empty();

Review Comment:
Unreachable!

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {

Review Comment:
I'd personally drop the `Is`, to be consistent with the tests below. Also above.

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -672,12 +693,16 @@ public Map committed(final Set consumer.subscribe(singletonList(emptyTopic)));
     }
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+                 new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdEmpty() {
+        testGroupId("");
+    }
+
+    @Test
+    public void testGroupIdOnlyWhitespaces() {
+        testGroupId(" ");
+    }
+
+    private void testGroupId(final String groupId) {

Review Comment:
`testInvalidGroupId`?

##
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1409850375

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
+    @Test

Review Comment:
I put the tests here because they only apply to the async consumer for the moment. As I wrote in a different comment, moving forward, we should move these tests to `ConsumerConfig`.
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1409848052

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
             requestManagersSupplier);
     }
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {

Review Comment:
I think that, moving forward, when we also change the validation of the group ID for the legacy consumer in 4.0, we should write a validator for the group ID config in `ConsumerConfig` and throw there.
Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna commented on PR #14872:
URL: https://github.com/apache/kafka/pull/14872#issuecomment-1832675480

@kirktrue please review.
[PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]
cadonna opened a new pull request, #14872:
URL: https://github.com/apache/kafka/pull/14872

Verifies that the group ID passed into the async consumer is valid. That is, if the group ID is not null, it must be neither empty nor consist only of whitespace.

This change stores the group ID in the group metadata because KAFKA-15281, which covers the group metadata API, will build on that.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
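The validation rule described in this pull request can be sketched as below. This is a minimal illustration under stated assumptions, not the merged implementation: `GroupIdRuleSketch` and `validateGroupId` are hypothetical names, `IllegalArgumentException` stands in for whatever exception Kafka actually throws, and returning the ID as an `Optional<String>` is a simplification of storing it in the group metadata.

```java
import java.util.Optional;

final class GroupIdRuleSketch {
    // Returns the group ID if one is configured and valid.
    // A null group ID is allowed and simply means "no consumer group".
    static Optional<String> validateGroupId(final String groupId) {
        if (groupId == null) {
            return Optional.empty();
        }
        // trim() removes leading and trailing whitespace, so an empty result
        // means the ID was empty or consisted only of whitespace.
        if (groupId.trim().isEmpty()) {
            // Assumption: IllegalArgumentException is a stand-in for Kafka's own exception type.
            throw new IllegalArgumentException(
                "The configured group ID must not be empty or consist only of whitespace.");
        }
        return Optional.of(groupId);
    }
}
```

For example, `validateGroupId(null)` yields an empty `Optional`, `validateGroupId("consumerGroupA")` yields the ID, and both `validateGroupId("")` and `validateGroupId(" ")` throw.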