This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7eae28a67b MINOR: Replaced internal KafkaConfig field in 
TransactionLogConfig (#19482)
a7eae28a67b is described below

commit a7eae28a67b365ed42aa02dc9cf52f51ee7f3bfe
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Sat May 10 15:06:18 2025 +0000

    MINOR: Replaced internal KafkaConfig field in TransactionLogConfig (#19482)
    
    Retaining a reference to ```AbstractConfig``` introduced coupling and
    potential inconsistencies with dynamic config updates. This change
    simplifies ```TransactionLogConfig``` into a POJO by removing the
    internal ```AbstractConfig``` field, and aligns with feedback from
    #19439
    
    Reviewers: PoAn Yang <[email protected]>, TengYao Chi
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../transaction/TransactionLogConfig.java          | 10 +++---
 .../transaction/TransactionLogConfigTest.java      | 39 +++++++++-------------
 2 files changed, 21 insertions(+), 28 deletions(-)

diff --git 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
index 65d7a7a3bc5..3e1c4797df4 100644
--- 
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
+++ 
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLogConfig.java
@@ -74,22 +74,24 @@ public final class TransactionLogConfig {
             // Configuration for testing only as default value should be 
sufficient for typical usage
             .defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, 
INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);
 
-    private final AbstractConfig config;
     private final int transactionTopicMinISR;
     private final int transactionLoadBufferSize;
     private final short transactionTopicReplicationFactor;
     private final int transactionTopicPartitions;
     private final int transactionTopicSegmentBytes;
     private final int producerIdExpirationCheckIntervalMs;
+    private final boolean transactionPartitionVerificationEnable;
+    private final int producerIdExpirationMs;
 
     public TransactionLogConfig(AbstractConfig config) {
-        this.config = config;
         this.transactionTopicMinISR = 
config.getInt(TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
         this.transactionLoadBufferSize = 
config.getInt(TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
         this.transactionTopicReplicationFactor = 
config.getShort(TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG);
         this.transactionTopicPartitions = 
config.getInt(TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
         this.transactionTopicSegmentBytes = 
config.getInt(TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
         this.producerIdExpirationCheckIntervalMs = 
config.getInt(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
+        this.transactionPartitionVerificationEnable = 
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+        this.producerIdExpirationMs = 
config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
     }
 
     public int transactionTopicMinISR() {
@@ -118,11 +120,11 @@ public final class TransactionLogConfig {
 
     // This is a broker dynamic config used for 
DynamicProducerStateManagerConfig
     public boolean transactionPartitionVerificationEnable() {
-        return 
config.getBoolean(TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+        return transactionPartitionVerificationEnable;
     }
 
     // This is a broker dynamic config used for 
DynamicProducerStateManagerConfig
     public int producerIdExpirationMs() {
-        return config.getInt(PRODUCER_ID_EXPIRATION_MS_CONFIG);
+        return producerIdExpirationMs;
     }
 }
diff --git 
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
 
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
index 5ed9e74c4bd..7746bda844a 100644
--- 
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
+++ 
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
 class TransactionLogConfigTest {
     @Test
@@ -44,6 +45,7 @@ class TransactionLogConfigTest {
         assertEquals(declaredConfigs,  
TransactionLogConfig.CONFIG_DEF.names());
     }
 
+    @SuppressWarnings("ResultOfMethodCallIgnored")
     @Test
     void ShouldGetStaticValueFromClassAttribute() {
         AbstractConfig config = mock(AbstractConfig.class);
@@ -53,7 +55,9 @@ class TransactionLogConfigTest {
         
doReturn(4).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
         
doReturn(5).when(config).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
         
doReturn(6).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
-
+        
doReturn(false).when(config).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+        
doReturn(88).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
+        
         TransactionLogConfig transactionLogConfig = new 
TransactionLogConfig(config);
 
         assertEquals(1, transactionLogConfig.transactionTopicMinISR());
@@ -62,16 +66,20 @@ class TransactionLogConfigTest {
         assertEquals(4, transactionLogConfig.transactionTopicPartitions());
         assertEquals(5, transactionLogConfig.transactionTopicSegmentBytes());
         assertEquals(6, 
transactionLogConfig.producerIdExpirationCheckIntervalMs());
+        
assertFalse(transactionLogConfig.transactionPartitionVerificationEnable());
+        assertEquals(88, transactionLogConfig.producerIdExpirationMs());
 
-
-        // If the following calls are missing, we won’t be able to distinguish 
whether the value is set in the constructor or if
-        // it fetches the latest value from AbstractConfig with each call.
+        // This TransactionLogConfig instance is expected to be replaced 
entirely when dynamic config updates occur.
+        // Calling the getters a second time ensures they return the values 
established at construction,
+        // rather than invoking AbstractConfig again on each call.
         transactionLogConfig.transactionTopicMinISR();
         transactionLogConfig.transactionLoadBufferSize();
         transactionLogConfig.transactionTopicReplicationFactor();
         transactionLogConfig.transactionTopicPartitions();
         transactionLogConfig.transactionTopicSegmentBytes();
         transactionLogConfig.producerIdExpirationCheckIntervalMs();
+        transactionLogConfig.transactionPartitionVerificationEnable();
+        transactionLogConfig.producerIdExpirationMs();
 
         verify(config, 
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG);
         verify(config, 
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_LOAD_BUFFER_SIZE_CONFIG);
@@ -79,25 +87,8 @@ class TransactionLogConfigTest {
         verify(config, 
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG);
         verify(config, 
times(1)).getInt(TransactionLogConfig.TRANSACTIONS_TOPIC_SEGMENT_BYTES_CONFIG);
         verify(config, 
times(1)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG);
-    }
-
-    @Test
-    void ShouldGetDynamicValueFromAbstractConfig() {
-        AbstractConfig config = mock(AbstractConfig.class);
-        
doReturn(false).when(config).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
-        
doReturn(88).when(config).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
-
-        TransactionLogConfig transactionLogConfig = new 
TransactionLogConfig(config);
-
-        
assertFalse(transactionLogConfig.transactionPartitionVerificationEnable());
-        assertEquals(88, transactionLogConfig.producerIdExpirationMs());
-
-        // If the following calls are missing, we won’t be able to distinguish 
whether the value is set in the constructor or if
-        // it fetches the latest value from AbstractConfig with each call.
-        transactionLogConfig.transactionPartitionVerificationEnable();
-        transactionLogConfig.producerIdExpirationMs();
-
-        verify(config, 
times(2)).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
-        verify(config, 
times(2)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
+        verify(config, 
times(1)).getBoolean(TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);
+        verify(config, 
times(1)).getInt(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG);
+        verifyNoMoreInteractions(config);
     }
 }
\ No newline at end of file

Reply via email to