Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-17 Thread via GitHub


chia7712 merged PR #15684:
URL: https://github.com/apache/kafka/pull/15684


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2058593968

   @OmniaGM Could you please fix those conflicts? Thanks!





Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-11 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1561280844


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -902,40 +824,40 @@ object KafkaConfig {
   .define(ControlledShutdownEnableProp, BOOLEAN, 
Defaults.CONTROLLED_SHUTDOWN_ENABLE, MEDIUM, ControlledShutdownEnableDoc)
 
   /** * Group coordinator configuration ***/
-  .define(GroupMinSessionTimeoutMsProp, INT, 
Defaults.GROUP_MIN_SESSION_TIMEOUT_MS, MEDIUM, GroupMinSessionTimeoutMsDoc)
-  .define(GroupMaxSessionTimeoutMsProp, INT, 
Defaults.GROUP_MAX_SESSION_TIMEOUT_MS, MEDIUM, GroupMaxSessionTimeoutMsDoc)
-  .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GROUP_INITIAL_REBALANCE_DELAY_MS, MEDIUM, 
GroupInitialRebalanceDelayMsDoc)
-  .define(GroupMaxSizeProp, INT, Defaults.GROUP_MAX_SIZE, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
+  .define(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, 
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, 
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, 
GroupCoordinatorConfig.GROUP_MAX_SIZE_DOC)
 
   /** New group coordinator configs */
-  .define(GroupCoordinatorRebalanceProtocolsProp, LIST, 
Defaults.GROUP_COORDINATOR_REBALANCE_PROTOCOLS,
-ConfigDef.ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), 
MEDIUM, GroupCoordinatorRebalanceProtocolsDoc)
-  .define(GroupCoordinatorNumThreadsProp, INT, 
Defaults.GROUP_COORDINATOR_NUM_THREADS, atLeast(1), MEDIUM, 
GroupCoordinatorNumThreadsDoc)
+  
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
+ConfigDef.ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
   // Internal configuration used by integration and system tests.
-  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NEW_GROUP_COORDINATOR_ENABLE, null, MEDIUM, 
NewGroupCoordinatorEnableDoc)
+  
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, 
MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
 
   /** Consumer groups configs */
-  .define(ConsumerGroupSessionTimeoutMsProp, INT, 
Defaults.CONSUMER_GROUP_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, 
ConsumerGroupSessionTimeoutMsDoc)
-  .define(ConsumerGroupMinSessionTimeoutMsProp, INT, 
Defaults.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, 
ConsumerGroupMinSessionTimeoutMsDoc)
-  .define(ConsumerGroupMaxSessionTimeoutMsProp, INT, 
Defaults.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, 
ConsumerGroupMaxSessionTimeoutMsDoc)
-  .define(ConsumerGroupHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupHeartbeatIntervalMsDoc)
-  .define(ConsumerGroupMinHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupMinHeartbeatIntervalMsDoc)
-  .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupMaxHeartbeatIntervalMsDoc)
-  .define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, 
atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc)
-  .define(ConsumerGroupAssignorsProp, LIST, 
Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc)
-  .defineInternal(ConsumerGroupMigrationPolicyProp, STRING, 
Defaults.CONSUMER_GROUP_MIGRATION_POLICY, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]):
 _*), MEDIUM, ConsumerGroupMigrationPolicyDoc)
+  .define(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+  
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-11 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1561070546


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -902,40 +824,40 @@ object KafkaConfig {
   .define(ControlledShutdownEnableProp, BOOLEAN, 
Defaults.CONTROLLED_SHUTDOWN_ENABLE, MEDIUM, ControlledShutdownEnableDoc)
 
   /** * Group coordinator configuration ***/
-  .define(GroupMinSessionTimeoutMsProp, INT, 
Defaults.GROUP_MIN_SESSION_TIMEOUT_MS, MEDIUM, GroupMinSessionTimeoutMsDoc)
-  .define(GroupMaxSessionTimeoutMsProp, INT, 
Defaults.GROUP_MAX_SESSION_TIMEOUT_MS, MEDIUM, GroupMaxSessionTimeoutMsDoc)
-  .define(GroupInitialRebalanceDelayMsProp, INT, 
Defaults.GROUP_INITIAL_REBALANCE_DELAY_MS, MEDIUM, 
GroupInitialRebalanceDelayMsDoc)
-  .define(GroupMaxSizeProp, INT, Defaults.GROUP_MAX_SIZE, atLeast(1), 
MEDIUM, GroupMaxSizeDoc)
+  .define(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, 
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, 
GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_MAX_SIZE_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, 
GroupCoordinatorConfig.GROUP_MAX_SIZE_DOC)
 
   /** New group coordinator configs */
-  .define(GroupCoordinatorRebalanceProtocolsProp, LIST, 
Defaults.GROUP_COORDINATOR_REBALANCE_PROTOCOLS,
-ConfigDef.ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), 
MEDIUM, GroupCoordinatorRebalanceProtocolsDoc)
-  .define(GroupCoordinatorNumThreadsProp, INT, 
Defaults.GROUP_COORDINATOR_NUM_THREADS, atLeast(1), MEDIUM, 
GroupCoordinatorNumThreadsDoc)
+  
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
+ConfigDef.ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
+  .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
   // Internal configuration used by integration and system tests.
-  .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, 
Defaults.NEW_GROUP_COORDINATOR_ENABLE, null, MEDIUM, 
NewGroupCoordinatorEnableDoc)
+  
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, 
MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
 
   /** Consumer groups configs */
-  .define(ConsumerGroupSessionTimeoutMsProp, INT, 
Defaults.CONSUMER_GROUP_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, 
ConsumerGroupSessionTimeoutMsDoc)
-  .define(ConsumerGroupMinSessionTimeoutMsProp, INT, 
Defaults.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, 
ConsumerGroupMinSessionTimeoutMsDoc)
-  .define(ConsumerGroupMaxSessionTimeoutMsProp, INT, 
Defaults.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, 
ConsumerGroupMaxSessionTimeoutMsDoc)
-  .define(ConsumerGroupHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupHeartbeatIntervalMsDoc)
-  .define(ConsumerGroupMinHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupMinHeartbeatIntervalMsDoc)
-  .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, 
Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, 
ConsumerGroupMaxHeartbeatIntervalMsDoc)
-  .define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, 
atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc)
-  .define(ConsumerGroupAssignorsProp, LIST, 
Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc)
-  .defineInternal(ConsumerGroupMigrationPolicyProp, STRING, 
Defaults.CONSUMER_GROUP_MIGRATION_POLICY, 
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[ConsumerGroupMigrationPolicy]):
 _*), MEDIUM, ConsumerGroupMigrationPolicyDoc)
+  .define(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
INT, GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, 
atLeast(1), MEDIUM, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+  
.define(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-11 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1561014883


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -16,14 +16,132 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
  * The group coordinator configurations.
  */
 public class GroupCoordinatorConfig {
+/** * Group coordinator configuration ***/
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.min.session.timeout.ms";
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum 
allowed session timeout for registered consumers. Shorter timeouts result in 
quicker failure detection at the cost of more frequent consumer heartbeating, 
which can overwhelm broker resources.";
+public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
+
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.max.session.timeout.ms";
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum 
allowed session timeout for registered consumers. Longer timeouts give 
consumers more time to process messages in between heartbeats at the cost of a 
longer time to detect failures.";
+public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000;
+
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = 
"group.initial.rebalance.delay.ms";
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The 
amount of time the group coordinator will wait for more consumers to join a new 
group before performing the first rebalance. A longer delay means potentially 
fewer rebalances, but increases the time until processing begins.";
+public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
+
+public final static String GROUP_MAX_SIZE_CONFIG = "group.max.size";
+public final static String GROUP_MAX_SIZE_DOC = "The maximum number of 
consumers that a single consumer group can accommodate.";
+public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+/** New group coordinator configs */
+public final static String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = 
"group.coordinator.new.enable";
+public final static String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the 
new group coordinator.";
+public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false;
+
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols. Supported protocols: " + 
Utils.join(Group.GroupType.values(), ",") + ". " +
+"The " + Group.GroupType.CONSUMER + " rebalance protocol is in 
early access and therefore must not be used in production.";
+public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = 
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+
+public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = 
"group.coordinator.threads";
+public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator.";
+public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 1;
+
+/** Consumer group configs */
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.session.timeout.ms";
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The 
timeout to detect client failures when using the consumer group protocol.";
+public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.min.session.timeout.ms";
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = 
"The minimum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 
45000;
+
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.max.session.timeout.ms";
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = 
"The maximum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 
60000;
+
+public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.consumer.heartbeat.interval.ms";
+public final static
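
The min/max session timeout docs quoted above describe bounds on what a consumer may request. A minimal sketch of such a bounds check follows, assuming a hypothetical helper class `SessionTimeoutBoundsSketch` that is not part of Kafka or of this PR; the 6000 ms minimum matches `GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT` in the diff, while the 30000 ms maximum in the usage note is purely illustrative.

```java
// Hypothetical helper, not part of Kafka: checks whether a requested
// session timeout falls inside the broker-side [min, max] bounds.
final class SessionTimeoutBoundsSketch {
    private final int minMs;
    private final int maxMs;

    SessionTimeoutBoundsSketch(int minMs, int maxMs) {
        if (minMs > maxMs) {
            throw new IllegalArgumentException("minMs must not exceed maxMs");
        }
        this.minMs = minMs;
        this.maxMs = maxMs;
    }

    // True when the requested timeout lies within the inclusive bounds.
    boolean accepts(int requestedMs) {
        return requestedMs >= minMs && requestedMs <= maxMs;
    }
}
```

With `new SessionTimeoutBoundsSketch(6000, 30000)`, a request of 10000 ms is accepted, while 1000 ms (too short) and 60000 ms (too long) are rejected.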

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-11 Thread via GitHub


chia7712 commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2049399165

   @OmniaGM Could you fix the conflicts?





Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-11 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1560715033


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -16,14 +16,132 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
  * The group coordinator configurations.
  */
 public class GroupCoordinatorConfig {
+/** * Group coordinator configuration ***/
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.min.session.timeout.ms";
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum 
allowed session timeout for registered consumers. Shorter timeouts result in 
quicker failure detection at the cost of more frequent consumer heartbeating, 
which can overwhelm broker resources.";
+public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
+
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.max.session.timeout.ms";
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum 
allowed session timeout for registered consumers. Longer timeouts give 
consumers more time to process messages in between heartbeats at the cost of a 
longer time to detect failures.";
+public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000;
+
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = 
"group.initial.rebalance.delay.ms";
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The 
amount of time the group coordinator will wait for more consumers to join a new 
group before performing the first rebalance. A longer delay means potentially 
fewer rebalances, but increases the time until processing begins.";
+public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
+
+public final static String GROUP_MAX_SIZE_CONFIG = "group.max.size";
+public final static String GROUP_MAX_SIZE_DOC = "The maximum number of 
consumers that a single consumer group can accommodate.";
+public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+/** New group coordinator configs */
+public final static String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = 
"group.coordinator.new.enable";
+public final static String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the 
new group coordinator.";
+public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false;
+
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols. Supported protocols: " + 
Utils.join(Group.GroupType.values(), ",") + ". " +
+"The " + Group.GroupType.CONSUMER + " rebalance protocol is in 
early access and therefore must not be used in production.";
+public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = 
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+
+public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = 
"group.coordinator.threads";
+public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator.";
+public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 1;
+
+/** Consumer group configs */
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.session.timeout.ms";
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The 
timeout to detect client failures when using the consumer group protocol.";
+public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.min.session.timeout.ms";
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = 
"The minimum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 
45000;
+
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.max.session.timeout.ms";
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = 
"The maximum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 
60000;
+
+public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.consumer.heartbeat.interval.ms";
+public final static 

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559774590


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -16,14 +16,132 @@
  */
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
  * The group coordinator configurations.
  */
 public class GroupCoordinatorConfig {
+/** * Group coordinator configuration ***/
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.min.session.timeout.ms";
+public final static String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum 
allowed session timeout for registered consumers. Shorter timeouts result in 
quicker failure detection at the cost of more frequent consumer heartbeating, 
which can overwhelm broker resources.";
+public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
+
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.max.session.timeout.ms";
+public final static String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum 
allowed session timeout for registered consumers. Longer timeouts give 
consumers more time to process messages in between heartbeats at the cost of a 
longer time to detect failures.";
+public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000;
+
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = 
"group.initial.rebalance.delay.ms";
+public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The 
amount of time the group coordinator will wait for more consumers to join a new 
group before performing the first rebalance. A longer delay means potentially 
fewer rebalances, but increases the time until processing begins.";
+public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
+
+public final static String GROUP_MAX_SIZE_CONFIG = "group.max.size";
+public final static String GROUP_MAX_SIZE_DOC = "The maximum number of 
consumers that a single consumer group can accommodate.";
+public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+/** New group coordinator configs */
+public final static String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = 
"group.coordinator.new.enable";
+public final static String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the 
new group coordinator.";
+public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false;
+
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = 
"group.coordinator.rebalance.protocols";
+public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = 
"The list of enabled rebalance protocols. Supported protocols: " + 
Utils.join(Group.GroupType.values(), ",") + ". " +
+"The " + Group.GroupType.CONSUMER + " rebalance protocol is in 
early access and therefore must not be used in production.";
+public static final List<String> 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = 
Collections.singletonList(Group.GroupType.CLASSIC.toString());
+
+public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = 
"group.coordinator.threads";
+public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number 
of threads used by the group coordinator.";
+public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 1;
+
+/** Consumer group configs */
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.session.timeout.ms";
+public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The 
timeout to detect client failures when using the consumer group protocol.";
+public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
+
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.min.session.timeout.ms";
+public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = 
"The minimum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 
45000;
+
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = 
"group.consumer.max.session.timeout.ms";
+public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = 
"The maximum allowed session timeout for registered consumers.";
+public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 
60000;
+
+public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = 
"group.consumer.heartbeat.interval.ms";
+public final static

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2047947749

   Ugh, my local environment keeps the wrong caches because I keep jumping 
between branches.





Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2047933442

   @OmniaGM It seems there are build errors. Could you fix them?





Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559709137


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I have updated the PR now and rebased on trunk as well.






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559706475


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   Sorry, I meant `org.apache.kafka.coordinator.group`.






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559703336


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   > then we can change the constructor of GroupCoordinatorConfig to accept 
KafkaConfig 
   
   That makes sense to me. A unified way to generate those config classes 
keeps their behavior consistent.
   
   > I moved it to kafka.coordinator.group 
   
   I assume the package you mentioned is in the core module, but I'm OK with 
your approach since it avoids rewriting Java code back into Scala.






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559689777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I would suggest that we wait until KafkaConfig is fully migrated out of core; 
then we can change the constructor of `GroupCoordinatorConfig` to accept 
[KafkaConfig](https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17789703#comment-17789703)
 and have it extract the configs it needs from the KafkaConfig definition. WDYT? 
This might be the easiest way.
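
A minimal sketch of the constructor idea above, assuming stand-in classes: `BrokerConfigSketch` plays the role of a broker-wide config such as KafkaConfig, and `GroupConfigSketch` plays the role of a grouping class that extracts only the settings it needs. Both names and the `getInt` helper are hypothetical and are not the API of this PR; the two keys and their defaults are taken from the `GroupCoordinatorConfig` diff quoted elsewhere in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a broker-wide config object; not Kafka's KafkaConfig.
final class BrokerConfigSketch {
    private final Map<String, Object> values;

    BrokerConfigSketch(Map<String, Object> values) {
        this.values = new HashMap<>(values);
    }

    // Returns the configured integer, or the supplied default when the key is unset.
    int getInt(String key, int defaultValue) {
        Object value = values.get(key);
        return value == null ? defaultValue : ((Number) value).intValue();
    }
}

// Hypothetical grouping class: its constructor accepts the broker-wide
// config and extracts only the group coordinator settings it needs.
final class GroupConfigSketch {
    static final String NUM_THREADS_CONFIG = "group.coordinator.threads";
    static final int NUM_THREADS_DEFAULT = 1;
    static final String INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms";
    static final int INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;

    final int numThreads;
    final int initialRebalanceDelayMs;

    GroupConfigSketch(BrokerConfigSketch config) {
        this.numThreads = config.getInt(NUM_THREADS_CONFIG, NUM_THREADS_DEFAULT);
        this.initialRebalanceDelayMs =
            config.getInt(INITIAL_REBALANCE_DELAY_MS_CONFIG, INITIAL_REBALANCE_DELAY_MS_DEFAULT);
    }
}
```

Every config class built this way reads from the same source object, so callers construct them uniformly instead of passing long lists of individual values.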






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559689777


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I would suggest that we wait until KafkaConfig is fully migrated out and 
then we can change the constructor of `GroupCoordinatorConfig` to accept 
KafkaConfig and it extract any needed grouping out KafkaConfig definition. 
WDYT? This might be the easiest way



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559674776


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I moved it to `kafka.coordinator.group` for now. We can have a follow-up for 
[KAFKA-15089](https://github.com/apache/kafka/pull/15684/commits/581e9a88089aa9881ec28de75877350e17711b3e)
 to look into removing this class later. 






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559616179


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   > OffsetConfig is only used by Scala code so it will disappear when we 
remove it
   
   Alternatively, we can move `OffsetConfig` to `kafka.coordinator.group`, since 
it is used by `kafka.coordinator.group.GroupCoordinator` and 
`kafka.coordinator.group.GroupMetadataManager`.






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


dajac commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559600382


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   `OffsetConfig` is only used by Scala code, so it will disappear when we 
remove that code. The `GroupConfig` in Scala may never be migrated to Java, as 
`GroupCoordinatorConfig` already contains everything, I think. I wonder if we 
could actually replace `OffsetConfig` with an interface and make 
`GroupCoordinatorConfig` implement it. Then we could pass 
`GroupCoordinatorConfig` to the old code too. I am not sure if this is feasible, 
though.
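
   A minimal sketch of the interface idea, assuming the old code only reads a 
few offset-related values. `OffsetSettings`, `CoordinatorConfigSketch`, and 
`LegacyManagerSketch` are hypothetical names made up for illustration; they are 
not the actual Kafka classes, and the length check is only a stand-in for 
whatever the old code does with the config.

```java
// Hypothetical narrow view of the offset-related settings.
interface OffsetSettings {
    int maxMetadataSize();
    int loadBufferSize();
}

// Stand-in for the single Java config; it carries other settings but also implements the view.
final class CoordinatorConfigSketch implements OffsetSettings {
    private final int maxMetadataSize;
    private final int loadBufferSize;
    private final int sessionTimeoutMs; // unrelated to offsets; invisible through the interface

    CoordinatorConfigSketch(int maxMetadataSize, int loadBufferSize, int sessionTimeoutMs) {
        this.maxMetadataSize = maxMetadataSize;
        this.loadBufferSize = loadBufferSize;
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    @Override
    public int maxMetadataSize() {
        return maxMetadataSize;
    }

    @Override
    public int loadBufferSize() {
        return loadBufferSize;
    }

    int sessionTimeoutMs() {
        return sessionTimeoutMs;
    }
}

// Stand-in for the old code path; it depends only on the interface.
final class LegacyManagerSketch {
    private final OffsetSettings settings;

    LegacyManagerSketch(OffsetSettings settings) {
        this.settings = settings;
    }

    // Illustrative use of the config: accept metadata that is absent or within the size limit.
    boolean metadataFits(String metadata) {
        return metadata == null || metadata.length() <= settings.maxMetadataSize();
    }
}
```

   Because the old code would see only `OffsetSettings`, the separate 
`OffsetConfig` class could go away without exposing the rest of the config to 
it.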






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559529499


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   It is also a bit odd to have a class whose constructor does nothing but 
grouping. But I can see that splitting it out might be better: it seems we have 
a `GroupConfig` in Scala that will get migrated to Java at some point, so it 
wouldn't make sense to let `GroupCoordinatorConfig` grow out of hand. 






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559398541


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   It seems to me the package `GroupCoordinatorConfig.OffsetConfig` is a bit 
confusing, and moving `OffsetConfig` out of `GroupCoordinatorConfig` should be 
fine. WDYT?






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559064580


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I pushed the changes. Let me know if they make sense or if you have other 
suggestions/feedback. 






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559054203


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   I'll move all props, defaults and docs into `GroupCoordinatorConfig` and 
keep the constructor for `OffsetConfig` as a static class within 
`GroupCoordinatorConfig`, because `GroupMetadataManager`, which mainly cares 
about `OffsetConfig`, doesn't need the rest of the configs in 
`GroupCoordinatorConfig`.
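
   The layout described above could look roughly like the sketch below. It is 
an assumption-laden illustration, not the code in this PR: 
`GroupCoordinatorConfigSketch` and the `fromProps` helper are hypothetical, and 
only two of the property names and defaults from the diff are shown.

```java
// Hypothetical outer config class that owns the property names and defaults.
final class GroupCoordinatorConfigSketch {
    static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes";
    static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;

    static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions";
    static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;

    private GroupCoordinatorConfigSketch() {
    }

    // Static nested class: a plain grouping of the offset-related values only.
    static final class OffsetConfig {
        final int maxMetadataSize;
        final int offsetsTopicNumPartitions;

        OffsetConfig(int maxMetadataSize, int offsetsTopicNumPartitions) {
            this.maxMetadataSize = maxMetadataSize;
            this.offsetsTopicNumPartitions = offsetsTopicNumPartitions;
        }

        // Hypothetical helper: builds the grouping from raw props, falling back to the defaults.
        static OffsetConfig fromProps(java.util.Map<String, Integer> props) {
            return new OffsetConfig(
                props.getOrDefault(OFFSET_METADATA_MAX_SIZE_CONFIG, OFFSET_METADATA_MAX_SIZE_DEFAULT),
                props.getOrDefault(OFFSETS_TOPIC_PARTITIONS_CONFIG, OFFSETS_TOPIC_PARTITIONS_DEFAULT));
        }
    }
}
```

   A consumer such as `GroupMetadataManager` would then receive only a 
`GroupCoordinatorConfigSketch.OffsetConfig` instance, while the names, defaults 
and docs stay in one place.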






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1559022658


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   Good call, this will actually make it simpler. I will update the PR to 
reflect this. 






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-10 Thread via GitHub


dajac commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1558996031


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,51 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {

Review Comment:
   @OmniaGM Thanks for working on this. I wonder whether we could merge 
`OffsetConfig` into `GroupCoordinatorConfig` in order to have one config for 
the entire package. It would bring us closer to the goal described in 
https://issues.apache.org/jira/browse/KAFKA-15089. Would it be possible? 






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on PR #15684:
URL: https://github.com/apache/kafka/pull/15684#issuecomment-2044767415

   @OmniaGM It seems there are build errors. Could you please check them? Thanks!





Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1556491449


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -88,10 +125,10 @@ public OffsetConfig(int maxMetadataSize,
 }
 
 public OffsetConfig() {

Review Comment:
   deleted 






Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1556490595


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,53 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {
-public static final int DEFAULT_MAX_METADATA_SIZE = 4096;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
-public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 
1000L;
-public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 
60L;
-public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50;
-public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 
1024;
-public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3;
-public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE 
= CompressionType.NONE;
-public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000;
-public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1;
+public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = 
"offset.metadata.max.bytes";
+public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
+public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum 
size for a metadata entry associated with an offset commit.";
+
+public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = 
"offsets.load.buffer.size";
+public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for 
reading from the offsets segments when loading offsets into the cache 
(soft-limit, overridden if records are too large).";
+
+public static final String OFFSETS_RETENTION_MINUTES_CONFIG = 
"offsets.retention.minutes";
+public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
+public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed 
consumers, committed offset of a specific partition will be expired and 
discarded when 1) this retention period has elapsed after the consumer group 
loses all its consumers (i.e. becomes empty); " +
+"2) this retention period has elapsed since the last time an 
offset is committed for the partition and the group is no longer subscribed to 
the corresponding topic. " +
+"For standalone consumers (using manual assignment), offsets will 
be expired after this retention period has elapsed since the time of last 
commit. " +
+"Note that when a group is deleted via the delete-group request, 
its committed offsets will also be deleted without extra retention period; " +
+"also when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's committed offsets for that topic will 
also be deleted without extra retention period.";
+
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = 
"offsets.retention.check.interval.ms";
+public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 
60L;
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = 
"Frequency at which to check for stale offsets";
+
+public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = 
"offsets.topic.num.partitions";
+public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of 
partitions for the offset commit topic (should not change after deployment).";
+
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = 
"offsets.topic.segment.bytes";
+public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 
1024;
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets 
topic segment bytes should be kept relatively small in order to facilitate 
faster log compaction and cache loads.";
+
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"offsets.topic.replication.factor";
+public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The 
replication factor for the offsets topic (set higher to ensure availability). " 
+
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = 
"offsets.topic.compression.codec";
+public static final CompressionType 
OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
+public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = 
"Compression codec for the offsets topic - compression may be used to achieve 
\"atomic\" commits.";
+
+public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = 
"offsets.commit.timeout.ms";
+public static fi

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15684:
URL: https://github.com/apache/kafka/pull/15684#discussion_r1556449383


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -17,13 +17,84 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.common.utils.Utils;

Review Comment:
   It seems we don't have strict import orders, but it would be nice to 
optimize the imports :)



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java:
##
@@ -20,16 +20,53 @@
 import org.apache.kafka.common.record.CompressionType;
 
 public class OffsetConfig {
-public static final int DEFAULT_MAX_METADATA_SIZE = 4096;
-public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024;
-public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 
1000L;
-public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 
60L;
-public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50;
-public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 
1024;
-public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3;
-public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE 
= CompressionType.NONE;
-public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000;
-public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1;
+public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = 
"offset.metadata.max.bytes";
+public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
+public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum 
size for a metadata entry associated with an offset commit.";
+
+public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = 
"offsets.load.buffer.size";
+public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
+public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for 
reading from the offsets segments when loading offsets into the cache 
(soft-limit, overridden if records are too large).";
+
+public static final String OFFSETS_RETENTION_MINUTES_CONFIG = 
"offsets.retention.minutes";
+public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
+public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed 
consumers, committed offset of a specific partition will be expired and 
discarded when 1) this retention period has elapsed after the consumer group 
loses all its consumers (i.e. becomes empty); " +
+"2) this retention period has elapsed since the last time an 
offset is committed for the partition and the group is no longer subscribed to 
the corresponding topic. " +
+"For standalone consumers (using manual assignment), offsets will 
be expired after this retention period has elapsed since the time of last 
commit. " +
+"Note that when a group is deleted via the delete-group request, 
its committed offsets will also be deleted without extra retention period; " +
+"also when a topic is deleted via the delete-topic request, upon 
propagated metadata update any group's committed offsets for that topic will 
also be deleted without extra retention period.";
+
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = 
"offsets.retention.check.interval.ms";
+public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 
60L;
+public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = 
"Frequency at which to check for stale offsets";
+
+public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = 
"offsets.topic.num.partitions";
+public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
+public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of 
partitions for the offset commit topic (should not change after deployment).";
+
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = 
"offsets.topic.segment.bytes";
+public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 
1024;
+public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets 
topic segment bytes should be kept relatively small in order to facilitate 
faster log compaction and cache loads.";
+
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = 
"offsets.topic.replication.factor";
+public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The 
replication factor for the offsets topic (set higher to ensure availability). " 
+
+"Internal topic creation will fail until the cluster size meets 
this replication factor requirement.";
+
+public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = 
"offsets.topi

[PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-08 Thread via GitHub


OmniaGM opened a new pull request, #15684:
URL: https://github.com/apache/kafka/pull/15684

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

