lianetm commented on code in PR #16899:
URL: https://github.com/apache/kafka/pull/16899#discussion_r1852903322
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new
AtomicInteger(1);
+ private static final List<Class<? extends AbstractPartitionAssignor>>
PARTITION_ASSIGNOR_DEFAULT_VALUE =
+ List.of(RangeAssignor.class, CooperativeStickyAssignor.class);
+ /**
+ * A list of configuration keys for CLASSIC protocol not supported. we
should check the input string and clean up the
Review Comment:
```suggestion
* A list of configuration keys not supported for CLASSIC protocol.
```
I'm also suggesting we remove the second sentence about "clean up default
values" because I cannot find that we clean up values for unsupported configs,
I only see we throw ConfigException. Am I missing something?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -237,4 +237,22 @@ public void testProtocolConfigValidation(String protocol,
boolean isValid) {
assertThrows(ConfigException.class, () -> new
ConsumerConfig(configs));
}
}
+
+ @Test
+ public void testUnsupportedConfigsWithConsumerGroupProtocol() {
+
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"RoundRobinAssignor");
+
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
1000);
+
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
30000);
+ }
+
+ private void testUnsupportedConfigsWithConsumerGroupProtocol(String
configName, Object value) {
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ configs.put(configName, value);
+ ConfigException exception = assertThrows(ConfigException.class, () ->
new ConsumerConfig(configs));
+ assertTrue(exception.getMessage().contains(configName +
Review Comment:
can't we `assertEquals` here ?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -718,9 +740,14 @@ private void maybeOverrideEnableAutoCommit(Map<String,
Object> configs) {
}
}
- private void checkGroupRemoteAssignor() {
- if
(getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.CLASSIC.name())
&& getString(GROUP_REMOTE_ASSIGNOR_CONFIG) != null &&
!getString(GROUP_REMOTE_ASSIGNOR_CONFIG).isEmpty()) {
- throw new ConfigException(GROUP_REMOTE_ASSIGNOR_CONFIG + " cannot
be set when " + GROUP_PROTOCOL_CONFIG + "=" + GroupProtocol.CLASSIC.name());
+ private void checkUnsupportedConfigs(GroupProtocol groupProtocol,
List<String> unsupportedConfigs) {
+ if
(getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
+ unsupportedConfigs.forEach(configName -> {
+ Object config = originals().get(configName);
+ if (config != null && !Utils.isBlank(config.toString())) {
+ throw new ConfigException(configName + " cannot be set
when " + GROUP_PROTOCOL_CONFIG + "=" + groupProtocol.name());
Review Comment:
would it be a better experience for the user if we gather all the
unsupported configs used first, and then throw an error with all of them? I
expect it will be helpful when users upgrade to the new protocol, and probably
leave the existing config (with several unsupported).
##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##########
@@ -237,4 +237,22 @@ public void testProtocolConfigValidation(String protocol,
boolean isValid) {
assertThrows(ConfigException.class, () -> new
ConsumerConfig(configs));
}
}
+
+ @Test
+ public void testUnsupportedConfigsWithConsumerGroupProtocol() {
+
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"RoundRobinAssignor");
+
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
1000);
+
testUnsupportedConfigsWithConsumerGroupProtocol(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
30000);
+ }
+
+ private void testUnsupportedConfigsWithConsumerGroupProtocol(String
configName, Object value) {
+ final Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ configs.put(configName, value);
Review Comment:
Map.of?
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##########
@@ -337,7 +335,6 @@ private Map<String, Object> composeConfigs(ClusterInstance
cluster, String group
configs.put(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
configs.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol);
- configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RangeAssignor.class.getName());
Review Comment:
just to double check, even though we're removing this prop, I expect the
test will still use the range assignor if under classic protocol just because
range is the first assignor in the default value of the config. Correct?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -413,7 +434,7 @@ public class ConsumerConfig extends AbstractConfig {
HEARTBEAT_INTERVAL_MS_DOC)
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
- Arrays.asList(RangeAssignor.class,
CooperativeStickyAssignor.class),
+ PARTITION_ASSIGNOR_DEFAULT_VALUE,
Review Comment:
This constant was introduced to be reused and that made sense, but seems
it's only used here now right? If so I think keeping the default explicitly
here as it was is more convenient, it just makes it easier to discover (I
personally navigate to this definition a lot looking for defaults :))
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new
AtomicInteger(1);
+ private static final List<Class<? extends AbstractPartitionAssignor>>
PARTITION_ASSIGNOR_DEFAULT_VALUE =
+ List.of(RangeAssignor.class, CooperativeStickyAssignor.class);
+ /**
+ * A list of configuration keys for CLASSIC protocol not supported. we
should check the input string and clean up the
+ * default value.
+ */
+ private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS =
Collections.singletonList(
+ GROUP_REMOTE_ASSIGNOR_CONFIG
+ );
+
+ /**
+ * A list of configuration keys for consumer protocol not supported. we
should check the input string and clean up the
+ * default value.
+ */
+ private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS =
List.of(
+ PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+ HEARTBEAT_INTERVAL_MS_CONFIG,
+ SESSION_TIMEOUT_MS_CONFIG,
+ "group.max.session.timeout.ms",
+ "group.mix.session.timeout.ms"
Review Comment:
well these configs are used by the broker only, not the client, so I would
say we don't deal with them here and let the broker drive it?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -379,7 +378,29 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC =
SecurityConfig.SECURITY_PROVIDERS_DOC;
private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new
AtomicInteger(1);
+ private static final List<Class<? extends AbstractPartitionAssignor>>
PARTITION_ASSIGNOR_DEFAULT_VALUE =
+ List.of(RangeAssignor.class, CooperativeStickyAssignor.class);
+ /**
+ * A list of configuration keys for CLASSIC protocol not supported. we
should check the input string and clean up the
+ * default value.
+ */
+ private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS =
Collections.singletonList(
+ GROUP_REMOTE_ASSIGNOR_CONFIG
+ );
+
+ /**
+ * A list of configuration keys for consumer protocol not supported. we
should check the input string and clean up the
Review Comment:
```suggestion
* A list of configuration keys not supported for CONSUMER protocol.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]