apoorvmittal10 commented on code in PR #20837:
URL: https://github.com/apache/kafka/pull/20837#discussion_r2539147320


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -201,6 +201,11 @@ enum SharePartitionState {
      */
     private final int maxDeliveryCount;
 
+    /**
+     * Records whose delivery count exceeds this are deemed abnormal,
+     * and the batching of these records should be reduced.
+     */

Review Comment:
   ```suggestion
       /**
        * Records whose delivery count exceeds this are deemed abnormal and the batching of these records
        * should be reduced. The limit is set to half of maxDeliveryCount rounded up, with a minimum of 2.
        */
   ```
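
For reference, the rule described in the suggested Javadoc (half of maxDeliveryCount rounded up, with a minimum of 2) could be computed roughly as below. This is only a sketch: the class and method names are hypothetical and are not taken from the PR.

```java
public class ThrottleLimitSketch {

    // Hypothetical helper, not from the PR: half of maxDeliveryCount
    // rounded up, with a minimum of 2.
    static int throttleRecordsDeliveryLimit(int maxDeliveryCount) {
        // Integer ceiling of maxDeliveryCount / 2 for non-negative values.
        int halfRoundedUp = (maxDeliveryCount + 1) / 2;
        return Math.max(2, halfRoundedUp);
    }

    public static void main(String[] args) {
        for (int maxDeliveryCount : new int[] {2, 3, 5, 10}) {
            System.out.println(maxDeliveryCount + " -> "
                + throttleRecordsDeliveryLimit(maxDeliveryCount));
        }
    }
}
```

With these inputs the limit comes out as 2, 2, 3 and 5 respectively, so the minimum of 2 only takes effect for small values of maxDeliveryCount.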



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -201,6 +201,11 @@ enum SharePartitionState {
      */
     private final int maxDeliveryCount;
 
+    /**
+     * Records whose delivery count exceeds this are deemed abnormal,
+     * and the batching of these records should be reduced.
+     */
+    private final int badRecordDeliveryThreshold;

Review Comment:
   ```suggestion
       private final int throttleRecordsDeliveryLimit;
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1942,6 +1988,19 @@ private boolean checkForStartOffsetWithinBatch(long batchFirstOffset, long batch
         return batchFirstOffset < localStartOffset && batchLastOffset >= localStartOffset;
     }
 
+    /**
+     * Check if the in-flight batch contains bad records based on delivery count.
+     *
+     * @param inFlightBatch The in-flight batch to check for bad records.
+     * @return True if the batch contains bad records (delivery count >= threshold), false otherwise.
+     */
+    private boolean checkForBadRecordsDeliveryCount(InFlightBatch inFlightBatch) {

Review Comment:
   ```suggestion
       private boolean shouldThrottleRecordsDelivery(InFlightBatch inFlightBatch) {
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -858,8 +871,16 @@ public ShareAcquiredRecords acquire(
                     // In record_limit mode, we need to ensure that we do not acquire more than
                     // maxRecordsToAcquire. Hence, pass the remaining number of records that can
                     // be acquired.
-                    int acquiredSubsetCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode, numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
-                    acquiredCount += acquiredSubsetCount;
+                    BadRecordMarkerAndAcquiredCount badRecordMarkerAndAcquiredCount = acquireSubsetBatchRecords(memberId, isRecordLimitMode,
+                        numRecordsRemaining, firstBatch.baseOffset(), lastOffsetToAcquire, inFlightBatch, result);
+
+                    acquiredCount += badRecordMarkerAndAcquiredCount.acquiredCount();
+                    // If a bad record is present, return immediately and set `maxRecordsToAcquire = -1`
+                    // to prevent acquiring any new records afterwards.
+                    if (badRecordMarkerAndAcquiredCount.badRecordMarker()) {

Review Comment:
   Why do you need this check? The `deliveryCountExceed` check above already tells us that the batch has been throttled.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,14 @@ public ShareAcquiredRecords acquire(
                 boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
                 int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
                 boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
-                if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+                boolean deliveryCountExceed = checkForBadRecordsDeliveryCount(inFlightBatch);
+                // Stop acquiring more records if bad record found after acquiring some data to
+                // prevent affecting already acquired records
+                if (deliveryCountExceed && acquiredCount > 0) {
+                    maxRecordsToAcquire = -1;

Review Comment:
   Why -1 and not 0?
   ```suggestion
                       // Set the max records to acquire as 0 to prevent further acquisition of records.
                       maxRecordsToAcquire = 0;
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
                     continue;
                 }
 
-                InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = offsetState.getValue().deliveryCount();
+                // On last delivery attempt, submit acquired records,
+                // bad record will be delivered alone next time
+                if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+                    hasBadRecord = true;
+                    break;
+                }

Review Comment:
   Why is this check required, given that we already have a check on lines 1944-1949 in the current PR?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1885,7 +1908,21 @@ private int acquireSubsetBatchRecords(
                     continue;
                 }
 
-                InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
+                int recordDeliveryCount = offsetState.getValue().deliveryCount();
+                // On last delivery attempt, submit acquired records,
+                // bad record will be delivered alone next time
+                if (maxDeliveryCount > 2 && recordDeliveryCount == maxDeliveryCount - 1 && acquiredCount > 0) {
+                    hasBadRecord = true;
+                    break;
+                }
+
+                // On repeated delivery failures (>= badRecordDeliveryThreshold), progressively reduce batch size to isolate bad record
+                if (recordDeliveryCount >= badRecordDeliveryThreshold && maxFetchRecordsWhileBadRecord < 0) {
+                    maxFetchRecordsWhileBadRecord = Math.max(1, (long) inFlightBatch.offsetState().size() >> (recordDeliveryCount - badRecordDeliveryThreshold + 1));
+                    hasBadRecord = true;
+                }

Review Comment:
   Hmm, this needs detailed comments.
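
As a starting point for those comments, here is a sketch of what the expression in the diff appears to compute: the batch size is halved once for every delivery at or past the threshold, and never drops below one record. The class, method, and parameter names are hypothetical. The arithmetic mirrors the `Math.max(1, (long) size >> (count - threshold + 1))` expression from the diff, and the sketch assumes the delivery count is at least the threshold, as the guarding `if` ensures.

```java
public class ThrottledBatchSizeSketch {

    // Hypothetical helper mirroring the expression in the diff.
    // Assumes deliveryCount >= threshold, so the shift distance is >= 1.
    static long maxFetchRecords(int batchSize, int deliveryCount, int threshold) {
        // One halving at the threshold, plus one more per further delivery.
        int halvings = deliveryCount - threshold + 1;
        // The cast binds tighter than the shift, so this is ((long) batchSize) >> halvings.
        // Clamp to 1 so that at least one record can always be fetched.
        return Math.max(1, (long) batchSize >> halvings);
    }

    public static void main(String[] args) {
        int batchSize = 100;
        int threshold = 3;
        for (int deliveryCount = threshold; deliveryCount <= 10; deliveryCount++) {
            System.out.println("deliveryCount=" + deliveryCount + " -> "
                + maxFetchRecords(batchSize, deliveryCount, threshold));
        }
    }
}
```

For a batch of 100 records and a threshold of 3, the fetch size goes 50, 25, 12, 6, 3, 1 and then stays at 1, which is the progressive isolation of the bad record that the one-line comment in the diff describes.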



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -834,7 +840,14 @@ public ShareAcquiredRecords acquire(
                 boolean fullMatch = checkForFullMatch(inFlightBatch, firstBatch.baseOffset(), lastOffsetToAcquire);
                 int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
                 boolean recordLimitSubsetMatch = isRecordLimitMode && checkForRecordLimitSubsetMatch(inFlightBatch, maxRecordsToAcquire, acquiredCount);
-                if (!fullMatch || inFlightBatch.offsetState() != null || recordLimitSubsetMatch) {
+                boolean deliveryCountExceed = checkForBadRecordsDeliveryCount(inFlightBatch);

Review Comment:
   ```suggestion
                   boolean throttleRecordsDelivery = shouldThrottleRecordsDelivery(inFlightBatch);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
