This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 43350274e8c KAFKA-19156: Streamlined share group configs, with usage
in ShareSessionCache (#19505)
43350274e8c is described below
commit 43350274e8cc4522305a76e1099286d04d386517
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Wed Apr 23 15:22:02 2025 +0530
KAFKA-19156: Streamlined share group configs, with usage in
ShareSessionCache (#19505)
This PR removes the group.share.max.groups config. This config was used
only to calculate the maximum size of the share session cache. Now that
the new config group.share.max.share.sessions exists for exactly this
purpose, the ShareSessionCache is initialized with the new config
instead.
Refer: [KAFKA-19156](https://issues.apache.org/jira/browse/KAFKA-19156)
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 4 +--
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 -
.../group/modern/share/ShareGroupConfig.java | 21 ++++++----------
.../kafka/coordinator/group/GroupConfigTest.java | 3 +--
.../group/modern/share/ShareGroupConfigTest.java | 29 ++++++----------------
5 files changed, 16 insertions(+), 42 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index cbd2a91fbc5..a2c6d4b98f5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -426,9 +426,7 @@ class BrokerServer(
))
val fetchManager = new FetchManager(Time.SYSTEM, new
FetchSessionCache(fetchSessionCacheShards))
- val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
- config.shareGroupConfig.shareGroupMaxGroups *
config.groupCoordinatorConfig.shareGroupMaxSize
- )
+ val shareFetchSessionCache : ShareSessionCache = new
ShareSessionCache(config.shareGroupConfig.shareGroupMaxShareSessions())
sharePartitionManager = new SharePartitionManager(
replicaManager,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 74e34b06dac..517741cf2d8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1026,7 +1026,6 @@ class KafkaConfigTest {
case ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
- case ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
case
ShareGroupConfig.SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 58df34c2679..d686ba1a342 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -32,7 +32,6 @@ import static
org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
-import static org.apache.kafka.common.config.ConfigDef.Type.SHORT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class ShareGroupConfig {
@@ -51,10 +50,6 @@ public class ShareGroupConfig {
public static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT = 5;
public static final String SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC = "The
maximum number of delivery attempts for a record delivered to a share group.";
- public static final String SHARE_GROUP_MAX_GROUPS_CONFIG =
"group.share.max.groups";
- public static final short SHARE_GROUP_MAX_GROUPS_DEFAULT = 10;
- public static final String SHARE_GROUP_MAX_GROUPS_DOC = "The maximum
number of share groups.";
-
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG =
"group.share.record.lock.duration.ms";
public static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT =
30000;
public static final String SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC = "The
record acquisition lock duration in milliseconds for share groups.";
@@ -86,7 +81,6 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 3600000), MEDIUM,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DEFAULT, between(1000, 30000), MEDIUM,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_DOC)
.define(SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG, INT,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT, between(30000, 3600000),
MEDIUM, SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DOC)
- .define(SHARE_GROUP_MAX_GROUPS_CONFIG, SHORT,
SHARE_GROUP_MAX_GROUPS_DEFAULT, between(1, 100), MEDIUM,
SHARE_GROUP_MAX_GROUPS_DOC)
.define(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, INT,
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT, between(100, 10000), MEDIUM,
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC)
.define(SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG, INT,
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DEFAULT, MEDIUM,
SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_DOC)
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT,
SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
@@ -95,17 +89,18 @@ public class ShareGroupConfig {
private final boolean isShareGroupEnabled;
private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupDeliveryCountLimit;
- private final short shareGroupMaxGroups;
private final int shareGroupRecordLockDurationMs;
private final int shareGroupMaxRecordLockDurationMs;
private final int shareGroupMinRecordLockDurationMs;
private final int shareFetchPurgatoryPurgeIntervalRequests;
private final int shareGroupMaxShareSessions;
private final String shareGroupPersisterClassName;
+ private final AbstractConfig config;
public ShareGroupConfig(AbstractConfig config) {
- // Share groups are enabled in two cases:
- // 1. The internal configuration to enable it is explicitly set
+ this.config = config;
+ // Share groups are enabled in either of the two following cases:
+ // 1. The internal configuration to enable it is explicitly set; or
// 2. the share rebalance protocol is enabled.
Set<String> protocols =
config.getList(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG)
.stream().map(String::toUpperCase).collect(Collectors.toSet());
@@ -113,7 +108,6 @@ public class ShareGroupConfig {
protocols.contains(GroupType.SHARE.name());
shareGroupPartitionMaxRecordLocks =
config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
shareGroupDeliveryCountLimit =
config.getInt(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG);
- shareGroupMaxGroups =
config.getShort(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG);
shareGroupRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMaxRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
shareGroupMinRecordLockDurationMs =
config.getInt(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
@@ -136,10 +130,6 @@ public class ShareGroupConfig {
return shareGroupDeliveryCountLimit;
}
- public short shareGroupMaxGroups() {
- return shareGroupMaxGroups;
- }
-
public int shareGroupRecordLockDurationMs() {
return shareGroupRecordLockDurationMs;
}
@@ -171,6 +161,9 @@ public class ShareGroupConfig {
Utils.require(shareGroupMaxRecordLockDurationMs >=
shareGroupRecordLockDurationMs,
String.format("%s must be greater than or equal to %s",
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG,
SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG));
+ Utils.require(shareGroupMaxShareSessions >=
config.getInt(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG),
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG));
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 50205b0a9e9..77014de5bf1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -41,7 +41,6 @@ public class GroupConfigTest {
private static final boolean SHARE_GROUP_ENABLE = true;
private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
private static final int SHARE_GROUP_DELIVERY_COUNT_LIMIT = 5;
- private static final short SHARE_GROUP_MAX_GROUPS = 10;
private static final int SHARE_GROUP_RECORD_LOCK_DURATION_MS = 30000;
private static final int SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS = 15000;
private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
@@ -284,6 +283,6 @@ public class GroupConfigTest {
private ShareGroupConfig createShareGroupConfig() {
return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE,
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS, SHARE_GROUP_DELIVERY_COUNT_LIMIT,
- SHARE_GROUP_MAX_GROUPS, SHARE_GROUP_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
+ SHARE_GROUP_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 0a2f57b6ff6..698dd59380c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -39,7 +39,6 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
200);
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
5);
- configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short)
10);
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG, 30000);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
15000);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG,
60000);
@@ -51,7 +50,6 @@ public class ShareGroupConfigTest {
assertTrue(config.isShareGroupEnabled());
assertEquals(200, config.shareGroupPartitionMaxRecordLocks());
assertEquals(5, config.shareGroupDeliveryCountLimit());
- assertEquals(10, config.shareGroupMaxGroups());
assertEquals(30000, config.shareGroupRecordLockDurationMs());
assertEquals(15000, config.shareGroupMinRecordLockDurationMs());
assertEquals(60000, config.shareGroupMaxRecordLockDurationMs());
@@ -89,24 +87,6 @@ public class ShareGroupConfigTest {
assertEquals("Invalid value 11 for configuration
group.share.delivery.count.limit: Value must be no more than 10",
assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
- configs.clear();
- // test for when SHARE_GROUP_MAX_GROUPS_CONFIG is of incorrect data
type
- configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, 10);
- assertEquals("Invalid value 10 for configuration
group.share.max.groups: Expected value to be a 16-bit integer (short), but it
was a java.lang.Integer",
- assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
-
- configs.clear();
- // test for when SHARE_GROUP_MAX_GROUPS_CONFIG is out of bounds
- configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short) 0);
- assertEquals("Invalid value 0 for configuration
group.share.max.groups: Value must be at least 1",
- assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
-
- configs.clear();
- // test for when SHARE_GROUP_MAX_GROUPS_CONFIG is out of bounds
- configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG, (short)
110);
- assertEquals("Invalid value 110 for configuration
group.share.max.groups: Value must be no more than 100",
- assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
-
configs.clear();
// test for when SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG is out
of bounds
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 50);
@@ -118,13 +98,19 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
20000);
assertEquals("Invalid value 20000 for configuration
group.share.partition.max.record.locks: Value must be no more than 10000",
assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ configs.clear();
+ // test for when SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG is less than
SHARE_GROUP_MAX_SIZE_CONFIG
+ configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 2000);
+ configs.put(ShareGroupConfig.SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG,
1000);
+ assertEquals("Invalid value 2000 for configuration
group.share.max.size: Value must be no more than 1000",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
}
public static ShareGroupConfig createShareGroupConfig(
boolean shareGroupEnable,
int shareGroupPartitionMaxRecordLocks,
int shareGroupDeliveryCountLimit,
- short shareGroupsMaxGroups,
int shareGroupRecordLockDurationsMs,
int shareGroupMinRecordLockDurationMs,
int shareGroupMaxRecordLockDurationMs
@@ -133,7 +119,6 @@ public class ShareGroupConfigTest {
configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG,
shareGroupEnable);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupPartitionMaxRecordLocks);
configs.put(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupDeliveryCountLimit);
- configs.put(ShareGroupConfig.SHARE_GROUP_MAX_GROUPS_CONFIG,
shareGroupsMaxGroups);
configs.put(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupRecordLockDurationsMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupMinRecordLockDurationMs);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupMaxRecordLockDurationMs);