This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28ad4dd5c5b MINOR: Remove unnecessary Optional from offsetsToSnapshot (#19613)
28ad4dd5c5b is described below

commit 28ad4dd5c5b60ab9788368ca51ec9736afcf372c
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Sat May 3 12:58:08 2025 +0800

    MINOR: Remove unnecessary Optional from offsetsToSnapshot (#19613)
    
    Reviewers: PoAn Yang <[email protected]>, Ken Huang
     <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/storage/internals/log/UnifiedLog.java    | 26 +++++++++-------------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index dcef6929d19..972aeb09581 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -2416,15 +2416,13 @@ public class UnifiedLog implements AutoCloseable {
                                             Time time,
                                             boolean reloadFromCleanShutdown,
                                             String logPrefix) throws 
IOException {
-        List<Optional<Long>> offsetsToSnapshot = new ArrayList<>();
-        if (segments.nonEmpty()) {
-            long lastSegmentBaseOffset = 
segments.lastSegment().get().baseOffset();
-            Optional<LogSegment> lowerSegment = 
segments.lowerSegment(lastSegmentBaseOffset);
-            Optional<Long> nextLatestSegmentBaseOffset = 
lowerSegment.map(LogSegment::baseOffset);
-            offsetsToSnapshot.add(nextLatestSegmentBaseOffset);
-            offsetsToSnapshot.add(Optional.of(lastSegmentBaseOffset));
-        }
-        offsetsToSnapshot.add(Optional.of(lastOffset));
+        List<Long> offsetsToSnapshot = new ArrayList<>();
+        segments.lastSegment().ifPresent(lastSegment -> {
+            long lastSegmentBaseOffset = lastSegment.baseOffset();
+            segments.lowerSegment(lastSegmentBaseOffset).ifPresent(s -> 
offsetsToSnapshot.add(s.baseOffset()));
+            offsetsToSnapshot.add(lastSegmentBaseOffset);
+        });
+        offsetsToSnapshot.add(lastOffset);
 
         LOG.info("{}Loading producer state till offset {}", logPrefix, 
lastOffset);
 
@@ -2443,11 +2441,9 @@ public class UnifiedLog implements AutoCloseable {
             // To avoid an expensive scan through all the segments, we take 
empty snapshots from the start of the
             // last two segments and the last offset. This should avoid the 
full scan in the case that the log needs
             // truncation.
-            for (Optional<Long> offset : offsetsToSnapshot) {
-                if (offset.isPresent()) {
-                    producerStateManager.updateMapEndOffset(offset.get());
-                    producerStateManager.takeSnapshot();
-                }
+            for (long offset : offsetsToSnapshot) {
+                producerStateManager.updateMapEndOffset(offset);
+                producerStateManager.takeSnapshot();
             }
         } else {
             LOG.info("{}Reloading from producer snapshot and rebuilding 
producer state from offset {}", logPrefix, lastOffset);
@@ -2469,7 +2465,7 @@ public class UnifiedLog implements AutoCloseable {
                     long startOffset = Utils.max(segment.baseOffset(), 
producerStateManager.mapEndOffset(), logStartOffset);
                     producerStateManager.updateMapEndOffset(startOffset);
 
-                    if 
(offsetsToSnapshot.contains(Optional.of(segment.baseOffset()))) {
+                    if (offsetsToSnapshot.contains(segment.baseOffset())) {
                         producerStateManager.takeSnapshot();
                     }
                     int maxPosition = segment.size();

Reply via email to