mjsax commented on code in PR #21737:
URL: https://github.com/apache/kafka/pull/21737#discussion_r2928155242
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java:
##########
@@ -323,18 +323,215 @@ public void testInvalidConfigs() {
configs.put(GroupCoordinatorConfig.SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG,
1000);
assertEquals(5000,
createConfig(configs).shareGroupInitializeRetryIntervalMs());
+
+ // group.streams.session.timeout.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0);
+ assertEquals("Invalid value 0 for configuration
group.streams.session.timeout.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // cannot be smaller than MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT - 1);
+ assertEquals("group.streams.session.timeout.ms must be greater than or
equal to group.streams.min.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT);
+ createConfig(configs);
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1);
+ assertEquals("group.streams.session.timeout.ms must be less than or
equal to group.streams.max.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.min.session.timeout.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.min.session.timeout.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX (implies `MAX can be MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT); // when
+ createConfig(configs);
+
+ // cannot be larger than MAX (implies `MAX cannot be smaller than MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1); //
when
+ assertEquals("group.streams.max.session.timeout.ms must be greater
than or equal to group.streams.min.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // other case for `streams.group.mix.sessoion.timeout.ms` are covered
in section `session.timeout.ms` above
+
+
+ // group.streams.max.session.timeout.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.max.session.timeout.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // other case for `streams.group.max.session.timeout.ms` are covered
in sections `session.timeout.ms` and `streams.group.min.session.timeout.ms`
above
+
+
+ // group.streams.heartbeat.interval.ms
+
+ // must be positive
configs.clear();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
45000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
60000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
50000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
50000);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.heartbeat.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // cannot be smaller than MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT - 1);
+ assertEquals("group.streams.heartbeat.interval.ms must be greater than
or equal to group.streams.min.heartbeat.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ createConfig(configs);
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1);
+ assertEquals("group.streams.heartbeat.interval.ms must be less than or
equal to group.streams.max.heartbeat.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be smaller than session timeout
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT - 1); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT - 1);
+ createConfig(configs);
+
+ // cannot be same than session timeout
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT); // required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
assertEquals("group.streams.heartbeat.interval.ms must be less than
group.streams.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // group.streams.min.heartbeat.interval.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.min.heartbeat.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX (implies `MAX can be MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT); //
when
+ createConfig(configs);
+
+ // cannot be larger than MAX (implies `MAX cannot be smaller than MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1);
// when
+ assertEquals("group.streams.max.heartbeat.interval.ms must be greater
than or equal to group.streams.min.heartbeat.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // other case for `streams.group.mix.heartbeat.interval.ms` covered in
`session.timeout.ms` section
+
+
+ // group.streams.max.heartbeat.interval.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.max.heartbeat.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // other case for `streams.group.max.heartbeat.interval.ms` covered in
`session.timeout.ms` and `streams.group.mix.heartbeat.interval.ms` section
+
+
+ // group.streams.max.size
+
+ // must be positive
+ configs.clear();
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 0);
+ assertEquals("Invalid value 0 for configuration
group.streams.max.size: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.num.standby.replicas
+
+ // cannot be negative
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.num.standby.replicas: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT + 1);
+ assertEquals("group.streams.num.standby.replicas must be less than or
equal to group.streams.max.standby.replicas",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.max.num.standby.replicas
+
+ // cannot be negative
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.max.standby.replicas: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.initial.rebalance.delay.ms
+
+ // cannot be negative
configs.clear();
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
-1);
assertEquals("Invalid value -1 for configuration
group.streams.initial.rebalance.delay.ms: Value must be at least 0",
assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // group.streams.task.offset.interval.ms
Review Comment:
Test for new config
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]