This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a7eae28a67b MINOR: Replaced internal KafkaConfig field in
TransactionLogConfig (#19482)
a7eae28a67b is described below
commit a7eae28a67b365ed42aa02dc9cf52f51ee7f3bfe
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Sat May 10 15:06:18 2025 +0000
MINOR: Replaced internal KafkaConfig field in TransactionLogConfig (#19482)
Retaining a reference to ```AbstractConfig``` introduced coupling and
potential inconsistencies with dynamic config updates. This change
simplifies ```TransactionLogConfig``` into a POJO by removing the
internal ```AbstractConfig``` field, and aligns with feedback from
#19439
Reviewers: PoAn Yang <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../transaction/TransactionLogConfig.java | 10 +++---
.../transaction/TransactionLogConfigTest.java | 39 +++++++++-------------
2 files changed, 21 insertions(+), 28 deletions(-)
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
index 65d7a7a3bc5..3e1c4797df4 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
@@ -74,22 +74,24 @@ public final class TransactionLogConfig {
// Configuration for testing only as default value should be
sufficient for typical usage
.defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG,
INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
- private final AbstractConfig config;
private final int transactionTopicMinISR;
private final int transactionLoadBufferSize;
private final short transactionTopicReplicationFactor;
private final int transactionTopicPartitions;
private final int transactionTopicSegmentBytes;
private final int producerIdExpirationCheckIntervalMs;
+ private final boolean transactionPartitionVerificationEnable;
+ private final int producerIdExpirationMs;
public TransactionLogConfig(AbstractConfig config) {
- this.config = config;
this.transactionTopicMinISR =
config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
this.transactionLoadBufferSize =
config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
this.transactionTopicReplicationFactor =
config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
this.transactionTopicPartitions =
config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
this.transactionTopicSegmentBytes =
config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
this.producerIdExpirationCheckIntervalMs =
config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
+ this.transactionPartitionVerificationEnable =
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+ this.producerIdExpirationMs =
config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
}
public int transactionTopicMinISR() {
@@ -118,11 +120,11 @@ public final class TransactionLogConfig {
// This is a broker dynamic config used for
DynamicProducerStateManagerConfig
public boolean transactionPartitionVerificationEnable() {
- return
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+ return transactionPartitionVerificationEnable;
}
// This is a broker dynamic config used for
DynamicProducerStateManagerConfig
public int producerIdExpirationMs() {
- return config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
+ return producerIdExpirationMs;
}
}
diff --git
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
index 5ed9e74c4bd..7746bda844a 100644
---
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
+++
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
class TransactionLogConfigTest {
@Test
@@ -44,6 +45,7 @@ class TransactionLogConfigTest {
assertEquals(declaredConfigs,
TransactionLogConfig.CONFIG_DEF.names());
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void ShouldGetStaticValueFromClassAttribute() {
AbstractConfig config = mock(AbstractConfig.class);
@@ -53,7 +55,9 @@ class TransactionLogConfigTest {
doReturn(4).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
doReturn(5).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
doReturn(6).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
-
+
doReturn(false).when(config).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+
doReturn(88).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
+
TransactionLogConfig transactionLogConfig = new
TransactionLogConfig(config);
assertEquals(1, transactionLogConfig.transactionTopicMinISR());
@@ -62,16 +66,20 @@ class TransactionLogConfigTest {
assertEquals(4, transactionLogConfig.transactionTopicPartitions());
assertEquals(5, transactionLogConfig.transactionTopicSegmentBytes());
assertEquals(6,
transactionLogConfig.producerIdExpirationCheckIntervalMs());
+
assertFalse(transactionLogConfig.transactionPartitionVerificationEnable());
+ assertEquals(88, transactionLogConfig.producerIdExpirationMs());
-
- // If the following calls are missing, we won’t be able to distinguish
whether the value is set in the constructor or if
- // it fetches the latest value from AbstractConfig with each call.
+ // This TransactionLogConfig instance is expected to be replaced
entirely when dynamic config updates occur.
+ // Calling the getters a second time ensures they return the values
established at construction,
+ // rather than invoking AbstractConfig again on each call.
transactionLogConfig.transactionTopicMinISR();
transactionLogConfig.transactionLoadBufferSize();
transactionLogConfig.transactionTopicReplicationFactor();
transactionLogConfig.transactionTopicPartitions();
transactionLogConfig.transactionTopicSegmentBytes();
transactionLogConfig.producerIdExpirationCheckIntervalMs();
+ transactionLogConfig.transactionPartitionVerificationEnable();
+ transactionLogConfig.producerIdExpirationMs();
verify(config,
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
verify(config,
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
@@ -79,25 +87,8 @@ class TransactionLogConfigTest {
verify(config,
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
verify(config,
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
verify(config,
times(1)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
- }
-
- @Test
- void ShouldGetDynamicValueFromAbstractConfig() {
- AbstractConfig config = mock(AbstractConfig.class);
-
doReturn(false).when(config).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
-
doReturn(88).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
-
- TransactionLogConfig transactionLogConfig = new
TransactionLogConfig(config);
-
-
assertFalse(transactionLogConfig.transactionPartitionVerificationEnable());
- assertEquals(88, transactionLogConfig.producerIdExpirationMs());
-
- // If the following calls are missing, we won’t be able to distinguish
whether the value is set in the constructor or if
- // it fetches the latest value from AbstractConfig with each call.
- transactionLogConfig.transactionPartitionVerificationEnable();
- transactionLogConfig.producerIdExpirationMs();
-
- verify(config,
times(2)).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
- verify(config,
times(2)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
+ verify(config,
times(1)).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+ verify(config,
times(1)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
+ verifyNoMoreInteractions(config);
}
}
\ No newline at end of file