Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-11-02 Thread via GitHub


dajac merged PR #14642:
URL: https://github.com/apache/kafka/pull/14642


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-11-02 Thread via GitHub


dajac commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1790298798

   Last build did not pass. It should not be related to this PR but to be sure 
I just triggered a new one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-11-01 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1379231747


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use. We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used. Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   To make it more consistent: DEFAULT_GROUP_REMOTE_ASSIGNOR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-11-01 Thread via GitHub


AndrewJSchofield commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1379170868


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use. We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used. Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";

Review Comment:
   `GROUP_REMOTE_ASSIGNOR_CONFIG` would be better.



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use. We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used. Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   `GROUP_REMOTE_ASSIGNOR_DOC`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-31 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1787563665

   Hi @dajac - Again, thank you for the review. Your comments are addressed as 
of the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-31 Thread via GitHub


dajac commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1786893772

   @philipnee Thanks for the update. I left one more comment and a suggestion 
for consideration.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1377337596


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -158,4 +163,54 @@ public void testCaseInsensitiveSecurityProtocol() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
 assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
+
+@Test
+public void testDefaultConsumerGroupConfig() {
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+
assertNull(consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@Test
+public void testRemoteAssignorConfig() {
+String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+String protocol = "consumer";
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, remoteAssignorName);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals(protocol, 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(remoteAssignorName, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@ParameterizedTest
+@MethodSource("protocolNameSupplier")

Review Comment:
   fyi, the CsvSource is pretty handy in this case:
   
   ```
   @CsvSource(value = Array("consumer,true", "Consumer,true", "generic,true", 
"Generic,true", "invalid,false"))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-31 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1377336442


##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -158,4 +163,54 @@ public void testCaseInsensitiveSecurityProtocol() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
 assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
+
+@Test
+public void testDefaultConsumerGroupConfig() {
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+
assertNull(consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@Test
+public void testRemoteAssignorConfig() {
+String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+String protocol = "consumer";
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, remoteAssignorName);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals(protocol, 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(remoteAssignorName, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@ParameterizedTest
+@MethodSource("protocolNameSupplier")
+public void testProtocolConfigValidation(String protocol, boolean isValid) 
{
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol);
+try {
+ConsumerConfig config = new ConsumerConfig(configs);
+assertEquals(protocol, 
config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+} catch (ConfigException e) {
+if (isValid)
+fail("Should not have thrown an exception");

Review Comment:
   I am not a fan of this because you don't know why it fails if it ever fails. 
At minimum, we should include the exception in the `fail` message.
   
   Otherwise, we could do it as follow:
   ```
   if (isValid) {
   ConsumerConfig config = new ConsumerConfig(configs);
assertEquals(protocol, 
config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
   } else {
   assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
   }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-30 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1786388893

   Hi @dajac - Thank you for the comments.  I addressed them in the latest 
commits. Would you mind reviewing the changes again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-30 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1376703936


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use.  We currently " +

Review Comment:
   nit: Let's remove the double spaces after the dots.



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
 assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
+
+@Test
+public void testDefaultConsumerGroupConfig() {
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(null, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@Test
+public void testValidConsumerGroupConfig() {
+String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
+configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, 
"org.apache.kafka.clients.group.someAssignor");

Review Comment:
   nit: I suppose that we could reuse `remoteAssignorName` here?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
 assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
+
+@Test
+public void testDefaultConsumerGroupConfig() {
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(null, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@Test
+public void testValidConsumerGroupConfig() {
+String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
+configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, 
"org.apache.kafka.clients.group.someAssignor");
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("consumer", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(remoteAssignorName, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}

Review Comment:
   Should we add a test to ensure that `GROUP_PROTOCOL_CONFIG` can only accept 
`consumer` and `generic`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1783444304

   @kirktrue @lucasbru @dajac - Thanks for taking time to review this PR. I've 
addressed the recent comments.  Let me know if there's anything uncleared.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374998305


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   sorry - I thought you meant the whole block is off. corrected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964660


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase();
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used.  Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964423


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase();
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374963674


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   @philipnee It would be better to update the new code to follow the existing 
style. This is what we usually do…



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   @philipnee It would be better to update the new code to follow the existing 
style. This is what we usually do…



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


lucasbru commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374443906


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase();
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +

Review Comment:
   Maybe we should stick with "group protocol" and not use the synonymous 
"rebalance protocol" here, although I know we use this interchangeably.
   
   Also, maybe that's just the non-native speaker in me, but for me the 
sentence would be easier to parse with an article for consumer.



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase();
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used.  Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   Should we specify that the settings only applies when group.protocol = 
consumer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1782236753

   Hi @kirktrue Thanks for taking time reviewing my code. I made changes 
according to your suggestions. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374036176


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = "generic";
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support GENERIC or CONSUMER. If CONSUMER is specified, then the 
consumer group protocol will be used.  " +
+"Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor 
to use. It cannot be used in " +
+"conjunction with group.local.assignor. The group 
coordinator will choose the assignor if no " +

Review Comment:
   Good call. Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035895


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),

Review Comment:
   108 char width!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035641


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   You are right! I kept it that way because I was trying to avoid editing a 
few hundred lines of code!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-26 Thread via GitHub


kirktrue commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1373924481


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = "generic";
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support GENERIC or CONSUMER. If CONSUMER is specified, then the 
consumer group protocol will be used.  " +

Review Comment:
   I think we should use the lower-case versions of these strings:
   
   ```suggestion
   "support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be used.  " +
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = "generic";
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +
+"support GENERIC or CONSUMER. If CONSUMER is specified, then the 
consumer group protocol will be used.  " +
+"Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor 
to use. It cannot be used in " +
+"conjunction with group.local.assignor. The group 
coordinator will choose the assignor if no " +

Review Comment:
   Two questions:
   
   1. Is `group.local.assignor` the same as `group.local.assignors`?
   2. If `group.local.assignors` isn't in this PR, should we just omit that 
sentence?



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),

Review Comment:
   You need a wider monitor, @philipnee 
   
   ```suggestion
   
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)),
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),

Review Comment:
   You need a wider monitor, @philipnee 
   
   ```suggestion
   
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)),
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String 

Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-25 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1780387305

   Apparently, it is not very easy to validate if the server-side assignor is 
used with the local assignor option instantiating the config.  We should 
validate this when starting up the client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org