This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a1c4403b80 MINOR: Remove firstBatchMetadata from ProducerStateEntry
constructor (#21174)
7a1c4403b80 is described below
commit 7a1c4403b80724addcc101ec57b5a66ce2bd3806
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Jan 1 22:32:35 2026 +0500
MINOR: Remove firstBatchMetadata from ProducerStateEntry constructor
(#21174)
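Remove the firstBatchMetadata parameter from the ProducerStateEntry
constructor to simplify it. Callers that previously passed an initial
batch now construct the entry first and then call addBatch() on it.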
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/jmh/storage/ProducerStateManagerBench.java | 4 +---
.../kafka/storage/internals/log/ProducerStateEntry.java | 10 +++-------
.../kafka/storage/internals/log/ProducerStateManager.java | 9 ++++++---
.../apache/kafka/storage/internals/log/LogSegmentTest.java | 9 ++++++---
.../storage/internals/log/ProducerStateManagerTest.java | 13 +++++++------
5 files changed, 23 insertions(+), 22 deletions(-)
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
index c534aede378..976e2da8840 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/storage/ProducerStateManagerBench.java
@@ -41,7 +41,6 @@ import org.openjdk.jmh.annotations.Warmup;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
@@ -78,8 +77,7 @@ public class ProducerStateManagerBench {
epoch,
0,
time.milliseconds(),
- OptionalLong.empty(),
- Optional.empty()
+ OptionalLong.empty()
);
manager.loadProducerEntry(entry);
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
index 97eb59e0b0f..778fcdd3654 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java
@@ -42,20 +42,16 @@ public class ProducerStateEntry {
private OptionalLong currentTxnFirstOffset;
static ProducerStateEntry empty(long producerId) {
- return new ProducerStateEntry(producerId,
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP,
OptionalLong.empty(), Optional.empty());
+ return new ProducerStateEntry(producerId,
RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP,
OptionalLong.empty());
}
public ProducerStateEntry(long producerId, short producerEpoch, int
coordinatorEpoch, long lastTimestamp,
- OptionalLong currentTxnFirstOffset,
Optional<BatchMetadata> firstBatchMetadata) {
+ OptionalLong currentTxnFirstOffset) {
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.coordinatorEpoch = coordinatorEpoch;
this.lastTimestamp = lastTimestamp;
this.currentTxnFirstOffset = currentTxnFirstOffset;
- firstBatchMetadata.ifPresent(batch -> {
- batchMetadata.add(batch);
- this.lastTimestamp = batch.timestamp();
- });
}
int firstSeq() {
@@ -86,7 +82,7 @@ public class ProducerStateEntry {
* Returns a new instance with the provided producer ID and the values
from the current instance.
*/
ProducerStateEntry withProducerId(long producerId) {
- return new ProducerStateEntry(producerId, producerEpoch(),
coordinatorEpoch, lastTimestamp, currentTxnFirstOffset, Optional.empty());
+ return new ProducerStateEntry(producerId, producerEpoch(),
coordinatorEpoch, lastTimestamp, currentTxnFirstOffset);
}
void addBatch(short producerEpoch, int lastSeq, long lastOffset, int
offsetDelta, long timestamp) {
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
index 0cb344f5ef0..80c2eeb3593 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
@@ -649,9 +649,12 @@ public class ProducerStateManager {
long currentTxnFirstOffset = producerEntry.currentTxnFirstOffset();
OptionalLong currentTxnFirstOffsetVal = currentTxnFirstOffset >= 0
? OptionalLong.of(currentTxnFirstOffset) : OptionalLong.empty();
- Optional<BatchMetadata> batchMetadata =
- (lastOffset >= 0) ? Optional.of(new
BatchMetadata(lastSequence, lastOffset, offsetDelta, timestamp)) :
Optional.empty();
- entries.add(new ProducerStateEntry(producerId, producerEpoch,
coordinatorEpoch, timestamp, currentTxnFirstOffsetVal, batchMetadata));
+ ProducerStateEntry stateEntry = new ProducerStateEntry(producerId,
producerEpoch, coordinatorEpoch, timestamp, currentTxnFirstOffsetVal);
+
+ if (lastOffset >= 0)
+ stateEntry.addBatch(producerEpoch, lastSequence, lastOffset,
offsetDelta, timestamp);
+
+ entries.add(stateEntry);
}
return entries;
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 2131aaf7ed9..f26ba9749f3 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -509,9 +509,12 @@ public class LogSegmentTest {
// recover again, assuming the transaction from pid2 began on a
previous segment
stateManager = newProducerStateManager();
- stateManager.loadProducerEntry(new ProducerStateEntry(pid2,
producerEpoch, 0,
- RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L),
- Optional.of(new BatchMetadata(10, 10L, 5,
RecordBatch.NO_TIMESTAMP))));
+
+ ProducerStateEntry stateEntry = new ProducerStateEntry(pid2,
producerEpoch, 0,
+ RecordBatch.NO_TIMESTAMP, OptionalLong.of(75L));
+ stateEntry.addBatch(producerEpoch, 10, 10L, 5,
RecordBatch.NO_TIMESTAMP);
+
+ stateManager.loadProducerEntry(stateEntry);
segment.recover(stateManager, mock(LeaderEpochFileCache.class));
assertEquals(108L, stateManager.mapEndOffset());
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
index b504fb76636..a9adf3eafb1 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java
@@ -1320,14 +1320,15 @@ public class ProducerStateManagerTest {
@Test
public void testReadWriteSnapshot() throws IOException {
Map<Long, ProducerStateEntry> expectedEntryMap = new HashMap<>();
- expectedEntryMap.put(1L, new ProducerStateEntry(1L, (short) 2, 3,
- RecordBatch.NO_TIMESTAMP,
- OptionalLong.of(100L),
- Optional.of(new BatchMetadata(1, 2L, 3,
RecordBatch.NO_TIMESTAMP))));
+ ProducerStateEntry stateEntry = new ProducerStateEntry(1L, (short) 2,
3,
+ RecordBatch.NO_TIMESTAMP, OptionalLong.of(100L));
+
+ stateEntry.addBatch((short) 2, 1, 2L, 3, RecordBatch.NO_TIMESTAMP);
+
+ expectedEntryMap.put(1L, stateEntry);
expectedEntryMap.put(11L, new ProducerStateEntry(11L, (short) 12, 13,
123456L,
- OptionalLong.empty(),
- Optional.empty()));
+ OptionalLong.empty()));
File file = new File(logDir, "testReadWriteSnapshot");
ProducerStateManager.writeSnapshot(file, expectedEntryMap, true);