Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-12-03 Thread via GitHub


lucasbru merged PR #14872:
URL: https://github.com/apache/kafka/pull/14872


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410551672


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+             new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+             new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdEmpty() {
+        testGroupId("");
+    }
+
+    @Test
+    public void testGroupIdOnlyWhitespaces() {
+        testGroupId("   ");
+    }
+
+    private void testGroupId(final String groupId) {

Review Comment:
   Done






Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410551139


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+             new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {

Review Comment:
   Done






Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410550926


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1370,7 +1395,11 @@ public KafkaConsumerMetrics kafkaConsumerMetrics() {
     private void maybeThrowFencedInstanceException() {
         if (isFenced) {
             throw new FencedInstanceIdException("Get fenced exception for group.instance.id " +
-                groupInstanceId.orElse("null"));
+                groupMetadata.orElseThrow(
+                    () -> new IllegalStateException("No group metadata found although a group ID was provided. This is a bug!")

Review Comment:
   I think the most important thing is that the `FencedInstanceIdException` is 
thrown. I changed the code to only log an error before throwing the 
`FencedInstanceIdException`.
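
   The change described above can be sketched roughly as follows. This is a 
minimal, self-contained illustration and not the code from the PR: 
`FencedCheckSketch`, `FencedException` (a stand-in for Kafka's 
`FencedInstanceIdException`), and the use of `java.util.logging` are all 
assumptions made so the example runs without Kafka on the classpath.

```java
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical sketch: log the inconsistency, but always throw the fenced exception.
final class FencedCheckSketch {
    private static final Logger LOG = Logger.getLogger(FencedCheckSketch.class.getName());

    // Stand-in for Kafka's FencedInstanceIdException (assumption for illustration).
    static final class FencedException extends RuntimeException {
        FencedException(final String message) {
            super(message);
        }
    }

    static void maybeThrowFenced(final boolean isFenced, final Optional<String> groupInstanceId) {
        if (!isFenced) {
            return;
        }
        if (!groupInstanceId.isPresent()) {
            // Missing data is reported as an error instead of replacing the exception below.
            LOG.severe("No group instance ID found although the consumer was fenced. This is a bug!");
        }
        throw new FencedException(
            "Get fenced exception for group.instance.id " + groupInstanceId.orElse("null"));
    }

    public static void main(final String[] args) {
        try {
            maybeThrowFenced(true, Optional.of("instance-1"));
        } catch (final FencedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   The point of this shape is that an internal inconsistency can no longer 
mask the fenced exception that callers actually need to see.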






Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410538266


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -672,12 +693,16 @@ public Map 
committed(final Set

Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410527548


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -672,12 +693,16 @@ public Map 
committed(final Set

Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410526919


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
                 requestManagersSupplier);
     }
 
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();
+                return Optional.empty();

Review Comment:
   Yes, I know, but without it the code won't compile because each branch 
needs a return statement. However, I think I have found a better solution now.






Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-30 Thread via GitHub


lucasbru commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1410466885


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
                 requestManagersSupplier);
     }
 
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {
+                throwInInvalidGroupIdException();
+                return Optional.empty();

Review Comment:
   Unreachable!



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+             new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {

Review Comment:
   I'd personally drop the `Is` to be consistent with the tests below. The 
same applies to the test above.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -672,12 +693,16 @@ public Map 
committed(final Set 
consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test
+    public void testGroupIdIsNull() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+             new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertFalse(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertFalse(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdIsNotNullAndValid() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupA");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
+        props.put(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED, true);
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        try (AsyncKafkaConsumer consumer =
+             new AsyncKafkaConsumer<>(config, new StringDeserializer(), new StringDeserializer())) {
+            assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+            assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+        } catch (final Exception exception) {
+            throw new AssertionFailedError("The following exception was not expected:", exception);
+        }
+    }
+
+    @Test
+    public void testGroupIdEmpty() {
+        testGroupId("");
+    }
+
+    @Test
+    public void testGroupIdOnlyWhitespaces() {
+        testGroupId("   ");
+    }
+
+    private void testGroupId(final String groupId) {

Review Comment:
   `testInvalidGroupId`?



##

Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1409850375


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -752,6 +757,70 @@ public void testSubscriptionOnEmptyTopic() {
         assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(singletonList(emptyTopic)));
     }
 
+    @Test

Review Comment:
   I put the tests here because they only apply to the async consumer for 
the moment. As I wrote in a different comment, moving forward, we should move 
these tests to the `ConsumerConfig`.






Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna commented on code in PR #14872:
URL: https://github.com/apache/kafka/pull/14872#discussion_r1409848052


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -398,6 +387,38 @@ public class AsyncKafkaConsumer implements ConsumerDelegate {
                 requestManagersSupplier);
     }
 
+    private Optional initializeGroupMetadata(final ConsumerConfig config,
+                                             final GroupRebalanceConfig groupRebalanceConfig) {
+        final Optional groupMetadata = initializeGroupMetadata(
+            groupRebalanceConfig.groupId,
+            groupRebalanceConfig.groupInstanceId
+        );
+        if (!groupMetadata.isPresent()) {
+            config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+            config.ignore(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+        }
+        return groupMetadata;
+    }
+
+    private Optional initializeGroupMetadata(final String groupId,
+                                             final Optional groupInstanceId) {
+        if (groupId != null) {
+            if (groupId.isEmpty()) {

Review Comment:
   I think that, moving forward, when we also change the validation of the 
group ID for the legacy consumer in 4.0, we should write a validator for the 
group ID config in `ConsumerConfig` and throw there.
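
   As a rough illustration of that idea: Kafka's `ConfigDef.Validator` 
interface declares a single `ensureValid(String name, Object value)` method. 
The sketch below mirrors that shape with a local stand-in interface so that it 
compiles without Kafka on the classpath. `GroupIdValidatorSketch`, 
`NonBlankGroupId`, and the use of `IllegalArgumentException` are hypothetical 
and only show what such a validator might look like.

```java
// Hypothetical sketch of a config-level group ID validator.
final class GroupIdValidatorSketch {

    // Local stand-in with the same method shape as Kafka's ConfigDef.Validator.
    interface Validator {
        void ensureValid(String name, Object value);
    }

    static final class NonBlankGroupId implements Validator {
        @Override
        public void ensureValid(final String name, final Object value) {
            if (value == null) {
                return; // no group ID configured, which is allowed
            }
            final String groupId = (String) value;
            if (groupId.trim().isEmpty()) {
                throw new IllegalArgumentException(
                    "Invalid value for " + name + ": must not be empty or consist only of whitespace");
            }
        }
    }

    public static void main(final String[] args) {
        final Validator validator = new NonBlankGroupId();
        validator.ensureValid("group.id", "consumerGroupA"); // passes silently
        try {
            validator.ensureValid("group.id", "   ");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Validating at the config level would reject an invalid group ID when the 
config is parsed, before any consumer implementation is constructed.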






Re: [PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna commented on PR #14872:
URL: https://github.com/apache/kafka/pull/14872#issuecomment-1832675480

   @kirktrue please review.





[PR] KAFKA-14438: Throw if async consumer configured with invalid group ID [kafka]

2023-11-29 Thread via GitHub


cadonna opened a new pull request, #14872:
URL: https://github.com/apache/kafka/pull/14872

   Verifies that the group ID passed into the async consumer is valid. That is, 
if the group ID is not null, it must be neither empty nor consist only of 
whitespace.
   
   This change stores the group ID in the group metadata because KAFKA-15281, 
which covers the group metadata API, will build on that.
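
   The rule described above can be summarized in a short sketch. This is an 
illustration only, not the code from the PR: `GroupIdCheck` and `validate` are 
hypothetical names, an `Optional<String>` stands in for the group metadata, and 
`IllegalArgumentException` stands in for the exception the consumer throws.

```java
import java.util.Optional;

// Hypothetical sketch of the three possible outcomes for a configured group ID.
final class GroupIdCheck {

    static Optional<String> validate(final String groupId) {
        if (groupId == null) {
            // No group ID configured: valid, but there is no group to track.
            return Optional.empty();
        }
        if (groupId.trim().isEmpty()) {
            // Empty or whitespace-only group IDs are rejected.
            throw new IllegalArgumentException(
                "The configured group ID must not be empty or consist only of whitespace");
        }
        return Optional.of(groupId);
    }

    public static void main(final String[] args) {
        System.out.println(validate(null).isPresent());
        System.out.println(validate("consumerGroupA").get());
        try {
            validate("   ");
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```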
   

