mimaison commented on code in PR #16665: URL: https://github.com/apache/kafka/pull/16665#discussion_r1709881410
########## transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java: ########## @@ -61,15 +62,67 @@ public final class TransactionLogConfigs { public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT = 600000; public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC = "The interval at which to remove producer IDs that have expired due to <code>producer.id.expiration.ms</code> passing."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC) - .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC) + .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT, TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_MIN_ISR_DOC) + .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT, TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_LOAD_BUFFER_SIZE_DOC) + .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC) + .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT, TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_PARTITIONS_DOC) + .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT, TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC) - .define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC) + .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN, TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW, TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC) - .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC) + .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_MS_DOC) // Configuration for testing only as default value should be sufficient for typical usage - .defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC); + .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC); + + private AbstractConfig config; + private int transactionTopicMinISR; + private int transactionsLoadBufferSize; + private short transactionTopicReplicationFactor; + private int transactionTopicPartitions; + private int transactionTopicSegmentBytes; + private int producerIdExpirationCheckIntervalMs; + + public TransactionLogConfig(AbstractConfig config) { + this.config = config; + this.transactionTopicMinISR = config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG); + this.transactionsLoadBufferSize = config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG); + this.transactionTopicReplicationFactor = config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG); + this.transactionTopicPartitions = config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG); + this.transactionTopicSegmentBytes = config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG); + this.producerIdExpirationCheckIntervalMs = config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG); + } + + public int transactionTopicMinISR() { + return transactionTopicMinISR; + } + + public int transactionsLoadBufferSize() { + return transactionsLoadBufferSize; + } + + public short transactionTopicReplicationFactor() { + return transactionTopicReplicationFactor; + } + + public int transactionTopicPartitions() { + return transactionTopicPartitions; + } + + public int transactionTopicSegmentBytes() { + return transactionTopicSegmentBytes; + } + + public Boolean transactionPartitionVerificationEnable() { + return config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG); Review Comment: Even with the comments, I'm not sure I understand why this is not initialized in the constructor. Can you explain how this is supposed to work? AbstractConfig instances are kind of read only by default but it looks like we're expecting it to change here. Am I missing something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org