apoorvmittal10 commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828532884


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -54,18 +59,23 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsToComplete;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
 
     DelayedShareFetch(
             ShareFetchData shareFetchData,
             ReplicaManager replicaManager,
-            SharePartitionManager sharePartitionManager) {
+            SharePartitionManager sharePartitionManager,
+            Map<TopicIdPartition, SharePartition> sharePartitions) {
         super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
         this.shareFetchData = shareFetchData;
         this.replicaManager = replicaManager;
-        this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+        this.partitionsToComplete = new LinkedHashMap<>();

Review Comment:
   We initialize the variable in constructor then re-assign while creating 
another LinkedHashMap in acquirablePartitions() method. Are we are initializing 
`partitionsToComplete` here to save null check? Can't we re-use already 
initialized LinkedHashMap()?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsToComplete.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +

Review Comment:
   There are 2 checks in the if condition (anyTopicIdPartitionHasLogReadError 
and isMinBytesSatisfied) but the log says that minBytes criteria is not 
satified. I this correct log statement?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadata(

Review Comment:
   Shouldn't the name be `maybeUpdateFetchOffsetMetadata` as it depends on the 
log read result?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {

Review Comment:
   So did we not chose to implement 
https://github.com/apache/kafka/pull/17539/files#r1816530978 rather initialize 
with LinkedHashMap which will hardly be filled?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -169,9 +205,9 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new LinkedHashMap<>();
 
         shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition 
-> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(shareFetchData.groupId(), 
topicIdPartition);
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
             if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                log.debug("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
                 return;
             }

Review Comment:
   Same elsewhere.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -169,9 +205,9 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
new LinkedHashMap<>();
 
         shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition 
-> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(shareFetchData.groupId(), 
topicIdPartition);
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
             if (sharePartition == null) {
-                log.error("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                log.debug("Encountered null share partition for groupId={}, 
topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
                 return;
             }

Review Comment:
   We have now passed `sharePartitions` map which contains topicIdPartition and 
sharePartition itself then why are we iterating on 
`shareFetchData.partitionMaxBytes().keySet()` and doing a null check, why not 
to iterate in `sharePartitions` map itself? Also make the map of 
`sharePartitions` in SPM as of type LinkedHashMap then.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -54,18 +59,23 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> 
partitionsToComplete;
+    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
+    private final Map<TopicIdPartition, SharePartition> sharePartitions;
 
     DelayedShareFetch(
             ShareFetchData shareFetchData,
             ReplicaManager replicaManager,
-            SharePartitionManager sharePartitionManager) {
+            SharePartitionManager sharePartitionManager,
+            Map<TopicIdPartition, SharePartition> sharePartitions) {
         super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
         this.shareFetchData = shareFetchData;
         this.replicaManager = replicaManager;
-        this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
+        this.partitionsToComplete = new LinkedHashMap<>();
+        this.partitionsAlreadyFetched = new LinkedHashMap<>();

Review Comment:
   Again we reset `partitionsAlreadyFetched` to response from 
`replicaManagerReadResponse`. Why to have such instances created when anyways 
we have to re-assign?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;

Review Comment:
   We are re-assigning the already initalized variables in constructor. I would 
say we should have null check handling in `onComplete` rather creating 
resources which never gets utilized.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or 
more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata 
and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if 
(anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || 
isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then 
that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsToComplete.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch 
request for group {}, member {}, " +
+                            "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                        shareFetchData.partitionMaxBytes().keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the 
share fetch request for group {}, member {}, " +
+                        "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
+                    shareFetchData.partitionMaxBytes().keySet());
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);

Review Comment:
   Seems an incorrect error handling of release acquired partitions to me. Say 
line 155 acquires partitions and `topicPartitionData` is set. And we get an 
exception in `isMinBytesSatisfied` method (which anyways call 
getPartitionOrException method) or elsewhere then the locks released at line 
193 will not release any locks as they are invoked on `partitionsToComplete` 
which is not yet set. Moreover if forceComplete call is successful then the 
acquired partitions will anyways not be released.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != 
Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} does not contain 
topic partition {}",
+                    replicaManagerReadResponseData, topicIdPartition);
+                continue;
+            }
+            
sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+        return replicaManagerReadResponseData;
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+        long accumulatedSize = 0;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : 
topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            FetchRequest.PartitionData partitionData = entry.getValue();
+            LogOffsetMetadata endOffsetMetadata = 
endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = 
sharePartition.fetchOffsetMetadata();
+            if (optionalFetchOffsetMetadata.isEmpty() || 
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+            LogOffsetMetadata fetchOffsetMetadata = 
optionalFetchOffsetMetadata.get();
+
+            if (fetchOffsetMetadata.messageOffset > 
endOffsetMetadata.messageOffset) {
+                log.debug("Satisfying delayed share fetch request for group 
{}, member {} since it is fetching later segments of " +
+                    "topicIdPartition {}", shareFetchData.groupId(), 
shareFetchData.memberId(), topicIdPartition);
+                return true;
+            } else if (fetchOffsetMetadata.messageOffset < 
endOffsetMetadata.messageOffset) {
+                if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
+                    // This can happen when the fetch operation is falling 
behind the current segment or the partition
+                    // has just rolled a new segment.
+                    log.debug("Satisfying delayed share fetch request for 
group {}, member {} immediately since it is fetching older " +
+                        "segments of topicIdPartition {}", 
shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+                    return true;
+                } else if 
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+                    // we take the partition fetch size as upper bound when 
accumulating the bytes.
+                    long bytesAvailable = 
Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), 
partitionData.maxBytes);
+                    accumulatedSize += bytesAvailable;
+                }
+            }
+        }
+        return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+    }
+
+    private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+        LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
+        // The FetchIsolation type that we use for share fetch is 
FetchIsolation.HIGH_WATERMARK. In the future, we can
+        // extend it other FetchIsolation types.

