chia7712 commented on code in PR #16665:
URL: https://github.com/apache/kafka/pull/16665#discussion_r1709903239
##########
transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java:
##########
@@ -61,15 +62,67 @@ public final class TransactionLogConfigs {
public static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT =
600000;
public static final String PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC =
"The interval at which to remove producer IDs that have expired due to
<code>producer.id.expiration.ms</code> passing.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .define(TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG,
INT, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1),
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT,
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
TransactionLogConfigs.TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
SHORT, TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT,
atLeast(1), HIGH,
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT,
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
TransactionLogConfigs.TRANSACTIONS_TOPIC_PARTITIONS_DOC)
-
.define(TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1),
HIGH, TransactionLogConfigs.TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
+ .define(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, INT,
TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_MIN_ISR_DOC)
+ .define(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG, INT,
TRANSACTIONS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_LOAD_BUFFER_SIZE_DOC)
+ .define(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT,
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_DOC)
+ .define(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, INT,
TRANSACTIONS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_PARTITIONS_DOC)
+ .define(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG, INT,
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH,
TRANSACTIONS_TOPIC_SEGMENT_BYTES_DOC)
-
.define(TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG,
BOOLEAN,
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW,
TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
+ .define(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG, BOOLEAN,
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DEFAULT, LOW,
TRANSACTION_PARTITION_VERIFICATION_ENABLE_DOC)
- .define(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG,
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW,
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DOC)
+ .define(PRODUCER_ID_EXPIRATION_MS_CONFIG, INT,
PRODUCER_ID_EXPIRATION_MS_DEFAULT, atLeast(1), LOW,
PRODUCER_ID_EXPIRATION_MS_DOC)
// Configuration for testing only as default value should be
sufficient for typical usage
-
.defineInternal(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
INT, TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
atLeast(1), LOW,
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+ .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
+
+ private AbstractConfig config;
+ private int transactionTopicMinISR;
+ private int transactionsLoadBufferSize;
+ private short transactionTopicReplicationFactor;
+ private int transactionTopicPartitions;
+ private int transactionTopicSegmentBytes;
+ private int producerIdExpirationCheckIntervalMs;
+
+ public TransactionLogConfig(AbstractConfig config) {
+ this.config = config;
+ this.transactionTopicMinISR =
config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
+ this.transactionsLoadBufferSize =
config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
+ this.transactionTopicReplicationFactor =
config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
+ this.transactionTopicPartitions =
config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
+ this.transactionTopicSegmentBytes =
config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
+ this.producerIdExpirationCheckIntervalMs =
config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
+ }
+
+ public int transactionTopicMinISR() {
+ return transactionTopicMinISR;
+ }
+
+ public int transactionsLoadBufferSize() {
+ return transactionsLoadBufferSize;
+ }
+
+ public short transactionTopicReplicationFactor() {
+ return transactionTopicReplicationFactor;
+ }
+
+ public int transactionTopicPartitions() {
+ return transactionTopicPartitions;
+ }
+
+ public int transactionTopicSegmentBytes() {
+ return transactionTopicSegmentBytes;
+ }
+
+ public Boolean transactionPartitionVerificationEnable() {
+ return
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
Review Comment:
We had a discussion about that before
(https://github.com/apache/kafka/pull/16458#discussion_r1662130270).
`AbstractConfig` is read-only, but the subclass `KafkaConfig` can update its
inner configs dynamically (see
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L192).
Hence, the passed `AbstractConfig` always has the "up-to-date configs".
There is a Jira ticket that aims to refactor this
(https://issues.apache.org/jira/browse/KAFKA-17001), but I haven't worked on it
because it would introduce major changes to the Scala code.
In short, the basic rules for now are as follows:
1. Non-dynamic configs are evaluated in the constructor.
2. Dynamic configs are evaluated in the getter (as `AbstractConfig`
always has the latest configs).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]