junrao commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1222183119


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -945,4 +1176,27 @@ public void close() {
         }
     }
 
+    private static class RetentionSizeData {
+        private final long retentionSize;
+        private final long remainingBreachedSize;
+
+        public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+            this.retentionSize = retentionSize;
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+    }
+
+    private static class RetentionTimeData {
+
+        private final long retentionMs;
+        private final long cleanupUntilMs;
+
+        public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+            this.retentionMs = retentionMs;
+            this.cleanupUntilMs = cleanupUntilMs;
+        }
+

Review Comment:
   extra new line



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -945,4 +1176,27 @@ public void close() {
         }
     }
 
+    private static class RetentionSizeData {
+        private final long retentionSize;
+        private final long remainingBreachedSize;
+
+        public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+            this.retentionSize = retentionSize;
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+

Review Comment:
   extra new line



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -945,4 +1176,27 @@ public void close() {
         }
     }
 
+    private static class RetentionSizeData {
+        private final long retentionSize;
+        private final long remainingBreachedSize;
+
+        public RetentionSizeData(long retentionSize, long 
remainingBreachedSize) {
+            this.retentionSize = retentionSize;
+            this.remainingBreachedSize = remainingBreachedSize;
+        }
+
+    }
+
+    private static class RetentionTimeData {
+
+        private final long retentionMs;
+        private final long cleanupUntilMs;
+
+        public RetentionTimeData(long retentionMs, long cleanupUntilMs) {
+            this.retentionMs = retentionMs;
+            this.cleanupUntilMs = cleanupUntilMs;
+        }
+
+    }
+

