This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36b9bb94f10 KAFKA-19474 Move WARN log on log truncation below HWM 
(#20106)
36b9bb94f10 is described below

commit 36b9bb94f106749039ced274eccdd5d02520261c
Author: Gaurav Narula <[email protected]>
AuthorDate: Wed Jul 9 02:55:02 2025 +0100

    KAFKA-19474 Move WARN log on log truncation below HWM (#20106)
    
    #5608 introduced a regression where the check for
    `targetOffset < log.highWatermark`, which emits a `WARN` log, was
    incorrectly made after truncating the log.
    
    This change moves the check for `targetOffset < log.highWatermark` to
    `UnifiedLog#truncateTo` and ensures we emit a `WARN` log on truncation
    below the replica's HWM by both the `ReplicaFetcherThread` and
    `ReplicaAlterLogDirsThread`.
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala         | 5 -----
 .../java/org/apache/kafka/storage/internals/log/UnifiedLog.java     | 6 ++++++
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8459b6fd8eb..fa2f6bb7f35 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -164,14 +164,9 @@ class ReplicaFetcherThread(name: String,
    */
   override def truncate(tp: TopicPartition, offsetTruncationState: 
OffsetTruncationState): Unit = {
     val partition = replicaMgr.getPartitionOrException(tp)
-    val log = partition.localLogOrException
 
     partition.truncateTo(offsetTruncationState.offset, isFuture = false)
 
-    if (offsetTruncationState.offset < log.highWatermark)
-      warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below 
high watermark " +
-        s"${log.highWatermark}")
-
     // mark the future replica for truncation only when we do last truncation
     if (offsetTruncationState.truncationCompleted)
       
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId,
 tp,
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 2ca67ad47bc..cb642bd1e4b 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -2238,6 +2238,12 @@ public class UnifiedLog implements AutoCloseable {
                     if (targetOffset < 0) {
                         throw new IllegalArgumentException("Cannot truncate 
partition " + topicPartition() + " to a negative offset (" + targetOffset + 
").");
                     }
+
+                    long hwm = highWatermark();
+                    if (targetOffset < hwm) {
+                        logger.warn("Truncating {}{} to offset {} below high 
watermark {}", isFuture() ? "future " : "", topicPartition(), targetOffset, 
hwm);
+                    }
+
                     if (targetOffset >= localLog.logEndOffset()) {
                         logger.info("Truncating to {} has no effect as the 
largest offset in the log is {}", targetOffset, localLog.logEndOffset() - 1);
                         // Always truncate epoch cache since we may have a 
conflicting epoch entry at the

Reply via email to