Review Comment:
   ```suggestion
           // extend it to support other FetchIsolation types.
   ```



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != 
Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} does not contain 
topic partition {}",

Review Comment:
   Isn't the log incorrect as it says the the log does not contain 
topicIdPartition rather the response exists but it errored. Also do you need to 
log `replicaManagerLogReadResult` or complete `replicaManagerReadResponseData`? 
Shouldn't we be logging former which corresponds to topic id partition?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,34 +113,27 @@ public void onComplete() {
             topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There can't be a case when we have a non-empty 
logReadResponse, and we have a fresh topicPartitionData
+                // using tryComplete because purgatory ensures that 2 
tryCompletes calls do not happen at the same time.

Review Comment:
   I understand there can't be concurrent calls to tryComplete but I didn't get 
this comment. Though forceComplete() cannot be called twice. But 
forceComplete() on expiration can be on different thread and tryComplete() on 
different can this change any behaviour here?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = 
acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {

Review Comment:
   nit: I personally find the code hard to read with nested if/else blocks, 
same is the case here. Though I leave it on you.
   
   ```
   if (topicPartitionData.isEmpty()) {
        log.trace("Can't acquire records for any partition in the share fetch 
request for group {}, member {}, " +
                           "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
                       shareFetchData.partitionMaxBytes().keySet());
       return false;
   }
   
   // In case, fetch offset metadata doesn't exist for one or more topic 
partitions, we do a
   // replicaManager.readFromLog to populate the offset metadata and update the 
fetch offset metadata for
   // those topic partitions.
   Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = 
updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
   if (!anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) && 
!isMinBytesSatisfied(topicPartitionData)) {
          log.debug("minBytes is not satisfied for the share fetch request for 
group {}, member {}, " +
                               "topic partitions {}", shareFetchData.groupId(), 
shareFetchData.memberId(),
          shareFetchData.partitionMaxBytes().keySet());
          releasePartitionLocks(topicPartitionData.keySet());
          return false;
   }
   
   partitionsToComplete = topicPartitionData;
   partitionsAlreadyFetched = replicaManagerReadResponse;
   ....
   ....
   



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            LogReadResult replicaManagerLogReadResult = entry.getValue();
+            if (replicaManagerLogReadResult.error().code() != 
Errors.NONE.code()) {
+                log.debug("Replica manager read log result {} does not contain 
topic partition {}",
+                    replicaManagerReadResponseData, topicIdPartition);
+                continue;
+            }
+            
sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+        }
+        return replicaManagerReadResponseData;
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, 
FetchRequest.PartitionData> topicPartitionData) {
+        long accumulatedSize = 0;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : 
topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            FetchRequest.PartitionData partitionData = entry.getValue();
+            LogOffsetMetadata endOffsetMetadata = 
endOffsetMetadataForTopicPartition(topicIdPartition);
+
+            if (endOffsetMetadata == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = 
sharePartition.fetchOffsetMetadata();
+            if (optionalFetchOffsetMetadata.isEmpty() || 
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
+                continue;
+            LogOffsetMetadata fetchOffsetMetadata = 
optionalFetchOffsetMetadata.get();
+
+            if (fetchOffsetMetadata.messageOffset > 
endOffsetMetadata.messageOffset) {
+                log.debug("Satisfying delayed share fetch request for group 
{}, member {} since it is fetching later segments of " +
+                    "topicIdPartition {}", shareFetchData.groupId(), 
shareFetchData.memberId(), topicIdPartition);
+                return true;
+            } else if (fetchOffsetMetadata.messageOffset < 
endOffsetMetadata.messageOffset) {
+                if (fetchOffsetMetadata.onOlderSegment(endOffsetMetadata)) {
+                    // This can happen when the fetch operation is falling 
behind the current segment or the partition
+                    // has just rolled a new segment.
+                    log.debug("Satisfying delayed share fetch request for 
group {}, member {} immediately since it is fetching older " +
+                        "segments of topicIdPartition {}", 
shareFetchData.groupId(), shareFetchData.memberId(), topicIdPartition);
+                    return true;
+                } else if 
(fetchOffsetMetadata.onSameSegment(endOffsetMetadata)) {
+                    // we take the partition fetch size as upper bound when 
accumulating the bytes.
+                    long bytesAvailable = 
Math.min(endOffsetMetadata.positionDiff(fetchOffsetMetadata), 
partitionData.maxBytes);
+                    accumulatedSize += bytesAvailable;
+                }
+            }
+        }
+        return accumulatedSize >= shareFetchData.fetchParams().minBytes;
+    }
+
+    private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
+        Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());

Review Comment:
   What @junrao meant was that it should not be a top level rather partition 
level error.



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> 
acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> 
topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = 
sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> 
maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> 
missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = 
sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                
missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic 
partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadata(
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
replicaManagerReadResponseData.entrySet()) {

Review Comment:
   nit: Will `forEach` be more convenient here then you don't need to call 
entry.getKey and entry.getValue?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -103,34 +113,27 @@ public void onComplete() {
             topicPartitionData, shareFetchData.groupId(), 
shareFetchData.fetchParams());
 
         try {
-            Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-                shareFetchData.fetchParams(),
-                CollectionConverters.asScala(
-                    topicPartitionData.entrySet().stream().map(entry ->
-                        new Tuple2<>(entry.getKey(), 
entry.getValue())).collect(Collectors.toList())
-                ),
-                QuotaFactory.UNBOUNDED_QUOTA,
-                true);
-
-            Map<TopicIdPartition, FetchPartitionData> responseData = new 
HashMap<>();
-            responseLogResult.foreach(tpLogResult -> {
-                TopicIdPartition topicIdPartition = tpLogResult._1();
-                LogReadResult logResult = tpLogResult._2();
-                FetchPartitionData fetchPartitionData = 
logResult.toFetchPartitionData(false);
-                responseData.put(topicIdPartition, fetchPartitionData);
-                return BoxedUnit.UNIT;
-            });
-
-            log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
+            Map<TopicIdPartition, LogReadResult> responseData;
+            if (partitionsAlreadyFetched.isEmpty())
+                responseData = readFromLog(topicPartitionData);
+            else
+                // There can't be a case when we have a non-empty 
logReadResponse, and we have a fresh topicPartitionData
+                // using tryComplete because purgatory ensures that 2 
tryCompletes calls do not happen at the same time.
+                responseData = combineLogReadResponse(topicPartitionData);
+
+            Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData = 
new LinkedHashMap<>();
+            for (Map.Entry<TopicIdPartition, LogReadResult> entry : 
responseData.entrySet())
+                fetchPartitionsData.put(entry.getKey(), 
entry.getValue().toFetchPartitionData(false));
+
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result 
=
-                ShareFetchUtils.processFetchResponse(shareFetchData, 
responseData, sharePartitionManager, replicaManager);
+                ShareFetchUtils.processFetchResponse(shareFetchData, 
fetchPartitionsData, sharePartitionManager, replicaManager);
             shareFetchData.future().complete(result);

Review Comment:
   Can be merged together.
   
   ```
    
shareFetchData.future().complete(ShareFetchUtils.processFetchResponse(shareFetchData,
 fetchPartitionsData, sharePartitionManager, replicaManager));



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {

Review Comment:
   Shouldn't we iterate on `sharePartitions` map passed in delayed share fetch 
which will guarantee that SharePartition cannot be null it can only be fenced 
(fenced handling has been separate), hence no null check is required. Also no 
`anySharePartitionNoLongerExists()` method call is required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to