mimaison commented on code in PR #15158: URL: https://github.com/apache/kafka/pull/15158#discussion_r1458684850
########## checkstyle/import-control-core.xml: ########## @@ -82,6 +82,7 @@ <allow pkg="kafka.log" /> <allow pkg="kafka.cluster" /> <allow pkg="kafka.server" /> + <allow pkg="org.apache.kafka.server" /> Review Comment: Do we really need this change? core seems to build fine without it. ########## core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala: ########## @@ -220,8 +220,8 @@ object TestLinearWriteSpeed { brokerTopicStats = new BrokerTopicStats, time = Time.SYSTEM, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), - producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs, + producerStateManagerConfig = new ProducerStateManagerConfig(org.apache.kafka.server.config.Defaults.PRODUCER_ID_EXPIRATION_MS, false), Review Comment: Can we import `org.apache.kafka.server.config.Defaults`? ########## core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala: ########## @@ -53,7 +53,7 @@ class LogCleanerManagerTest extends Logging { val logConfig: LogConfig = new LogConfig(logProps) val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` val offset = 999 - val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false) + val producerStateManagerConfig = new ProducerStateManagerConfig(org.apache.kafka.server.config.Defaults.PRODUCER_ID_EXPIRATION_MS, false) Review Comment: Can we import `org.apache.kafka.server.config.Defaults`? 
########## core/src/main/scala/kafka/server/KafkaConfig.scala: ########## @@ -1336,205 +1097,205 @@ object KafkaConfig { .define(LogMessageDownConversionEnableProp, BOOLEAN, LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, LogMessageDownConversionEnableDoc) /** ********* Replication configuration ***********/ - .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc) - .define(DefaultReplicationFactorProp, INT, Defaults.DefaultReplicationFactor, MEDIUM, DefaultReplicationFactorDoc) - .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.ReplicaLagTimeMaxMs, HIGH, ReplicaLagTimeMaxMsDoc) - .define(ReplicaSocketTimeoutMsProp, INT, Defaults.ReplicaSocketTimeoutMs, HIGH, ReplicaSocketTimeoutMsDoc) - .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.ReplicaSocketReceiveBufferBytes, HIGH, ReplicaSocketReceiveBufferBytesDoc) - .define(ReplicaFetchMaxBytesProp, INT, Defaults.ReplicaFetchMaxBytes, atLeast(0), MEDIUM, ReplicaFetchMaxBytesDoc) - .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.ReplicaFetchWaitMaxMs, HIGH, ReplicaFetchWaitMaxMsDoc) - .define(ReplicaFetchBackoffMsProp, INT, Defaults.ReplicaFetchBackoffMs, atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc) - .define(ReplicaFetchMinBytesProp, INT, Defaults.ReplicaFetchMinBytes, HIGH, ReplicaFetchMinBytesDoc) - .define(ReplicaFetchResponseMaxBytesProp, INT, Defaults.ReplicaFetchResponseMaxBytes, atLeast(0), MEDIUM, ReplicaFetchResponseMaxBytesDoc) - .define(NumReplicaFetchersProp, INT, Defaults.NumReplicaFetchers, HIGH, NumReplicaFetchersDoc) - .define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.ReplicaHighWatermarkCheckpointIntervalMs, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc) - .define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FetchPurgatoryPurgeIntervalRequests, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc) - .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, 
Defaults.ProducerPurgatoryPurgeIntervalRequests, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc) - .define(DeleteRecordsPurgatoryPurgeIntervalRequestsProp, INT, Defaults.DeleteRecordsPurgatoryPurgeIntervalRequests, MEDIUM, DeleteRecordsPurgatoryPurgeIntervalRequestsDoc) - .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AutoLeaderRebalanceEnable, HIGH, AutoLeaderRebalanceEnableDoc) - .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LeaderImbalancePerBrokerPercentage, HIGH, LeaderImbalancePerBrokerPercentageDoc) - .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, atLeast(1), HIGH, LeaderImbalanceCheckIntervalSecondsDoc) + .define(ControllerSocketTimeoutMsProp, INT, Defaults.CONTROLLER_SOCKET_TIMEOUT_MS, MEDIUM, ControllerSocketTimeoutMsDoc) + .define(DefaultReplicationFactorProp, INT, Defaults.REPLICATION_FACTOR, MEDIUM, DefaultReplicationFactorDoc) + .define(ReplicaLagTimeMaxMsProp, LONG, Defaults.REPLICA_LAG_TIME_MAX_MS, HIGH, ReplicaLagTimeMaxMsDoc) + .define(ReplicaSocketTimeoutMsProp, INT, Defaults.REPLICA_SOCKET_TIMEOUT_MS, HIGH, ReplicaSocketTimeoutMsDoc) + .define(ReplicaSocketReceiveBufferBytesProp, INT, Defaults.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES, HIGH, ReplicaSocketReceiveBufferBytesDoc) + .define(ReplicaFetchMaxBytesProp, INT, Defaults.REPLICA_FETCH_MAX_BYTES, atLeast(0), MEDIUM, ReplicaFetchMaxBytesDoc) + .define(ReplicaFetchWaitMaxMsProp, INT, Defaults.REPLICA_FETCH_WAIT_MAX_MS, HIGH, ReplicaFetchWaitMaxMsDoc) + .define(ReplicaFetchBackoffMsProp, INT, Defaults.REPLICA_FETCH_BACKOFF_MS, atLeast(0), MEDIUM, ReplicaFetchBackoffMsDoc) + .define(ReplicaFetchMinBytesProp, INT, Defaults.REPLICA_FETCH_MIN_BYTES, HIGH, ReplicaFetchMinBytesDoc) + .define(ReplicaFetchResponseMaxBytesProp, INT, Defaults.REPLICA_FETCH_RESPONSE_MAX_BYTES, atLeast(0), MEDIUM, ReplicaFetchResponseMaxBytesDoc) + .define(NumReplicaFetchersProp, INT, Defaults.NUM_REPLICA_FETCHERS, HIGH, NumReplicaFetchersDoc) + 
.define(ReplicaHighWatermarkCheckpointIntervalMsProp, LONG, Defaults.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS, HIGH, ReplicaHighWatermarkCheckpointIntervalMsDoc) + .define(FetchPurgatoryPurgeIntervalRequestsProp, INT, Defaults.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS, MEDIUM, FetchPurgatoryPurgeIntervalRequestsDoc) + .define(ProducerPurgatoryPurgeIntervalRequestsProp, INT, Defaults.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS, MEDIUM, ProducerPurgatoryPurgeIntervalRequestsDoc) + .define(DeleteRecordsPurgatoryPurgeIntervalRequestsProp, INT, Defaults.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS, MEDIUM, DeleteRecordsPurgatoryPurgeIntervalRequestsDoc) + .define(AutoLeaderRebalanceEnableProp, BOOLEAN, Defaults.AUTO_LEADER_REBALANCE_ENABLE, HIGH, AutoLeaderRebalanceEnableDoc) + .define(LeaderImbalancePerBrokerPercentageProp, INT, Defaults.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE, HIGH, LeaderImbalancePerBrokerPercentageDoc) + .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS, atLeast(1), HIGH, LeaderImbalanceCheckIntervalSecondsDoc) .define(UncleanLeaderElectionEnableProp, BOOLEAN, LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH, UncleanLeaderElectionEnableDoc) - .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, in(Utils.enumOptions(classOf[SecurityProtocol]):_*), MEDIUM, InterBrokerSecurityProtocolDoc) - .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, new MetadataVersionValidator(), MEDIUM, InterBrokerProtocolVersionDoc) + .define(InterBrokerSecurityProtocolProp, STRING, Defaults.INTER_BROKER_SECURITY_PROTOCOL, in(Utils.enumOptions(classOf[SecurityProtocol]):_*), MEDIUM, InterBrokerSecurityProtocolDoc) + .define(InterBrokerProtocolVersionProp, STRING, Defaults.INTER_BROKER_PROTOCOL_VERSION, new MetadataVersionValidator(), MEDIUM, InterBrokerProtocolVersionDoc) .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, 
InterBrokerListenerNameDoc) .define(ReplicaSelectorClassProp, STRING, null, MEDIUM, ReplicaSelectorClassDoc) /** ********* Controlled shutdown configuration ***********/ - .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) - .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) - .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc) + .define(ControlledShutdownMaxRetriesProp, INT, Defaults.CONTROLLED_SHUTDOWN_MAX_RETRIES, MEDIUM, ControlledShutdownMaxRetriesDoc) + .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS, MEDIUM, ControlledShutdownRetryBackoffMsDoc) + .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.CONTROLLED_SHUTDOWN_ENABLE, MEDIUM, ControlledShutdownEnableDoc) /** ********* Group coordinator configuration ***********/ - .define(GroupMinSessionTimeoutMsProp, INT, Defaults.GroupMinSessionTimeoutMs, MEDIUM, GroupMinSessionTimeoutMsDoc) - .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.GroupMaxSessionTimeoutMs, MEDIUM, GroupMaxSessionTimeoutMsDoc) - .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GroupInitialRebalanceDelayMs, MEDIUM, GroupInitialRebalanceDelayMsDoc) - .define(GroupMaxSizeProp, INT, Defaults.GroupMaxSize, atLeast(1), MEDIUM, GroupMaxSizeDoc) + .define(GroupMinSessionTimeoutMsProp, INT, Defaults.GROUP_MIN_SESSION_TIMEOUT_MS, MEDIUM, GroupMinSessionTimeoutMsDoc) + .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.GROUP_MAX_SESSION_TIMEOUT_MS, MEDIUM, GroupMaxSessionTimeoutMsDoc) + .define(GroupInitialRebalanceDelayMsProp, INT, Defaults.GROUP_INITIAL_REBALANCE_DELAY_MS, MEDIUM, GroupInitialRebalanceDelayMsDoc) + .define(GroupMaxSizeProp, INT, Defaults.GROUP_MAX_SIZE, atLeast(1), MEDIUM, GroupMaxSizeDoc) /** New group coordinator configs */ - 
.define(GroupCoordinatorRebalanceProtocolsProp, LIST, Defaults.GroupCoordinatorRebalanceProtocols, + .define(GroupCoordinatorRebalanceProtocolsProp, LIST, Defaults.GROUP_COORDINATOR_REBALANCE_PROTOCOLS, ConfigDef.ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorRebalanceProtocolsDoc) - .define(GroupCoordinatorNumThreadsProp, INT, Defaults.GroupCoordinatorNumThreads, atLeast(1), MEDIUM, GroupCoordinatorNumThreadsDoc) + .define(GroupCoordinatorNumThreadsProp, INT, Defaults.GROUP_COORDINATOR_NUM_THREADS, atLeast(1), MEDIUM, GroupCoordinatorNumThreadsDoc) // Internal configuration used by integration and system tests. - .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, Defaults.NewGroupCoordinatorEnable, null, MEDIUM, NewGroupCoordinatorEnableDoc) + .defineInternal(NewGroupCoordinatorEnableProp, BOOLEAN, Defaults.NEW_GROUP_COORDINATOR_ENABLE, null, MEDIUM, NewGroupCoordinatorEnableDoc) /** Consumer groups configs */ - .define(ConsumerGroupSessionTimeoutMsProp, INT, Defaults.ConsumerGroupSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupSessionTimeoutMsDoc) - .define(ConsumerGroupMinSessionTimeoutMsProp, INT, Defaults.ConsumerGroupMinSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupMinSessionTimeoutMsDoc) - .define(ConsumerGroupMaxSessionTimeoutMsProp, INT, Defaults.ConsumerGroupMaxSessionTimeoutMs, atLeast(1), MEDIUM, ConsumerGroupMaxSessionTimeoutMsDoc) - .define(ConsumerGroupHeartbeatIntervalMsProp, INT, Defaults.ConsumerGroupHeartbeatIntervalMs, atLeast(1), MEDIUM, ConsumerGroupHeartbeatIntervalMsDoc) - .define(ConsumerGroupMinHeartbeatIntervalMsProp, INT, Defaults.ConsumerGroupMinHeartbeatIntervalMs, atLeast(1), MEDIUM, ConsumerGroupMinHeartbeatIntervalMsDoc) - .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, Defaults.ConsumerGroupMaxHeartbeatIntervalMs, atLeast(1), MEDIUM, ConsumerGroupMaxHeartbeatIntervalMsDoc) - .define(ConsumerGroupMaxSizeProp, INT, Defaults.ConsumerGroupMaxSize, atLeast(1), MEDIUM, 
ConsumerGroupMaxSizeDoc) - .define(ConsumerGroupAssignorsProp, LIST, Defaults.ConsumerGroupAssignors, null, MEDIUM, ConsumerGroupAssignorsDoc) + .define(ConsumerGroupSessionTimeoutMsProp, INT, Defaults.CONSUMER_GROUP_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, ConsumerGroupSessionTimeoutMsDoc) + .define(ConsumerGroupMinSessionTimeoutMsProp, INT, Defaults.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, ConsumerGroupMinSessionTimeoutMsDoc) + .define(ConsumerGroupMaxSessionTimeoutMsProp, INT, Defaults.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS, atLeast(1), MEDIUM, ConsumerGroupMaxSessionTimeoutMsDoc) + .define(ConsumerGroupHeartbeatIntervalMsProp, INT, Defaults.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, ConsumerGroupHeartbeatIntervalMsDoc) + .define(ConsumerGroupMinHeartbeatIntervalMsProp, INT, Defaults.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, ConsumerGroupMinHeartbeatIntervalMsDoc) + .define(ConsumerGroupMaxHeartbeatIntervalMsProp, INT, Defaults.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS, atLeast(1), MEDIUM, ConsumerGroupMaxHeartbeatIntervalMsDoc) + .define(ConsumerGroupMaxSizeProp, INT, Defaults.CONSUMER_GROUP_MAX_SIZE, atLeast(1), MEDIUM, ConsumerGroupMaxSizeDoc) + .define(ConsumerGroupAssignorsProp, LIST, Defaults.CONSUMER_GROUP_ASSIGNORS, null, MEDIUM, ConsumerGroupAssignorsDoc) /** ********* Offset management configuration ***********/ - .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc) - .define(OffsetsLoadBufferSizeProp, INT, Defaults.OffsetsLoadBufferSize, atLeast(1), HIGH, OffsetsLoadBufferSizeDoc) - .define(OffsetsTopicReplicationFactorProp, SHORT, Defaults.OffsetsTopicReplicationFactor, atLeast(1), HIGH, OffsetsTopicReplicationFactorDoc) - .define(OffsetsTopicPartitionsProp, INT, Defaults.OffsetsTopicPartitions, atLeast(1), HIGH, OffsetsTopicPartitionsDoc) - .define(OffsetsTopicSegmentBytesProp, INT, Defaults.OffsetsTopicSegmentBytes, atLeast(1), HIGH, 
OffsetsTopicSegmentBytesDoc) - .define(OffsetsTopicCompressionCodecProp, INT, Defaults.OffsetsTopicCompressionCodec, HIGH, OffsetsTopicCompressionCodecDoc) - .define(OffsetsRetentionMinutesProp, INT, Defaults.OffsetsRetentionMinutes, atLeast(1), HIGH, OffsetsRetentionMinutesDoc) - .define(OffsetsRetentionCheckIntervalMsProp, LONG, Defaults.OffsetsRetentionCheckIntervalMs, atLeast(1), HIGH, OffsetsRetentionCheckIntervalMsDoc) - .define(OffsetCommitTimeoutMsProp, INT, Defaults.OffsetCommitTimeoutMs, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc) - .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) - .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) + .define(OffsetMetadataMaxSizeProp, INT, Defaults.OFFSET_METADATA_MAX_SIZE, HIGH, OffsetMetadataMaxSizeDoc) + .define(OffsetsLoadBufferSizeProp, INT, Defaults.OFFSETS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, OffsetsLoadBufferSizeDoc) + .define(OffsetsTopicReplicationFactorProp, SHORT, Defaults.OFFSETS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, OffsetsTopicReplicationFactorDoc) + .define(OffsetsTopicPartitionsProp, INT, Defaults.OFFSETS_TOPIC_PARTITIONS, atLeast(1), HIGH, OffsetsTopicPartitionsDoc) + .define(OffsetsTopicSegmentBytesProp, INT, Defaults.OFFSETS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, OffsetsTopicSegmentBytesDoc) + .define(OffsetsTopicCompressionCodecProp, INT, Defaults.OFFSETS_TOPIC_COMPRESSION_CODEC, HIGH, OffsetsTopicCompressionCodecDoc) + .define(OffsetsRetentionMinutesProp, INT, Defaults.OFFSETS_RETENTION_MINUTES, atLeast(1), HIGH, OffsetsRetentionMinutesDoc) + .define(OffsetsRetentionCheckIntervalMsProp, LONG, Defaults.OFFSETS_RETENTION_CHECK_INTERVAL_MS, atLeast(1), HIGH, OffsetsRetentionCheckIntervalMsDoc) + .define(OffsetCommitTimeoutMsProp, INT, Defaults.OFFSET_COMMIT_TIMEOUT_MS, atLeast(1), HIGH, OffsetCommitTimeoutMsDoc) + .define(OffsetCommitRequiredAcksProp, SHORT, 
Defaults.OFFSET_COMMIT_REQUIRED_ACKS, HIGH, OffsetCommitRequiredAcksDoc) + .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DELETE_TOPIC_ENABLE, HIGH, DeleteTopicEnableDoc) .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc) /** ********* Transaction management configuration ***********/ - .define(TransactionalIdExpirationMsProp, INT, Defaults.TransactionalIdExpirationMs, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) - .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TransactionsMaxTimeoutMs, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc) - .define(TransactionsTopicMinISRProp, INT, Defaults.TransactionsTopicMinISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc) - .define(TransactionsLoadBufferSizeProp, INT, Defaults.TransactionsLoadBufferSize, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc) - .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TransactionsTopicReplicationFactor, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) - .define(TransactionsTopicPartitionsProp, INT, Defaults.TransactionsTopicPartitions, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) - .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TransactionsTopicSegmentBytes, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) - .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TransactionsAbortTimedOutTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) - .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TransactionsRemoveExpiredTransactionsCleanupIntervalMS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) - - .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TransactionPartitionVerificationEnable, LOW, TransactionPartitionVerificationEnableDoc) - - .define(ProducerIdExpirationMsProp, INT, 
Defaults.ProducerIdExpirationMs, atLeast(1), LOW, ProducerIdExpirationMsDoc) + .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) + .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc) + .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc) + .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc) + .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) + .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) + .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) + .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) + .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) + + .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc) + + .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc) // Configuration for testing only as default value should be sufficient for typical usage - .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.ProducerIdExpirationCheckIntervalMs, atLeast(1), LOW, 
ProducerIdExpirationCheckIntervalMsDoc) + .defineInternal(ProducerIdExpirationCheckIntervalMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS, atLeast(1), LOW, ProducerIdExpirationCheckIntervalMsDoc) /** ********* Fetch Configuration **************/ - .define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MaxIncrementalFetchSessionCacheSlots, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc) - .define(FetchMaxBytes, INT, Defaults.FetchMaxBytes, atLeast(1024), MEDIUM, FetchMaxBytesDoc) + .define(MaxIncrementalFetchSessionCacheSlots, INT, Defaults.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS, atLeast(0), MEDIUM, MaxIncrementalFetchSessionCacheSlotsDoc) + .define(FetchMaxBytes, INT, Defaults.FETCH_MAX_BYTES, atLeast(1024), MEDIUM, FetchMaxBytesDoc) /** ********* Kafka Metrics Configuration ***********/ - .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc) - .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc) - .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc) - .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, LOW, MetricRecordingLevelDoc) - .define(AutoIncludeJmxReporterProp, BOOLEAN, Defaults.AutoIncludeJmxReporter, LOW, AutoIncludeJmxReporterDoc) + .define(MetricNumSamplesProp, INT, Defaults.METRIC_NUM_SAMPLES, atLeast(1), LOW, MetricNumSamplesDoc) + .define(MetricSampleWindowMsProp, LONG, Defaults.METRIC_SAMPLE_WINDOW_MS, atLeast(1), LOW, MetricSampleWindowMsDoc) + .define(MetricReporterClassesProp, LIST, Defaults.METRIC_REPORTER_CLASSES, LOW, MetricReporterClassesDoc) + .define(MetricRecordingLevelProp, STRING, Defaults.METRIC_RECORDING_LEVEL, LOW, MetricRecordingLevelDoc) + .define(AutoIncludeJmxReporterProp, BOOLEAN, Defaults.AUTO_INCLUDE_JMX_REPORTER, LOW, AutoIncludeJmxReporterDoc) /** ********* Kafka Yammer Metrics Reporter Configuration for docs 
***********/ - .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc) - .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc) + .define(KafkaMetricsReporterClassesProp, LIST, Defaults.KAFKA_METRIC_REPORTER_CLASSES, LOW, KafkaMetricsReporterClassesDoc) + .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KAFKA_METRICS_POLLING_INTERVAL_SECONDS, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc) /** ********* Kafka Client Telemetry Metrics Configuration ***********/ - .define(ClientTelemetryMaxBytesProp, INT, Defaults.ClientTelemetryMaxBytes, atLeast(1), LOW, ClientTelemetryMaxBytesDoc) + .define(ClientTelemetryMaxBytesProp, INT, Defaults.CLIENT_TELEMETRY_MAX_BYTES, atLeast(1), LOW, ClientTelemetryMaxBytesDoc) /** ********* Quota configuration ***********/ - .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) - .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc) - .define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NumAlterLogDirsReplicationQuotaSamples, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc) - .define(NumControllerQuotaSamplesProp, INT, Defaults.NumControllerQuotaSamples, atLeast(1), LOW, NumControllerQuotaSamplesDoc) - .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) - .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc) - .define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.AlterLogDirsReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc) - .define(ControllerQuotaWindowSizeSecondsProp, INT, 
Defaults.ControllerQuotaWindowSizeSeconds, atLeast(1), LOW, ControllerQuotaWindowSizeSecondsDoc) + .define(NumQuotaSamplesProp, INT, Defaults.NUM_QUOTA_SAMPLES, atLeast(1), LOW, NumQuotaSamplesDoc) + .define(NumReplicationQuotaSamplesProp, INT, Defaults.NUM_REPLICATION_QUOTA_SAMPLES, atLeast(1), LOW, NumReplicationQuotaSamplesDoc) + .define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc) + .define(NumControllerQuotaSamplesProp, INT, Defaults.NUM_CONTROLLER_QUOTA_SAMPLES, atLeast(1), LOW, NumControllerQuotaSamplesDoc) + .define(QuotaWindowSizeSecondsProp, INT, Defaults.QUOTA_WINDOW_SIZE_SECONDS, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) + .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.REPLICATION_QUOTA_WINDOW_SIZE_SECONDS, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc) + .define(AlterLogDirsReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS, atLeast(1), LOW, AlterLogDirsReplicationQuotaWindowSizeSecondsDoc) + .define(ControllerQuotaWindowSizeSecondsProp, INT, Defaults.CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS, atLeast(1), LOW, ControllerQuotaWindowSizeSecondsDoc) .define(ClientQuotaCallbackClassProp, CLASS, null, LOW, ClientQuotaCallbackClassDoc) /** ********* General Security Configuration ****************/ - .define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc) - .define(SaslServerMaxReceiveSizeProp, INT, Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc) + .define(ConnectionsMaxReauthMsProp, LONG, Defaults.CONNECTIONS_MAX_REAUTH_MS, MEDIUM, ConnectionsMaxReauthMsDoc) + .define(SaslServerMaxReceiveSizeProp, INT, Defaults.SERVER_MAX_MAX_RECEIVE_SIZE, MEDIUM, SaslServerMaxReceiveSizeDoc) Review Comment: Can we remove one of the `MAX`? 
########## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; +import org.apache.kafka.coordinator.group.OffsetConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig; +import org.apache.kafka.raft.RaftConfig; +import org.apache.kafka.server.common.MetadataVersion; + +import 
java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class Defaults { + /** ********* Zookeeper Configuration *********/ + public static final int ZK_SESSION_TIMEOUT_MS = 18000; + public static final boolean ZK_ENABLE_SECURE_ACLS = false; + public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; + public static final boolean ZK_SSL_CLIENT_ENABLE = false; + public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; + public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; + public static final boolean ZK_SSL_CRL_ENABLE = false; + public static final boolean ZK_SSL_OCSP_ENABLE = false; + + /** ********* General Configuration *********/ + public static final boolean BROKER_ID_GENERATION_ENABLE = true; + public static final int MAX_RESERVED_BROKER_ID = 1000; + public static final int BROKER_ID = -1; + public static final int NUM_NETWORK_THREADS = 3; + public static final int NUM_IO_THREADS = 8; + public static final int BACKGROUND_THREADS = 10; + public static final int QUEUED_MAX_REQUESTS = 500; + public static final int QUEUED_MAX_REQUEST_BYTES = -1; + public static final int INITIAL_BROKER_REGISTRATION_TIMEOUT_MS = 60000; + public static final int BROKER_HEARTBEAT_INTERVAL_MS = 2000; + public static final int BROKER_SESSION_TIMEOUT_MS = 9000; + public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024; + public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS = TimeUnit.HOURS.toMillis(1); + public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; + public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; + public static final boolean DELETE_TOPIC_ENABLE = true; + /** ********* KRaft mode configs *********/ + public static final int EMPTY_NODE_ID = -1; + public static final long SERVER_MAX_STARTUP_TIME_MS = Long.MAX_VALUE; + + /************* Authorizer Configuration 
***********/ + public static final String AUTHORIZER_CLASS_NAME = ""; + + /** ********* Socket Server Configuration *********/ + public static final String LISTENERS = "PLAINTEXT://:9092"; + //TODO: Replace this with EndPoint.DefaultSecurityProtocolMap once EndPoint is out of core. + public static final String LISTENER_SECURITY_PROTOCOL_MAP = Arrays.stream(SecurityProtocol.values()) + .collect(Collectors.toMap(sp -> ListenerName.forSecurityProtocol(sp), sp -> sp)) + .entrySet() + .stream() + .map(entry -> entry.getKey().value() + ":" + entry.getValue().name()) + .collect(Collectors.joining(",")); + public static final int SOCKET_SEND_BUFFER_BYTES = 100 * 1024; + public static final int SOCKET_RECEIVE_BUFFER_BYTES = 100 * 1024; + public static final int SOCKET_REQUEST_MAX_BYTES = 100 * 1024 * 1024; + public static final int SOCKET_LISTEN_BACKLOG_SIZE = 50; + public static final int MAX_CONNECTIONS_PER_IP = Integer.MAX_VALUE; + public static final String MAX_CONNECTIONS_PER_IP_OVERRIDES = ""; + public static final int MAX_CONNECTIONS = Integer.MAX_VALUE; + public static final int MAX_CONNECTION_CREATION_RATE = Integer.MAX_VALUE; + public static final long CONNECTIONS_MAX_IDLE_MS = 10 * 60 * 1000L; + public static final int REQUEST_TIMEOUT_MS = 30000; + public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; + public static final long CONNECTION_SETUP_TIMEOUT_MAX_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS; + public static final int FAILED_AUTHENTICATION_DELAY_MS = 100; + + /** ********* Log Configuration *********/ + public static final int NUM_PARTITIONS = 1; + public static final String LOG_DIR = "/tmp/kafka-logs"; + public static final long LOG_CLEANUP_INTERVAL_MS = 5 * 60 * 1000L; + public static final int LOG_CLEANER_THREADS = 1; + public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = Double.MAX_VALUE; + public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 
128 * 1024 * 1024L; + public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024; + public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d; + public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000; + public static final boolean LOG_CLEANER_ENABLE = true; + public static final int LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS = 60000; + public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000; + public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1; + public static final boolean AUTO_CREATE_TOPICS_ENABLE = true; + + /** ********* Replication configuration *********/ + public static final int CONTROLLER_SOCKET_TIMEOUT_MS = REQUEST_TIMEOUT_MS; + public static final int REPLICATION_FACTOR = 1; + public static final long REPLICA_LAG_TIME_MAX_MS = 30000L; + public static final int REPLICA_SOCKET_TIMEOUT_MS = 30 * 1000; + public static final int REPLICA_SOCKET_RECEIVE_BUFFER_BYTES = 64 * 1024; + public static final int REPLICA_FETCH_MAX_BYTES = 1024 * 1024; + public static final int REPLICA_FETCH_WAIT_MAX_MS = 500; + public static final int REPLICA_FETCH_MIN_BYTES = 1; + public static final int REPLICA_FETCH_RESPONSE_MAX_BYTES = 10 * 1024 * 1024; + public static final int NUM_REPLICA_FETCHERS = 1; + public static final int REPLICA_FETCH_BACKOFF_MS = 1000; + public static final long REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS = 5000L; + public static final int FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS = 1000; + public static final int PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS = 1000; + public static final int DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS = 1; + public static final boolean AUTO_LEADER_REBALANCE_ENABLE = true; + public static final int LEADER_IMBALANCE_PER_BROKER_PERCENTAGE = 10; + public static final int LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS = 300; + public static final String INTER_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT.toString(); + public static final String 
INTER_BROKER_PROTOCOL_VERSION = MetadataVersion.latestProduction().version(); + + /** ********* Controlled shutdown configuration *********/ + public static final int CONTROLLED_SHUTDOWN_MAX_RETRIES = 3; + public static final int CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS = 5000; + public static final boolean CONTROLLED_SHUTDOWN_ENABLE = true; + + /** ********* Group coordinator configuration *********/ + public static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6000; + public static final int GROUP_MAX_SESSION_TIMEOUT_MS = 1800000; + public static final int GROUP_INITIAL_REBALANCE_DELAY_MS = 3000; + public static final int GROUP_MAX_SIZE = Integer.MAX_VALUE; + + /** ********* New group coordinator configs *********/ + public static final boolean NEW_GROUP_COORDINATOR_ENABLE = false; + public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS = Arrays.asList(Group.GroupType.CLASSIC.toString()); + public static final int GROUP_COORDINATOR_NUM_THREADS = 1; + + /** ********* Consumer group configs *********/ + public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45000; + public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS = 45000; + public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS = 60000; + public static final int CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5000; + public static final int CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS = 5000; + public static final int CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS = 15000; + public static final int CONSUMER_GROUP_MAX_SIZE = Integer.MAX_VALUE; + public static final List<String> CONSUMER_GROUP_ASSIGNORS = Collections.singletonList(RangeAssignor.class.getName()); + + /** ********* Offset management configuration *********/ + public static final int OFFSET_METADATA_MAX_SIZE = OffsetConfig.DEFAULT_MAX_METADATA_SIZE; + public static final int OFFSETS_LOAD_BUFFER_SIZE = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE; + public static final short OFFSETS_TOPIC_REPLICATION_FACTOR = 
OffsetConfig.DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR; + public static final int OFFSETS_TOPIC_PARTITIONS = OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS; + public static final int OFFSETS_TOPIC_SEGMENT_BYTES = OffsetConfig.DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES; + public static final int OFFSETS_TOPIC_COMPRESSION_CODEC = OffsetConfig.DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE.id; + public static final int OFFSETS_RETENTION_MINUTES = 7 * 24 * 60; + public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS = OffsetConfig.DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS; + public static final int OFFSET_COMMIT_TIMEOUT_MS = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS; + public static final short OFFSET_COMMIT_REQUIRED_ACKS = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS; + + /** T********* ransaction management configuration *********/ Review Comment: Same on a few other comments below ########## core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala: ########## @@ -63,8 +63,8 @@ class BrokerCompressionTest { time = time, brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), - producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs, + producerStateManagerConfig = new ProducerStateManagerConfig(org.apache.kafka.server.config.Defaults.PRODUCER_ID_EXPIRATION_MS, false), Review Comment: Can we import `org.apache.kafka.server.config.Defaults`? ########## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; +import org.apache.kafka.coordinator.group.OffsetConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig; +import org.apache.kafka.raft.RaftConfig; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class Defaults { + /** ********* Zookeeper Configuration *********/ + public static final int ZK_SESSION_TIMEOUT_MS = 18000; + public static 
final boolean ZK_ENABLE_SECURE_ACLS = false; + public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; + public static final boolean ZK_SSL_CLIENT_ENABLE = false; + public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; + public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; + public static final boolean ZK_SSL_CRL_ENABLE = false; + public static final boolean ZK_SSL_OCSP_ENABLE = false; + + /** ********* General Configuration *********/ + public static final boolean BROKER_ID_GENERATION_ENABLE = true; + public static final int MAX_RESERVED_BROKER_ID = 1000; + public static final int BROKER_ID = -1; + public static final int NUM_NETWORK_THREADS = 3; + public static final int NUM_IO_THREADS = 8; + public static final int BACKGROUND_THREADS = 10; + public static final int QUEUED_MAX_REQUESTS = 500; + public static final int QUEUED_MAX_REQUEST_BYTES = -1; + public static final int INITIAL_BROKER_REGISTRATION_TIMEOUT_MS = 60000; + public static final int BROKER_HEARTBEAT_INTERVAL_MS = 2000; + public static final int BROKER_SESSION_TIMEOUT_MS = 9000; + public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024; + public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS = TimeUnit.HOURS.toMillis(1); + public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; + public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; + public static final boolean DELETE_TOPIC_ENABLE = true; + /** ********* KRaft mode configs *********/ + public static final int EMPTY_NODE_ID = -1; + public static final long SERVER_MAX_STARTUP_TIME_MS = Long.MAX_VALUE; + + /************* Authorizer Configuration ***********/ + public static final String AUTHORIZER_CLASS_NAME = ""; + + /** ********* Socket Server Configuration *********/ + public static final String LISTENERS = "PLAINTEXT://:9092"; + //TODO: Replace this with EndPoint.DefaultSecurityProtocolMap once EndPoint is out of core. 
+ public static final String LISTENER_SECURITY_PROTOCOL_MAP = Arrays.stream(SecurityProtocol.values()) + .collect(Collectors.toMap(sp -> ListenerName.forSecurityProtocol(sp), sp -> sp)) + .entrySet() + .stream() + .map(entry -> entry.getKey().value() + ":" + entry.getValue().name()) + .collect(Collectors.joining(",")); + public static final int SOCKET_SEND_BUFFER_BYTES = 100 * 1024; + public static final int SOCKET_RECEIVE_BUFFER_BYTES = 100 * 1024; + public static final int SOCKET_REQUEST_MAX_BYTES = 100 * 1024 * 1024; + public static final int SOCKET_LISTEN_BACKLOG_SIZE = 50; + public static final int MAX_CONNECTIONS_PER_IP = Integer.MAX_VALUE; + public static final String MAX_CONNECTIONS_PER_IP_OVERRIDES = ""; + public static final int MAX_CONNECTIONS = Integer.MAX_VALUE; + public static final int MAX_CONNECTION_CREATION_RATE = Integer.MAX_VALUE; + public static final long CONNECTIONS_MAX_IDLE_MS = 10 * 60 * 1000L; + public static final int REQUEST_TIMEOUT_MS = 30000; + public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; + public static final long CONNECTION_SETUP_TIMEOUT_MAX_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS; + public static final int FAILED_AUTHENTICATION_DELAY_MS = 100; + + /** ********* Log Configuration *********/ + public static final int NUM_PARTITIONS = 1; + public static final String LOG_DIR = "/tmp/kafka-logs"; + public static final long LOG_CLEANUP_INTERVAL_MS = 5 * 60 * 1000L; + public static final int LOG_CLEANER_THREADS = 1; + public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = Double.MAX_VALUE; + public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 * 1024L; + public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024; + public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d; + public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000; + public static final boolean LOG_CLEANER_ENABLE = true; + 
public static final int LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS = 60000; + public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000; + public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1; + public static final boolean AUTO_CREATE_TOPICS_ENABLE = true; + + /** ********* Replication configuration *********/ + public static final int CONTROLLER_SOCKET_TIMEOUT_MS = REQUEST_TIMEOUT_MS; + public static final int REPLICATION_FACTOR = 1; + public static final long REPLICA_LAG_TIME_MAX_MS = 30000L; + public static final int REPLICA_SOCKET_TIMEOUT_MS = 30 * 1000; + public static final int REPLICA_SOCKET_RECEIVE_BUFFER_BYTES = 64 * 1024; + public static final int REPLICA_FETCH_MAX_BYTES = 1024 * 1024; + public static final int REPLICA_FETCH_WAIT_MAX_MS = 500; + public static final int REPLICA_FETCH_MIN_BYTES = 1; + public static final int REPLICA_FETCH_RESPONSE_MAX_BYTES = 10 * 1024 * 1024; + public static final int NUM_REPLICA_FETCHERS = 1; + public static final int REPLICA_FETCH_BACKOFF_MS = 1000; + public static final long REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS = 5000L; + public static final int FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS = 1000; + public static final int PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS = 1000; + public static final int DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS = 1; + public static final boolean AUTO_LEADER_REBALANCE_ENABLE = true; + public static final int LEADER_IMBALANCE_PER_BROKER_PERCENTAGE = 10; + public static final int LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS = 300; + public static final String INTER_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT.toString(); + public static final String INTER_BROKER_PROTOCOL_VERSION = MetadataVersion.latestProduction().version(); + + /** ********* Controlled shutdown configuration *********/ + public static final int CONTROLLED_SHUTDOWN_MAX_RETRIES = 3; + public static final int CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS = 5000; + public static final boolean 
CONTROLLED_SHUTDOWN_ENABLE = true; + + /** ********* Group coordinator configuration *********/ + public static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6000; + public static final int GROUP_MAX_SESSION_TIMEOUT_MS = 1800000; + public static final int GROUP_INITIAL_REBALANCE_DELAY_MS = 3000; + public static final int GROUP_MAX_SIZE = Integer.MAX_VALUE; + + /** ********* New group coordinator configs *********/ + public static final boolean NEW_GROUP_COORDINATOR_ENABLE = false; + public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS = Arrays.asList(Group.GroupType.CLASSIC.toString()); Review Comment: Should we use `Collections.singletonList()` here? ########## core/src/test/scala/other/kafka/StressTestLog.scala: ########## @@ -51,8 +51,8 @@ object StressTestLog { scheduler = time.scheduler, time = time, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), - producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs, + producerStateManagerConfig = new ProducerStateManagerConfig(org.apache.kafka.server.config.Defaults.PRODUCER_ID_EXPIRATION_MS, false), Review Comment: Can we import `org.apache.kafka.server.config.Defaults`? ########## core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala: ########## @@ -53,7 +53,7 @@ class LogCleanerManagerTest extends Logging { val logConfig: LogConfig = new LogConfig(logProps) val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs` val offset = 999 - val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false) + val producerStateManagerConfig = new ProducerStateManagerConfig(org.apache.kafka.server.config.Defaults.PRODUCER_ID_EXPIRATION_MS, false) Review Comment: I'll stop pointing it out for each file, but let's import the class. 
########## core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala: ########## @@ -112,8 +112,8 @@ abstract class AbstractLogCleanerIntegrationTest { time = time, brokerTopicStats = new BrokerTopicStats, maxTransactionTimeoutMs = 5 * 60 * 1000, - producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false), - producerIdExpirationCheckIntervalMs = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs, + producerStateManagerConfig = new ProducerStateManagerConfig(org.apache.kafka.server.config.Defaults.PRODUCER_ID_EXPIRATION_MS, false), Review Comment: Can we import `org.apache.kafka.server.config.Defaults`? ########## core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala: ########## @@ -19,10 +19,9 @@ package kafka.utils import javax.crypto.SecretKeyFactory - -import kafka.server.Defaults import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.types.Password +import org.apache.kafka.server.config.Defaults Review Comment: This can be merged with the import 2 lines above ########## core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala: ########## @@ -270,7 +270,7 @@ class PartitionLockTest extends Logging { logManager.startup(Set.empty) val partition = new Partition(topicPartition, - replicaLagTimeMaxMs = kafka.server.Defaults.ReplicaLagTimeMaxMs, + replicaLagTimeMaxMs = org.apache.kafka.server.config.Defaults.REPLICA_LAG_TIME_MAX_MS, Review Comment: Can we import `org.apache.kafka.server.config.Defaults`? ########## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslClientAuth; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; +import org.apache.kafka.coordinator.group.OffsetConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig; +import org.apache.kafka.raft.RaftConfig; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class Defaults { + /** ********* Zookeeper Configuration *********/ + public static final int ZK_SESSION_TIMEOUT_MS = 18000; + public static 
final boolean ZK_ENABLE_SECURE_ACLS = false; + public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; + public static final boolean ZK_SSL_CLIENT_ENABLE = false; + public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; + public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; + public static final boolean ZK_SSL_CRL_ENABLE = false; + public static final boolean ZK_SSL_OCSP_ENABLE = false; + + /** ********* General Configuration *********/ + public static final boolean BROKER_ID_GENERATION_ENABLE = true; + public static final int MAX_RESERVED_BROKER_ID = 1000; + public static final int BROKER_ID = -1; + public static final int NUM_NETWORK_THREADS = 3; + public static final int NUM_IO_THREADS = 8; + public static final int BACKGROUND_THREADS = 10; + public static final int QUEUED_MAX_REQUESTS = 500; + public static final int QUEUED_MAX_REQUEST_BYTES = -1; + public static final int INITIAL_BROKER_REGISTRATION_TIMEOUT_MS = 60000; + public static final int BROKER_HEARTBEAT_INTERVAL_MS = 2000; + public static final int BROKER_SESSION_TIMEOUT_MS = 9000; + public static final int METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES = 20 * 1024 * 1024; + public static final long METADATA_SNAPSHOT_MAX_INTERVAL_MS = TimeUnit.HOURS.toMillis(1); + public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; + public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; + public static final boolean DELETE_TOPIC_ENABLE = true; + /** ********* KRaft mode configs *********/ + public static final int EMPTY_NODE_ID = -1; + public static final long SERVER_MAX_STARTUP_TIME_MS = Long.MAX_VALUE; + + /************* Authorizer Configuration ***********/ + public static final String AUTHORIZER_CLASS_NAME = ""; + + /** ********* Socket Server Configuration *********/ + public static final String LISTENERS = "PLAINTEXT://:9092"; + //TODO: Replace this with EndPoint.DefaultSecurityProtocolMap once EndPoint is out of core. 
+ public static final String LISTENER_SECURITY_PROTOCOL_MAP = Arrays.stream(SecurityProtocol.values()) + .collect(Collectors.toMap(sp -> ListenerName.forSecurityProtocol(sp), sp -> sp)) + .entrySet() + .stream() + .map(entry -> entry.getKey().value() + ":" + entry.getValue().name()) + .collect(Collectors.joining(",")); + public static final int SOCKET_SEND_BUFFER_BYTES = 100 * 1024; + public static final int SOCKET_RECEIVE_BUFFER_BYTES = 100 * 1024; + public static final int SOCKET_REQUEST_MAX_BYTES = 100 * 1024 * 1024; + public static final int SOCKET_LISTEN_BACKLOG_SIZE = 50; + public static final int MAX_CONNECTIONS_PER_IP = Integer.MAX_VALUE; + public static final String MAX_CONNECTIONS_PER_IP_OVERRIDES = ""; + public static final int MAX_CONNECTIONS = Integer.MAX_VALUE; + public static final int MAX_CONNECTION_CREATION_RATE = Integer.MAX_VALUE; + public static final long CONNECTIONS_MAX_IDLE_MS = 10 * 60 * 1000L; + public static final int REQUEST_TIMEOUT_MS = 30000; + public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; + public static final long CONNECTION_SETUP_TIMEOUT_MAX_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS; + public static final int FAILED_AUTHENTICATION_DELAY_MS = 100; + + /** ********* Log Configuration *********/ + public static final int NUM_PARTITIONS = 1; + public static final String LOG_DIR = "/tmp/kafka-logs"; + public static final long LOG_CLEANUP_INTERVAL_MS = 5 * 60 * 1000L; + public static final int LOG_CLEANER_THREADS = 1; + public static final double LOG_CLEANER_IO_MAX_BYTES_PER_SECOND = Double.MAX_VALUE; + public static final long LOG_CLEANER_DEDUPE_BUFFER_SIZE = 128 * 1024 * 1024L; + public static final int LOG_CLEANER_IO_BUFFER_SIZE = 512 * 1024; + public static final double LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR = 0.9d; + public static final int LOG_CLEANER_BACKOFF_MS = 15 * 1000; + public static final boolean LOG_CLEANER_ENABLE = true; + 
public static final int LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS = 60000; + public static final int LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS = 60000; + public static final int NUM_RECOVERY_THREADS_PER_DATA_DIR = 1; + public static final boolean AUTO_CREATE_TOPICS_ENABLE = true; + + /** ********* Replication configuration *********/ + public static final int CONTROLLER_SOCKET_TIMEOUT_MS = REQUEST_TIMEOUT_MS; + public static final int REPLICATION_FACTOR = 1; + public static final long REPLICA_LAG_TIME_MAX_MS = 30000L; + public static final int REPLICA_SOCKET_TIMEOUT_MS = 30 * 1000; + public static final int REPLICA_SOCKET_RECEIVE_BUFFER_BYTES = 64 * 1024; + public static final int REPLICA_FETCH_MAX_BYTES = 1024 * 1024; + public static final int REPLICA_FETCH_WAIT_MAX_MS = 500; + public static final int REPLICA_FETCH_MIN_BYTES = 1; + public static final int REPLICA_FETCH_RESPONSE_MAX_BYTES = 10 * 1024 * 1024; + public static final int NUM_REPLICA_FETCHERS = 1; + public static final int REPLICA_FETCH_BACKOFF_MS = 1000; + public static final long REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS = 5000L; + public static final int FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS = 1000; + public static final int PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS = 1000; + public static final int DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS = 1; + public static final boolean AUTO_LEADER_REBALANCE_ENABLE = true; + public static final int LEADER_IMBALANCE_PER_BROKER_PERCENTAGE = 10; + public static final int LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS = 300; + public static final String INTER_BROKER_SECURITY_PROTOCOL = SecurityProtocol.PLAINTEXT.toString(); + public static final String INTER_BROKER_PROTOCOL_VERSION = MetadataVersion.latestProduction().version(); + + /** ********* Controlled shutdown configuration *********/ + public static final int CONTROLLED_SHUTDOWN_MAX_RETRIES = 3; + public static final int CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS = 5000; + public static final boolean 
CONTROLLED_SHUTDOWN_ENABLE = true; + + /** ********* Group coordinator configuration *********/ + public static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6000; + public static final int GROUP_MAX_SESSION_TIMEOUT_MS = 1800000; + public static final int GROUP_INITIAL_REBALANCE_DELAY_MS = 3000; + public static final int GROUP_MAX_SIZE = Integer.MAX_VALUE; + + /** ********* New group coordinator configs *********/ + public static final boolean NEW_GROUP_COORDINATOR_ENABLE = false; + public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS = Arrays.asList(Group.GroupType.CLASSIC.toString()); + public static final int GROUP_COORDINATOR_NUM_THREADS = 1; + + /** ********* Consumer group configs *********/ + public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS = 45000; + public static final int CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS = 45000; + public static final int CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS = 60000; + public static final int CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS = 5000; + public static final int CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS = 5000; + public static final int CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS = 15000; + public static final int CONSUMER_GROUP_MAX_SIZE = Integer.MAX_VALUE; + public static final List<String> CONSUMER_GROUP_ASSIGNORS = Collections.singletonList(RangeAssignor.class.getName()); + + /** ********* Offset management configuration *********/ + public static final int OFFSET_METADATA_MAX_SIZE = OffsetConfig.DEFAULT_MAX_METADATA_SIZE; + public static final int OFFSETS_LOAD_BUFFER_SIZE = OffsetConfig.DEFAULT_LOAD_BUFFER_SIZE; + public static final short OFFSETS_TOPIC_REPLICATION_FACTOR = OffsetConfig.DEFAULT_OFFSETS_TOPIC_REPLICATION_FACTOR; + public static final int OFFSETS_TOPIC_PARTITIONS = OffsetConfig.DEFAULT_OFFSETS_TOPIC_NUM_PARTITIONS; + public static final int OFFSETS_TOPIC_SEGMENT_BYTES = OffsetConfig.DEFAULT_OFFSETS_TOPIC_SEGMENT_BYTES; + public static final int OFFSETS_TOPIC_COMPRESSION_CODEC = 
OffsetConfig.DEFAULT_OFFSETS_TOPIC_COMPRESSION_TYPE.id; + public static final int OFFSETS_RETENTION_MINUTES = 7 * 24 * 60; + public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS = OffsetConfig.DEFAULT_OFFSETS_RETENTION_CHECK_INTERVAL_MS; + public static final int OFFSET_COMMIT_TIMEOUT_MS = OffsetConfig.DEFAULT_OFFSET_COMMIT_TIMEOUT_MS; + public static final short OFFSET_COMMIT_REQUIRED_ACKS = OffsetConfig.DEFAULT_OFFSET_COMMIT_REQUIRED_ACKS; + + /** T********* ransaction management configuration *********/ Review Comment: It looks like something went wrong on this line -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org