This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 36b9bb94f10 KAFKA-19474 Move WARN log on log truncation below HWM
(#20106)
36b9bb94f10 is described below
commit 36b9bb94f106749039ced274eccdd5d02520261c
Author: Gaurav Narula <[email protected]>
AuthorDate: Wed Jul 9 02:55:02 2025 +0100
KAFKA-19474 Move WARN log on log truncation below HWM (#20106)
#5608 introduced a regression: the check for `targetOffset <
log.highWatermark`
that emits a `WARN` log was performed only after the log had already been
truncated.
This change moves the check for `targetOffset < log.highWatermark` into
`UnifiedLog#truncateTo`, so that a `WARN` log is emitted on truncation
below the replica's HWM whether the truncation is performed by the
`ReplicaFetcherThread` or the `ReplicaAlterLogDirsThread`.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 5 -----
.../java/org/apache/kafka/storage/internals/log/UnifiedLog.java | 6 ++++++
2 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 8459b6fd8eb..fa2f6bb7f35 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -164,14 +164,9 @@ class ReplicaFetcherThread(name: String,
*/
override def truncate(tp: TopicPartition, offsetTruncationState:
OffsetTruncationState): Unit = {
val partition = replicaMgr.getPartitionOrException(tp)
- val log = partition.localLogOrException
partition.truncateTo(offsetTruncationState.offset, isFuture = false)
- if (offsetTruncationState.offset < log.highWatermark)
- warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below
high watermark " +
- s"${log.highWatermark}")
-
// mark the future replica for truncation only when we do last truncation
if (offsetTruncationState.truncationCompleted)
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId,
tp,
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 2ca67ad47bc..cb642bd1e4b 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -2238,6 +2238,12 @@ public class UnifiedLog implements AutoCloseable {
if (targetOffset < 0) {
throw new IllegalArgumentException("Cannot truncate
partition " + topicPartition() + " to a negative offset (" + targetOffset +
").");
}
+
+ long hwm = highWatermark();
+ if (targetOffset < hwm) {
+ logger.warn("Truncating {}{} to offset {} below high
watermark {}", isFuture() ? "future " : "", topicPartition(), targetOffset,
hwm);
+ }
+
if (targetOffset >= localLog.logEndOffset()) {
logger.info("Truncating to {} has no effect as the
largest offset in the log is {}", targetOffset, localLog.logEndOffset() - 1);
// Always truncate epoch cache since we may have a
conflicting epoch entry at the