Review Comment:
   extra new line



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +625,230 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (retentionSizeData.get().remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionSize = log.config().retentionSize;
+            final boolean checkSizeRetention = retentionSize > -1;
+
+            final long retentionMs = log.config().retentionMs;
+            final boolean checkTimestampRetention = retentionMs > -1;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each
+            // iteration. But the logic can become little complex and need to 
cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining 
these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (checkSizeRetention && segmentMetadata.endOffset() < 
log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += 
segmentMetadata.segmentSizeInBytes();

Review Comment:
   Let's say there is a remote segment with startOffset 100 and endOffset 200. 
If the localLogStartOffset is 150, we exclude the remote segment. This means 
that we are undercounting the size, right?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1383,19 +1430,22 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def deleteRetentionMsBreachedSegments(): Int = {
-    if (config.retentionMs < 0) return 0
+    val retentionMs = localRetentionMs(config)
+    if (retentionMs < 0) return 0
     val startMs = time.milliseconds
 
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      startMs - segment.largestTimestamp > config.retentionMs
+      startMs - segment.largestTimestamp > retentionMs
     }
 
     deleteOldSegments(shouldDelete, RetentionMsBreach(this))
   }
 
+

Review Comment:
   extra new line



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +625,230 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, 
long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", 
topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, 
remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> 
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> 
sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean 
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> {
+                    // Assumption that segments contain size >= 0
+                    if (retentionSizeData.get().remainingBreachedSize > 0) {
+                        remainingBreachedSize -= x.segmentSizeInBytes();
+                        return remainingBreachedSize >= 0;
+                    } else return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), 
retentionSizeData.get().retentionSize, remainingBreachedSize + 
retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean 
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= 
retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as 
`metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to 
retention time {}ms breach based on the largest record timestamp in the 
segment",
+                            metadata.remoteLogSegmentId(), 
retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean 
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long 
startOffset)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
-> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize 
- metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log 
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest 
epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader 
epoch lineage.
+            private boolean 
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, 
RemoteLogSegmentMetadata metadata) throws RemoteStorageException, 
ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x 
->
+                        
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < 
earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader 
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and 
segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, 
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these 
epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata 
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, 
InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new 
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                    
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the 
task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if 
applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = 
fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = 
log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                return;
+            }
+
+            final long retentionSize = log.config().retentionSize;
+            final boolean checkSizeRetention = retentionSize > -1;
+
+            final long retentionMs = log.config().retentionMs;
+            final boolean checkTimestampRetention = retentionMs > -1;
+
+            // Iterate once
+            //  - to build the log size of segments with base-offset < 
local-log-start-offset
+            //  - to collect all the epochs of remote log segments
+            // These values can be cached and updated in RLMTask for this 
topic partition without computing in each
+            // iteration. But the logic can become little complex and need to 
cover different scenarios to avoid any
+            // leaks. We can have a followup to improve it by maintaining 
these values through both copying and deletion.
+            final Set<Integer> epochsSet = new HashSet<>();
+            long totalSizeEarlierToLocalLogStartOffset = 0L;
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = 
segmentMetadataIter.next();
+                
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+
+                if (checkSizeRetention && segmentMetadata.endOffset() < 
log.localLogStartOffset()) {
+                    totalSizeEarlierToLocalLogStartOffset += 
segmentMetadata.segmentSizeInBytes();
+                }
+            }
+
+            // All the leader epochs in sorted order that exists in remote 
storage
+            final List<Integer> remoteLeaderEpochs = new 
ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            Optional<RetentionSizeData> retentionSizeData = 
buildRetentionSizeData(checkSizeRetention, retentionSize, log, 
totalSizeEarlierToLocalLogStartOffset);
+            Optional<RetentionTimeData> retentionTimeData = 
checkTimestampRetention
+                    ? Optional.of(new RetentionTimeData(retentionMs, 
time.milliseconds() - retentionMs))
+                    : Optional.empty();
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new 
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+
+            LeaderEpochFileCache leaderEpochCache = 
leaderEpochCacheOption.get();
+            Iterator<EpochEntry> epochEntryIterator = 
leaderEpochCache.epochEntries().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochEntryIterator.hasNext()) {
+                EpochEntry epochEntry = epochEntryIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, 
epochEntry.epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        return;
+                    }
+
+                    RemoteLogSegmentMetadata metadata = 
segmentsIterator.next();
+                    isSegmentDeleted =
+                            
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                    
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                    
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, 
log.logStartOffset());
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are 
lesser than the earliest-epoch known

Review Comment:
   Hmm, why do we need a separate retention based on leader epochs? Is that not 
already covered by size/time/startOffset based retention?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1331,7 +1370,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
                                 reason: SegmentDeletionReason): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
-      highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
+      val upperBoundOffset = 
nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset)
+
+      // Check not to delete segments which are not yet copied to tiered 
storage
+      val isSegmentTieredToRemoteStorage =
+        if (remoteLogEnabled()) upperBoundOffset > 0 && upperBoundOffset - 1 
<= highestOffsetInRemoteStorage

Review Comment:
   Is highestOffsetInRemoteStorage inclusive or exclusive? It would be useful 
to document that.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -945,13 +978,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
         localLog.checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
-          updatedLogStartOffset = true
-          updateLogStartOffset(newLogStartOffset)
-          _localLogStartOffset = newLogStartOffset
-          info(s"Incremented log start offset to $newLogStartOffset due to 
$reason")
-          leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-          producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
-          maybeIncrementFirstUnstableOffset()
+          // it should always get updated  if tiered-storage is not enabled.
+          if (!onlyLocalLogStartOffsetUpdate || !remoteLogEnabled()) {

Review Comment:
   Hmm, this logic doesn't look right. If a client calls `deleteRecords`, we 
call `maybeIncrementLogStartOffset` with onlyLocalLogStartOffsetUpdate=false. 
So, we will go through this branch and update _localLogStartOffset. This will 
be incorrect if remote log is enabled.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     
logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} 
segments, local log start offset ${localLogStartOffset()} and " +

Review Comment:
   We already log the completion of the load loading in LogManager. Could we 
fold this there to avoid double logging?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, 
localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
 
     
logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} 
segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
   }
 
   def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
     logOffsetsListener = listener
   }
 
+  private def updateLocalLogStartOffset(offset: Long): Unit = {
+    _localLogStartOffset = offset
+
+    if (highWatermark < offset) {
+      updateHighWatermark(offset)
+    }
+
+    if (this.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)

Review Comment:
   This is already done in `updateLogStartOffset`. Do we need to do it here 
again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to