This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28ad4dd5c5b MINOR: Remove unnecessary Optional from offsetsToSnapshot (#19613)
28ad4dd5c5b is described below
commit 28ad4dd5c5b60ab9788368ca51ec9736afcf372c
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Sat May 3 12:58:08 2025 +0800
MINOR: Remove unnecessary Optional from offsetsToSnapshot (#19613)
Reviewers: PoAn Yang <[email protected]>, Ken Huang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/storage/internals/log/UnifiedLog.java | 26 +++++++++-------------
1 file changed, 11 insertions(+), 15 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index dcef6929d19..972aeb09581 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -2416,15 +2416,13 @@ public class UnifiedLog implements AutoCloseable {
Time time,
boolean reloadFromCleanShutdown,
String logPrefix) throws
IOException {
- List<Optional<Long>> offsetsToSnapshot = new ArrayList<>();
- if (segments.nonEmpty()) {
- long lastSegmentBaseOffset =
segments.lastSegment().get().baseOffset();
- Optional<LogSegment> lowerSegment =
segments.lowerSegment(lastSegmentBaseOffset);
- Optional<Long> nextLatestSegmentBaseOffset =
lowerSegment.map(LogSegment::baseOffset);
- offsetsToSnapshot.add(nextLatestSegmentBaseOffset);
- offsetsToSnapshot.add(Optional.of(lastSegmentBaseOffset));
- }
- offsetsToSnapshot.add(Optional.of(lastOffset));
+ List<Long> offsetsToSnapshot = new ArrayList<>();
+ segments.lastSegment().ifPresent(lastSegment -> {
+ long lastSegmentBaseOffset = lastSegment.baseOffset();
+ segments.lowerSegment(lastSegmentBaseOffset).ifPresent(s ->
offsetsToSnapshot.add(s.baseOffset()));
+ offsetsToSnapshot.add(lastSegmentBaseOffset);
+ });
+ offsetsToSnapshot.add(lastOffset);
LOG.info("{}Loading producer state till offset {}", logPrefix,
lastOffset);
@@ -2443,11 +2441,9 @@ public class UnifiedLog implements AutoCloseable {
// To avoid an expensive scan through all the segments, we take
empty snapshots from the start of the
// last two segments and the last offset. This should avoid the
full scan in the case that the log needs
// truncation.
- for (Optional<Long> offset : offsetsToSnapshot) {
- if (offset.isPresent()) {
- producerStateManager.updateMapEndOffset(offset.get());
- producerStateManager.takeSnapshot();
- }
+ for (long offset : offsetsToSnapshot) {
+ producerStateManager.updateMapEndOffset(offset);
+ producerStateManager.takeSnapshot();
}
} else {
LOG.info("{}Reloading from producer snapshot and rebuilding
producer state from offset {}", logPrefix, lastOffset);
@@ -2469,7 +2465,7 @@ public class UnifiedLog implements AutoCloseable {
long startOffset = Utils.max(segment.baseOffset(),
producerStateManager.mapEndOffset(), logStartOffset);
producerStateManager.updateMapEndOffset(startOffset);
- if (offsetsToSnapshot.contains(Optional.of(segment.baseOffset()))) {
+ if (offsetsToSnapshot.contains(segment.baseOffset())) {
producerStateManager.takeSnapshot();
}
int maxPosition = segment.size();