chia7712 commented on code in PR #15684: URL: https://github.com/apache/kafka/pull/15684#discussion_r1556449383
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ########## @@ -17,13 +17,84 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.common.utils.Utils; Review Comment: It seems we don't have strict import orders, but it would be nice to optimize the imports :) ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ########## @@ -20,16 +20,53 @@ import org.apache.kafka.common.record.CompressionType; public class OffsetConfig { - public static final int DEFAULT_MAX_METADATA_SIZE = 4096; - public static final int DEFAULT_LOAD_BUFFER_SIZE = 5 * 1024 * 1024; - public static final long DEFAULT_OFFSET_RETENTION_MS = 24 * 60 * 60 * 1000L; - public static final long DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS = 600000L; - public static final int DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS = 50; - public static final int DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES = 100 * 1024 * 1024; - public static final short DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR = 3; - public static final CompressionType DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE = CompressionType.NONE; - public static final int DEFAULT_OFFSET_COMMIT_TIMEOUT_MS = 5000; - public static final short DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS = -1; + public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; + public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; + public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; + + public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size"; + public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024; + public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets segments when loading offsets into the cache (soft-limit, overridden if records are too large)."; + + public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes"; + public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60; + public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when 1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + + "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + + "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + + "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + + "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; + + public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms"; + public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 600000L; + public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets"; + + public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions"; + public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50; + public static final String OFFSETS_TOPIC_PARTITIONS_DOC = "The number of partitions for the offset commit topic (should not change after deployment)."; + + public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes"; + public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; + public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads."; + + public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; + public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; + public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " + + "Internal topic creation will fail until the cluster size meets this replication factor requirement."; + + public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = "offsets.topic.compression.codec"; + public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE; + public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits."; + + public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offsets.commit.timeout.ms"; + public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000; + public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + + "or this timeout is reached. This is similar to the producer request timeout."; + + public static final String OFFSET_COMMIT_REQUIRED_ACKS_CONFIG = "offsets.commit.required.acks"; + public static final short OFFSET_COMMIT_REQUIRED_ACKS_DEFAULT = -1; + public static final String OFFSET_COMMIT_REQUIRED_ACKS_DOC = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden."; + + public static final long OFFSET_RETENTION_MS_DEFAULT = 24 * 60 * 60 * 1000L; Review Comment: this default value is not equal to `OFFSETS_RETENTION_MINUTES_DEFAULT`. that is weird to me ... ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetConfig.java: ########## @@ -88,10 +125,10 @@ public OffsetConfig(int maxMetadataSize, } public OffsetConfig() { Review Comment: it seems this constructor is unused. Could we remove it? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ########## @@ -17,13 +17,84 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import java.util.Arrays; +import java.util.Collections; import java.util.List; /** * The group coordinator configurations. */ public class GroupCoordinatorConfig { + /** ********* Group coordinator configuration ***********/ + public final static String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.min.session.timeout.ms"; + public final static String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."; + public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000; + + public final static String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.max.session.timeout.ms"; + public final static String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures."; + public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000; + + public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms"; + public final static String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."; + public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; + + public final static String GROUP_MAX_SIZE_CONFIG = "group.max.size"; + public final static String GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate."; + public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; + + /** New group coordinator configs */ + public final static String NEW_GROUP_COORDINATOR_ENABLE_CONFIG = "group.coordinator.new.enable"; + public final static String NEW_GROUP_COORDINATOR_ENABLE_DOC = "Enable the new group coordinator."; + public static final boolean NEW_GROUP_COORDINATOR_ENABLE_DEFAULT = false; + + public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; + public final static String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Utils.join(Group.GroupType.values(), ",") + ". " + + "The " + Group.GroupType.CONSUMER + " rebalance protocol is in early access and therefore must not be used in production."; + public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = Collections.singletonList(Group.GroupType.CLASSIC.toString()); + + public final static String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads"; + public final static String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator."; + public static final int GROUP_COORDINATOR_NUM_THREADS_DEFAULT = 1; + + /** Consumer group configs */ + public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.session.timeout.ms"; + public final static String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the consumer group protocol."; + public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; + + public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.min.session.timeout.ms"; + public final static String CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers."; + public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 45000; + + public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.max.session.timeout.ms"; + public final static String CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers."; + public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 60000; + + public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG = "group.consumer.heartbeat.interval.ms"; + public final static String CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC = "The heartbeat interval given to the members of a consumer group."; + public static final int CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000; + + public final static String CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG = "group.consumer.min.heartbeat.interval.ms"; + public final static String CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC = "The minimum heartbeat interval for registered consumers."; + public static final int CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT = 5000; + + public final static String CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG ="group.consumer.max.heartbeat.interval.ms"; + public final static String CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for registered consumers."; + public static final int CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; + + public final static String CONSUMER_GROUP_MAX_SIZE_CONFIG = "group.consumer.max.size"; + public final static String CONSUMER_GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate."; + public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; + + public final static String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors"; + public final static String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor."; + public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = Arrays.asList( Review Comment: Could we make it immutable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org