This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a1c4403b80 MINOR: Remove firstBatchMetadata from ProducerStateEntry 
constructor (#21174)
7a1c4403b80 is described below

commit 7a1c4403b80724addcc101ec57b5a66ce2bd3806
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Jan 1 22:32:35 2026 +0500

    MINOR: Remove firstBatchMetadata from ProducerStateEntry constructor 
(#21174)
    
    Remove firstBatchMetadata from ProducerStateEntry constructor to
    simplify it.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/jmh/storage/ProducerStateManagerBench.java |  4 +---
 .../kafka/storage/internals/log/ProducerStateEntry.java     | 10 +++-------
 .../kafka/storage/internals/log/ProducerStateManager.java   |  9 ++++++---
 .../apache/kafka/storage/internals/log/LogSegmentTest.java  |  9 ++++++---
 .../storage/internals/log/ProducerStateManagerTest.java     | 13 +++++++------
 5 files changed, 23 insertions(+), 22 deletions(-)

diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
index c534aede378..976e2da8840 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
@@ -41,7 +41,6 @@ import org.openjdk.jmh.annotations.Warmup;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 
@@ -78,8 +77,7 @@ public class ProducerStateManagerBench {
                 epoch,
                 0,
                 time.milliseconds(),
-                OptionalLong.empty(),
-                Optional.empty()
+                OptionalLong.empty()
             );
             manager.loadProducerEntry(entry);
         }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
index 97eb59e0b0f..778fcdd3654 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
@@ -42,20 +42,16 @@ public class ProducerStateEntry {
     private OptionalLong currentTxnFirstOffset;
 
     static ProducerStateEntry empty(long producerId) {
-        return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty(), Optional.empty());
+        return new ProducerStateEntry(producerId, 
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, 
OptionalLong.empty());
     }
 
     public ProducerStateEntry(long producerId, short producerEpoch, int 
coordinatorEpoch, long lastTimestamp,
-                              OptionalLong currentTxnFirstOffset, 
Optional<BatchMetadata> firstBatchMetadata) {
+                              OptionalLong currentTxnFirstOffset) {
         this.producerId = producerId;
         this.producerEpoch = producerEpoch;
         this.coordinatorEpoch = coordinatorEpoch;
         this.lastTimestamp = lastTimestamp;
         this.currentTxnFirstOffset = currentTxnFirstOffset;
-        firstBatchMetadata.ifPresent(batch -> {
-            batchMetadata.add(batch);
-            this.lastTimestamp = batch.timestamp();
-        });
     }
 
     int firstSeq() {
@@ -86,7 +82,7 @@ public class ProducerStateEntry {
      * Returns a new instance with the provided producer ID and the values 
from the current instance.
      */
     ProducerStateEntry withProducerId(long producerId) {
-        return new ProducerStateEntry(producerId, producerEpoch(), 
coordinatorEpoch, lastTimestamp, currentTxnFirstOffset, Optional.empty());
+        return new ProducerStateEntry(producerId, producerEpoch(), 
coordinatorEpoch, lastTimestamp, currentTxnFirstOffset);
     }
 
     void addBatch(short producerEpoch, int lastSeq, long lastOffset, int 
offsetDelta, long timestamp) {
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index 0cb344f5ef0..80c2eeb3593 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -649,9 +649,12 @@ public class ProducerStateManager {
             long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();
 
             OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0 
? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
-            Optional<BatchMetadata> batchMetadata =
-                    (lastOffset >= 0) ? Optional.of(new 
BatchMetadata(lastSequence, lastOffset, offsetDelta, timestamp)) : 
Optional.empty();
-            entries.add(new ProducerStateEntry(producerId, producerEpoch, 
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
+            ProducerStateEntry stateEntry = new ProducerStateEntry(producerId, 
producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetVal);
+
+            if (lastOffset >= 0)
+                stateEntry.addBatch(producerEpoch, lastSequence, lastOffset, 
offsetDelta, timestamp);
+
+            entries.add(stateEntry);
         }
 
         return entries;
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 2131aaf7ed9..f26ba9749f3 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -509,9 +509,12 @@ public class LogSegmentTest {
 
             // recover again, assuming the transaction from pid2 began on a 
previous segment
             stateManager = newProducerStateManager();
-            stateManager.loadProducerEntry(new ProducerStateEntry(pid2, 
producerEpoch, 0,
-                RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L),
-                Optional.of(new BatchMetadata(10, 10L, 5, 
RecordBatch.NO_TIMESTAMP))));
+
+            ProducerStateEntry stateEntry = new ProducerStateEntry(pid2, 
producerEpoch, 0,
+                RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L));
+            stateEntry.addBatch(producerEpoch, 10, 10L, 5, 
RecordBatch.NO_TIMESTAMP);
+
+            stateManager.loadProducerEntry(stateEntry);
             segment.recover(stateManager, mock(LeaderEpochFileCache.class));
             assertEquals(108L, stateManager.mapEndOffset());
 
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
index b504fb76636..a9adf3eafb1 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
@@ -1320,14 +1320,15 @@ public class ProducerStateManagerTest {
     @Test
     public void testReadWriteSnapshot() throws IOException {
         Map<Long, ProducerStateEntry> expectedEntryMap = new HashMap<>();
-        expectedEntryMap.put(1L, new ProducerStateEntry(1L, (short) 2, 3,
-                RecordBatch.NO_TIMESTAMP,
-                OptionalLong.of(100L),
-                Optional.of(new BatchMetadata(1, 2L, 3, 
RecordBatch.NO_TIMESTAMP))));
+        ProducerStateEntry stateEntry = new ProducerStateEntry(1L, (short) 2, 
3,
+            RecordBatch.NO_TIMESTAMP, OptionalLong.of(100L));
+
+        stateEntry.addBatch((short) 2, 1, 2L, 3, RecordBatch.NO_TIMESTAMP);
+
+        expectedEntryMap.put(1L, stateEntry);
         expectedEntryMap.put(11L, new ProducerStateEntry(11L, (short) 12, 13,
                 123456L,
-                OptionalLong.empty(),
-                Optional.empty()));
+                OptionalLong.empty()));
 
         File file = new File(logDir, "testReadWriteSnapshot");
         ProducerStateManager.writeSnapshot(file, expectedEntryMap, true);

Reply via email to