This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fceff8ac25f MINOR: Add logging for share fetch record isolation and
throttling (#21217)
fceff8ac25f is described below
commit fceff8ac25f333bfe4579ed7b778ecee1640ccd5
Author: Lan Ding <[email protected]>
AuthorDate: Tue Dec 30 05:01:24 2025 +0800
MINOR: Add logging for share fetch record isolation and throttling (#21217)
See the review discussion at https://github.com/apache/kafka/pull/20837#discussion_r2647208019
Added log statements to `acquireSubsetBatchRecords` to trace when records
are isolated or throttled due to high delivery counts. This improves
observability of the share partition fetch logic.
Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield
<[email protected]>
---
core/src/main/java/kafka/server/share/SharePartition.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 243a7406d6a..baafdb24ae1 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1952,6 +1952,8 @@ public class SharePartition {
// the delivery limit and already have some records to return
in response then skip processing
// the current record, which shall be delivered alone in next
fetch.
if (maxDeliveryCount > 2 && recordDeliveryCount ==
maxDeliveryCount - 1 && acquiredCount > 0) {
+ log.warn("The offset {} is on last delivery attempt in
share partition: {}-{}, should be delivered alone in next fetch",
+ offsetState.getKey(), groupId, topicIdPartition);
break;
}
@@ -1995,6 +1997,8 @@ public class SharePartition {
// Delivered alone.
if (offsetState.getValue().deliveryCount() == maxDeliveryCount
&& maxDeliveryCount > 2) {
+ log.warn("The offset {} is on last delivery attempt in
share partition: {}-{}, should be delivered alone in this fetch",
+ offsetState.getKey(), groupId, topicIdPartition);
break;
}
if (isRecordLimitMode && acquiredCount == maxFetchRecords) {
@@ -2002,6 +2006,8 @@ public class SharePartition {
break;
}
if (hasThrottledRecord && acquiredCount ==
maxFetchRecordsWhileThrottledRecords) {
+ log.debug("Breaking early due to throttling for share
partition: {}-{}, acquired {} records.",
+ groupId, topicIdPartition, acquiredCount);
break;
}
}