Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac merged PR #14642: URL: https://github.com/apache/kafka/pull/14642 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1790298798 Last build did not pass. It should not be related to this PR but to be sure I just triggered a new one.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1379231747 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: To make it more consistent: DEFAULT_GROUP_REMOTE_ASSIGNOR
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
AndrewJSchofield commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1379170868 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; Review Comment: `GROUP_REMOTE_ASSIGNOR_CONFIG` would be better. ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. 
Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: `GROUP_REMOTE_ASSIGNOR_DOC`
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1787563665 Hi @dajac - Again, thank you for the review. Your comments are addressed as of the latest commit.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1786893772 @philipnee Thanks for the update. I left one more comment and a suggestion for consideration.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1377337596 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -158,4 +163,54 @@ public void testCaseInsensitiveSecurityProtocol() { final ConsumerConfig consumerConfig = new ConsumerConfig(configs); assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } + +@Test +public void testDefaultConsumerGroupConfig() { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("generic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertNull(consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@Test +public void testRemoteAssignorConfig() { +String remoteAssignorName = "org.apache.kafka.clients.group.someAssignor"; +String protocol = "consumer"; +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, remoteAssignorName); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals(protocol, consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@ParameterizedTest +@MethodSource("protocolNameSupplier") Review Comment: fyi, the CsvSource is pretty handy in this case: ``` @CsvSource({"consumer,true", "Consumer,true", "generic,true", "Generic,true", "invalid,false"}) ```
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1377336442 ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -158,4 +163,54 @@ public void testCaseInsensitiveSecurityProtocol() { final ConsumerConfig consumerConfig = new ConsumerConfig(configs); assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } + +@Test +public void testDefaultConsumerGroupConfig() { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("generic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); + assertNull(consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@Test +public void testRemoteAssignorConfig() { +String remoteAssignorName = "org.apache.kafka.clients.group.someAssignor"; +String protocol = "consumer"; +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, remoteAssignorName); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals(protocol, consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@ParameterizedTest +@MethodSource("protocolNameSupplier") +public void testProtocolConfigValidation(String protocol, boolean isValid) { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); 
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol); +try { +ConsumerConfig config = new ConsumerConfig(configs); +assertEquals(protocol, config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +} catch (ConfigException e) { +if (isValid) +fail("Should not have thrown an exception"); Review Comment: I am not a fan of this because you don't know why it fails if it ever fails. At a minimum, we should include the exception in the `fail` message. Otherwise, we could do it as follows: ``` if (isValid) { ConsumerConfig config = new ConsumerConfig(configs); assertEquals(protocol, config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); } else { assertThrows(ConfigException.class, () -> new ConsumerConfig(configs)); } ```
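Editorial sketch of the structure suggested in the comment above, written so that it runs without JUnit or Kafka on the classpath. Everything here is an assumption made for illustration: `ProtocolCheckSketch`, `parseProtocol`, and `check` are hypothetical names, `parseProtocol` stands in for constructing a `ConsumerConfig`, and `IllegalArgumentException` stands in for `ConfigException`. The point is only the shape: the valid branch lets an unexpected exception propagate with its own message, and the invalid branch requires that one is thrown.

```java
import java.util.Locale;

final class ProtocolCheckSketch {
    // Hypothetical stand-in for building a ConsumerConfig: accepts "generic"
    // or "consumer" in any letter case and returns the value unchanged.
    static String parseProtocol(String value) {
        String normalized = value.toLowerCase(Locale.ROOT);
        if (!normalized.equals("generic") && !normalized.equals("consumer")) {
            throw new IllegalArgumentException("Invalid group protocol: " + value);
        }
        return value;
    }

    // Branch on isValid instead of wrapping everything in try/catch.
    static void check(String protocol, boolean isValid) {
        if (isValid) {
            // Any exception thrown here surfaces directly, with its cause.
            String parsed = parseProtocol(protocol);
            if (!parsed.equals(protocol)) {
                throw new AssertionError("Expected " + protocol + " but got " + parsed);
            }
        } else {
            try {
                parseProtocol(protocol);
            } catch (IllegalArgumentException expected) {
                return; // rejection is the expected outcome
            }
            throw new AssertionError("Expected rejection of: " + protocol);
        }
    }

    public static void main(String[] args) {
        check("consumer", true);
        check("Generic", true);
        check("invalid", false);
        System.out.println("all checks passed");
    }
}
```

If a value marked valid is rejected, the original exception reaches the caller, which is the diagnostic information the bare `fail(...)` call was discarding.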
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1786388893 Hi @dajac - Thank you for the comments. I addressed them in the latest commits. Would you mind reviewing the changes again?
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1376703936 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + Review Comment: nit: Let's remove the double spaces after the dots. ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() { final ConsumerConfig consumerConfig = new ConsumerConfig(configs); assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } + +@Test +public void testDefaultConsumerGroupConfig() { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("generic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(null, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@Test +public void testValidConsumerGroupConfig() { +String remoteAssignorName = "org.apache.kafka.clients.group.someAssignor"; +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); 
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); +configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, "org.apache.kafka.clients.group.someAssignor"); Review Comment: nit: I suppose that we could reuse `remoteAssignorName` here? ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() { final ConsumerConfig consumerConfig = new ConsumerConfig(configs); assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } + +@Test +public void testDefaultConsumerGroupConfig() { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("generic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(null, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@Test +public void testValidConsumerGroupConfig() { +String remoteAssignorName = "org.apache.kafka.clients.group.someAssignor"; +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); +configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, "org.apache.kafka.clients.group.someAssignor"); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("consumer", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} Review Comment: Should we add a test to ensure that 
`GROUP_PROTOCOL_CONFIG` can only accept `consumer` and `generic`?
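Editorial note on the `DEFAULT_GROUP_PROTOCOL` line quoted above: this revision calls `toLowerCase(Locale.ROOT)`, where earlier revisions in the thread called `toLowerCase()` with no argument. The thread does not state the reason, so the following is only a sketch of a commonly cited motivation, using the standard library alone. `LocaleLowerCaseSketch` is a hypothetical class name.

```java
import java.util.Locale;

final class LocaleLowerCaseSketch {
    // Lower-cases with an explicit locale so the result does not depend on
    // the JVM's default locale.
    static String lower(String s, Locale locale) {
        return s.toLowerCase(locale);
    }

    public static void main(String[] args) {
        Locale turkish = Locale.forLanguageTag("tr");
        // Locale.ROOT maps 'I' to the ASCII 'i'.
        System.out.println(lower("GENERIC", Locale.ROOT));
        // Turkish casing rules map 'I' to the dotless 'ı' (U+0131), so the
        // result no longer equals the ASCII string "generic".
        System.out.println(lower("GENERIC", turkish).equals("generic"));
    }
}
```

With the no-argument overload, the same effect would appear on any JVM whose default locale is Turkish, which is why a fixed locale is the safer choice for a config default.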
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1783444304 @kirktrue @lucasbru @dajac - Thanks for taking the time to review this PR. I've addressed the recent comments. Let me know if there's anything unclear.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374998305 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), +Importance.HIGH, +GROUP_PROTOCOL_DOC) +.define(REMOTE_ASSIGNOR_CONFIG, +Type.STRING, +DEFAULT_REMOTE_ASSIGNOR, +Importance.MEDIUM, +REMOTE_ASSIGNOR_DOC) Review Comment: Sorry, I thought you meant the whole block was off. Corrected.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964660 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(); +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: +1
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964423 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(); +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + Review Comment: +1
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374963674 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), +Importance.HIGH, +GROUP_PROTOCOL_DOC) +.define(REMOTE_ASSIGNOR_CONFIG, +Type.STRING, +DEFAULT_REMOTE_ASSIGNOR, +Importance.MEDIUM, +REMOTE_ASSIGNOR_DOC) Review Comment: @philipnee It would be better to update the new code to follow the existing style. This is what we usually do…
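Editorial sketch of what the `define(GROUP_PROTOCOL_CONFIG, ...)` call above is asking for: a set of allowed strings derived from an enum, matched without regard to letter case. This is not Kafka's implementation. `SketchGroupProtocol`, `CaseInsensitiveOptions`, and both of its methods are hypothetical stand-ins that use only the standard library, and the real `Utils.enumOptions` and `ConfigDef.CaseInsensitiveValidString` may behave differently in detail.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical stand-in for the enum named in the diff.
enum SketchGroupProtocol { GENERIC, CONSUMER }

final class CaseInsensitiveOptions {
    // Collects the enum constant names, lower-cased with a fixed locale.
    static String[] enumOptions(Class<? extends Enum<?>> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(e -> e.name().toLowerCase(Locale.ROOT))
                .toArray(String[]::new);
    }

    // Returns true when value matches one of the options, ignoring case.
    static boolean isValid(String value, String[] options) {
        if (value == null) {
            return false;
        }
        for (String option : options) {
            if (option.equalsIgnoreCase(value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] options = enumOptions(SketchGroupProtocol.class);
        System.out.println(Arrays.toString(options));
        System.out.println(isValid("Consumer", options));
        System.out.println(isValid("invalid", options));
    }
}
```

Deriving the options from the enum keeps the accepted values and the enum constants from drifting apart when a protocol is added or renamed.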
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
lucasbru commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374443906 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(); +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + Review Comment: Maybe we should stick with "group protocol" and not use the synonymous "rebalance protocol" here, although I know we use this interchangeably. Also, maybe that's just the non-native speaker in me, but for me the sentence would be easier to parse with an article for consumer. ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(); +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. 
Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: Should we specify that the setting only applies when group.protocol = consumer?
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1782236753 Hi @kirktrue, thanks for taking the time to review my code. I made changes according to your suggestions.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374036176 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = "generic"; +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support GENERIC or CONSUMER. If CONSUMER is specified, then the consumer group protocol will be used. " + +"Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor to use. It cannot be used in " + +"conjunction with group.local.assignor. The group coordinator will choose the assignor if no " + Review Comment: Good call. Yes.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035895 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), Review Comment: 108 char width!
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374035641 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), +Importance.HIGH, +GROUP_PROTOCOL_DOC) +.define(REMOTE_ASSIGNOR_CONFIG, +Type.STRING, +DEFAULT_REMOTE_ASSIGNOR, +Importance.MEDIUM, +REMOTE_ASSIGNOR_DOC) Review Comment: You are right! I kept it that way because I was trying to avoid editing a few hundred lines of code!
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
kirktrue commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1373924481 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = "generic"; +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support GENERIC or CONSUMER. If CONSUMER is specified, then the consumer group protocol will be used. " + Review Comment: I think we should use the lower-case versions of these strings: ```suggestion "support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be used. " + ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = "generic"; +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support GENERIC or CONSUMER. If CONSUMER is specified, then the consumer group protocol will be used. " + +"Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server side assignor to use. It cannot be used in " + +"conjunction with group.local.assignor. The group coordinator will choose the assignor if no " + Review Comment: Two questions: 1. Is `group.local.assignor` the same as `group.local.assignors`? 2. If `group.local.assignors` isn't in this PR, should we just omit that sentence? ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), Review Comment: You need a wider monitor, @philipnee 😛 ```suggestion ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(GroupProtocol.class)), ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private
static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GRO
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1780387305 Apparently, it is not very easy to validate, when instantiating the config, whether the server-side assignor is used together with the local assignor option. We should validate this when starting up the client.
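As a rough illustration of the startup-time check proposed in the comment above, the sketch below rejects a configuration that sets both `group.remote.assignor` and `group.local.assignors`. It is not code from the PR: the class `AssignorConfigCheckSketch` and its `validate` method are hypothetical, and it works on a plain `Map` rather than on Kafka's `ConsumerConfig`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a possible client-startup check, not the PR's code.
final class AssignorConfigCheckSketch {

    static final String REMOTE_ASSIGNOR = "group.remote.assignor";
    static final String LOCAL_ASSIGNORS = "group.local.assignors";

    // Throws if a server-side assignor and client-side assignors are both configured.
    static void validate(Map<String, ?> configs) {
        Object remote = configs.get(REMOTE_ASSIGNOR);
        Object local = configs.get(LOCAL_ASSIGNORS);

        // null means the choice is left to the group coordinator.
        boolean remoteSet = remote != null;
        // An absent or empty list means no local assignors were requested.
        boolean localSet = local instanceof List && !((List<?>) local).isEmpty();

        if (remoteSet && localSet) {
            throw new IllegalArgumentException(
                REMOTE_ASSIGNOR + " cannot be used in conjunction with " + LOCAL_ASSIGNORS);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(REMOTE_ASSIGNOR, "some-assignor"); // placeholder assignor name
        configs.put(LOCAL_ASSIGNORS, Collections.emptyList());
        validate(configs); // passes: only the remote assignor is set
        System.out.println("configuration accepted");
    }
}
```

Running the check once the full configuration is available sidesteps the difficulty mentioned above, since a per-key validator only sees one value at a time.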
[PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee opened a new pull request, #14642: URL: https://github.com/apache/kafka/pull/14642

Protocol Name | Type | Default | Description
-- | -- | -- | --
group.protocol | enum | generic | A flag which indicates if the new protocol should be used or not. It could be: generic or consumer
group.remote.assignor | string | null | The server side assignor to use. It cannot be used in conjunction with group.local.assignor. null means that the choice of the assignor is left to the group coordinator.
group.local.assignors | list | empty | The list of client side (local) assignors as a list of full class names. It cannot be used in conjunction with group.remote.assignor.

Three new configurations were added.
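To make the configurations in the description concrete, here is a small sketch of how a client might populate them using only `java.util.Properties`. The key strings come from the description; the class `GroupProtocolPropertiesSketch`, the helper `consumerProtocolProps`, and the assignor name passed in `main` are placeholders assumed for illustration. Handing the result to a real consumer would additionally require the Kafka client library and the usual connection settings, which are omitted here.

```java
import java.util.Properties;

// Illustrative sketch only: builds the properties, does not create a consumer.
final class GroupProtocolPropertiesSketch {

    // Opts in to the new protocol; a null assignor leaves the choice to the group coordinator.
    static Properties consumerProtocolProps(String remoteAssignor) {
        Properties props = new Properties();
        props.setProperty("group.protocol", "consumer");
        if (remoteAssignor != null) {
            // Properties cannot store null, so the key is simply left unset in that case.
            props.setProperty("group.remote.assignor", remoteAssignor);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties withAssignor = consumerProtocolProps("some-assignor"); // placeholder name
        Properties coordinatorChooses = consumerProtocolProps(null);
        System.out.println(withAssignor.getProperty("group.remote.assignor"));
        System.out.println(coordinatorChooses.containsKey("group.remote.assignor"));
    }
}
```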