apoorvmittal10 commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2478737602
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -816,8 +821,9 @@ public ShareAcquiredRecords acquire(
// Compute if the batch is a full match.
boolean fullMatch = checkForFullMatch(inFlightBatch,
firstBatch.baseOffset(), lastOffsetToAcquire);
-
- if (!fullMatch || inFlightBatch.offsetState() != null) {
+ long numRecordsToAcquire = inFlightBatch.lastOffset() -
inFlightBatch.firstOffset() + 1;
Review Comment:
The name should be `numRecordsInBatch`, since we don't know yet whether all
records from the batch can be acquired.
```suggestion
long numRecordsInBatch = inFlightBatch.lastOffset() -
inFlightBatch.firstOffset() + 1;
```
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -816,8 +821,9 @@ public ShareAcquiredRecords acquire(
// Compute if the batch is a full match.
boolean fullMatch = checkForFullMatch(inFlightBatch,
firstBatch.baseOffset(), lastOffsetToAcquire);
-
- if (!fullMatch || inFlightBatch.offsetState() != null) {
+ long numRecordsToAcquire = inFlightBatch.lastOffset() -
inFlightBatch.firstOffset() + 1;
+ int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
+ if (!fullMatch || inFlightBatch.offsetState() != null ||
(isRecordLimitMode && (numRecordsToAcquire > numRecordsRemaining))) {
Review Comment:
Maybe extract the record-limit check into a helper, something like:
```
boolean recordLimitSubsetMatch = isRecordLimitMode &&
    checkForRecordLimitSubsetMatch(inFlightBatch,.....);
if (!fullMatch || inFlightBatch.offsetState() != null ||
    recordLimitSubsetMatch) {
    ....
```
```
private boolean checkForRecordLimitSubsetMatch(InFlightBatch inFlightBatch, ....) {
    // Offsets are inclusive on both ends, hence the + 1.
    long numRecordsInBatch = inFlightBatch.lastOffset() -
        inFlightBatch.firstOffset() + 1;
    int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
    // True when the batch holds more records than can still be acquired.
    return numRecordsInBatch > numRecordsRemaining;
}
```
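To make the intended condition concrete, here is a minimal standalone sketch of the same check. It is an illustration only: `RecordLimitCheck`, `numRecordsInBatch(...)` and `exceedsRemaining(...)` are hypothetical names, and plain `long` offsets stand in for `InFlightBatch`, so nothing here reflects the actual `SharePartition` API.

```java
// Hypothetical, self-contained illustration of the record-limit check.
final class RecordLimitCheck {
    private RecordLimitCheck() { }

    // Number of records in an inclusive offset range [firstOffset, lastOffset].
    static long numRecordsInBatch(long firstOffset, long lastOffset) {
        return lastOffset - firstOffset + 1;
    }

    // True when the batch holds more records than are still allowed to be acquired.
    static boolean exceedsRemaining(long firstOffset, long lastOffset,
                                    int maxRecordsToAcquire, int acquiredCount) {
        int numRecordsRemaining = maxRecordsToAcquire - acquiredCount;
        return numRecordsInBatch(firstOffset, lastOffset) > numRecordsRemaining;
    }

    public static void main(String[] args) {
        // Limit 500 with 495 already acquired leaves room for 5 records.
        System.out.println(exceedsRemaining(10, 19, 500, 495)); // 10 records: does not fit
        System.out.println(exceedsRemaining(10, 14, 500, 495)); // 5 records: fits exactly
    }
}
```

A batch that exactly fills the remaining budget is not treated as a subset match, because the comparison is strict (`>`).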
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1751,10 +1768,56 @@ private List<AcquiredRecords> createBatches(
.setLastOffset(lastAcquiredOffset)
.setDeliveryCount((short) 1));
- result.forEach(acquiredRecords -> {
- // Schedule acquisition lock timeout for the batch.
- AcquisitionLockTimerTask timerTask =
scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(),
acquiredRecords.lastOffset());
- // Add the new batch to the in-flight records along with the
acquisition lock timeout task for the batch.
+ if (isRecordLimitMode) {
+ AcquiredRecords acquiredRecords = result.get(0);
+ if (acquiredRecords.lastOffset() -
acquiredRecords.firstOffset() + 1 > maxFetchRecords) {
+ InFlightBatch inFlightBatch = new InFlightBatch(
+ timer,
+ time,
+ memberId,
+ acquiredRecords.firstOffset(),
+ acquiredRecords.lastOffset(),
+ RecordState.ACQUIRED,
+ 1,
+ null,
+ timeoutHandler,
+ sharePartitionMetrics);
+
+ int delayMs =
recordLockDurationMsOrDefault(groupConfigManager, groupId,
defaultRecordLockDurationMs);
+ long lastOffset = acquiredRecords.firstOffset() +
maxFetchRecords - 1;
+ acquiredRecords.setLastOffset(lastOffset);
+ inFlightBatch.maybeInitializeOffsetStateUpdate(lastOffset,
delayMs);
+ updateFindNextFetchOffset(true);
+
+ cachedState.put(acquiredRecords.firstOffset(),
inFlightBatch);
+ sharePartitionMetrics.recordInFlightBatchMessageCount(
+ acquiredRecords.lastOffset() -
acquiredRecords.firstOffset() + 1);
+ return List.of(acquiredRecords);
+ } else {
+ addBatches(memberId, result);
+ }
+ } else {
+ addBatches(memberId, result);
+ }
Review Comment:
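The inner `if` branch has a return statement anyway, so both `else` branches
can be dropped in favour of a single call after the `if`: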
```suggestion
addBatches(memberId, result);
```
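The resulting control flow can be sketched as below. This is a simplified stand-in, not the PR code: `EarlyReturnSketch` and `pathTaken(...)` are hypothetical, and the returned strings only label which branch would run in place of the real side effects.

```java
// Hypothetical sketch of the flattened control flow: the over-limit branch
// returns early, so one fall-through call replaces both else branches.
final class EarlyReturnSketch {
    private EarlyReturnSketch() { }

    static String pathTaken(boolean isRecordLimitMode,
                            long numRecordsInFirstBatch,
                            int maxFetchRecords) {
        if (isRecordLimitMode) {
            if (numRecordsInFirstBatch > maxFetchRecords) {
                // Stands in for the truncation logic that ends with a return.
                return "truncated";
            }
        }
        // Reached both when record-limit mode is off and when the batch fits.
        return "addBatches";
    }

    public static void main(String[] args) {
        System.out.println(pathTaken(true, 30, 20));
        System.out.println(pathTaken(true, 10, 20));
        System.out.println(pathTaken(false, 30, 20));
    }
}
```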
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1751,10 +1768,56 @@ private List<AcquiredRecords> createBatches(
.setLastOffset(lastAcquiredOffset)
.setDeliveryCount((short) 1));
- result.forEach(acquiredRecords -> {
- // Schedule acquisition lock timeout for the batch.
- AcquisitionLockTimerTask timerTask =
scheduleAcquisitionLockTimeout(memberId, acquiredRecords.firstOffset(),
acquiredRecords.lastOffset());
- // Add the new batch to the in-flight records along with the
acquisition lock timeout task for the batch.
+ if (isRecordLimitMode) {
+ AcquiredRecords acquiredRecords = result.get(0);
+ if (acquiredRecords.lastOffset() -
acquiredRecords.firstOffset() + 1 > maxFetchRecords) {
+ InFlightBatch inFlightBatch = new InFlightBatch(
Review Comment:
Can you please write comments in the code explaining why we need the
additional check and why only the first element from the list is compared?
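For reference, the offset arithmetic in the hunk above can be checked in isolation. A minimal sketch, assuming inclusive offsets as in the diff; `TruncateToLimit` and `truncatedLastOffset(...)` are hypothetical names, and the sketch does not model the per-offset state, the lock timeout, or the reason only `result.get(0)` is inspected.

```java
// Hypothetical illustration of capping an acquired range at maxFetchRecords.
final class TruncateToLimit {
    private TruncateToLimit() { }

    // Returns the last offset to hand out so that at most maxFetchRecords
    // records are acquired from the inclusive range [firstOffset, lastOffset].
    static long truncatedLastOffset(long firstOffset, long lastOffset, int maxFetchRecords) {
        long numRecords = lastOffset - firstOffset + 1;
        if (numRecords > maxFetchRecords) {
            // Same expression as in the diff: firstOffset + maxFetchRecords - 1.
            return firstOffset + maxFetchRecords - 1;
        }
        return lastOffset;
    }

    public static void main(String[] args) {
        // Range 100..149 holds 50 records; with a limit of 20 only 100..119 is kept.
        System.out.println(truncatedLastOffset(100, 149, 20));
    }
}
```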
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.