junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1815510878


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1525,6 +1537,24 @@ private Optional<Throwable> acknowledgeCompleteBatch(
         return Optional.empty();
     }
 
+    protected void updateLatestFetchOffsetMetadata(LogOffsetMetadata fetchOffsetMetadata) {
+        lock.writeLock().lock();
+        try {
+            latestFetchOffsetMetadata = fetchOffsetMetadata;

Review Comment:
   We need to reset `latestFetchOffsetMetadata` every time `endOffset` changes.
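   Roughly something like the sketch below, assuming the existing `endOffset` and `latestFetchOffsetMetadata` fields; the helper name is only illustrative since `endOffset` is advanced in more than one place:

```java
    // Illustrative sketch, not the PR's code: wherever endOffset moves forward,
    // also drop the cached fetch offset metadata so a stale segment position
    // is never used for the minBytes estimate.
    private void updateEndOffsetAndResetFetchMetadata(long newEndOffset) {
        lock.writeLock().lock();
        try {
            endOffset = newEndOffset;
            // The cached metadata describes the old end offset, so invalidate it.
            latestFetchOffsetMetadata = null;
        } finally {
            lock.writeLock().unlock();
        }
    }
```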



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -542,6 +552,8 @@ public ShareAcquiredRecords acquire(
         RecordBatch firstBatch = fetchPartitionData.records.batches().iterator().next();
         lock.writeLock().lock();
         try {
+            // Update the latest fetch offset metadata for any future queries.
+            this.latestFetchOffsetMetadata = fetchOffsetMetadata;

Review Comment:
   The `acquire` call always comes after `DelayedShareFetch.tryComplete`, which 
already updates `latestFetchOffsetMetadata`. So, it seems that we don't need
to update `latestFetchOffsetMetadata` in `acquire`?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+            if (sharePartition == null) {
+                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                continue;
+            }
+            if (sharePartition.latestFetchOffsetMetadata() == null) {

Review Comment:
   Perhaps it's clearer to make sharePartition.latestFetchOffsetMetadata() an 
Optional?
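   For example, a sketch of the accessor (requires `java.util.Optional`; otherwise reuses the PR's field and lock):

```java
    // Sketch: expose the cached metadata as an Optional instead of a nullable
    // reference, so callers can't forget the "not fetched yet" case.
    protected Optional<LogOffsetMetadata> latestFetchOffsetMetadata() {
        lock.readLock().lock();
        try {
            return Optional.ofNullable(latestFetchOffsetMetadata);
        } finally {
            lock.readLock().unlock();
        }
    }

    // The caller side in DelayedShareFetch then becomes:
    //     if (sharePartition.latestFetchOffsetMetadata().isEmpty()) { ... }
```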



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -153,13 +141,23 @@ public boolean tryComplete() {
             shareFetchData.groupId(), shareFetchData.memberId(),
             shareFetchData.partitionMaxBytes().keySet());
 
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty())
-            return forceComplete();
-        log.info("Can't acquire records for any partition in the share fetch 
request for group {}, member {}, " +
-                "topic partitions {}", shareFetchData.groupId(),
-                shareFetchData.memberId(), 
shareFetchData.partitionMaxBytes().keySet());
+        if (!topicPartitionData.isEmpty()) {
+            maybeUpdateFetchOffsetMetadataForTopicPartitions(topicPartitionData);

Review Comment:
   It's possible that `maybeUpdateFetchOffsetMetadataForTopicPartitions` calls 
`readFromLog`, which returns enough bytes. In that case, it's more efficient to 
reuse the fetched result instead of calling `readFromLog` again in `onComplete`.
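   E.g. a sketch (the field name and plumbing here are illustrative, not part of the PR):

```java
    // Sketch: keep the read issued while evaluating minBytes in tryComplete so that
    // onComplete can reuse it instead of hitting replicaManager.readFromLog twice.
    private Map<TopicIdPartition, FetchPartitionOffsetData> readResultFromTryComplete = null;

    private Map<TopicIdPartition, FetchPartitionOffsetData> readFromLogOrReuse(
            Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
        // Reuse the data fetched in tryComplete when it is available; otherwise
        // fall back to a fresh read in onComplete.
        if (readResultFromTryComplete != null) {
            return readResultFromTryComplete;
        }
        return readFromLog(topicPartitionData);
    }
```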



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+            if (sharePartition == null) {
+                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                continue;
+            }
+            if (sharePartition.latestFetchOffsetMetadata() == null) {
+                if (replicaManagerReadResponseData == null) {
+                    replicaManagerReadResponseData = readFromLog(topicPartitionData);

Review Comment:
   Hmm, we don't want to read all partitions if only one partition's offset 
metadata is missing, right?
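   I.e. restrict the read to the partitions whose metadata is actually missing, roughly like the sketch below (uses `java.util.LinkedHashMap`; names follow the PR but are illustrative):

```java
    // Sketch: collect only the partitions with no cached fetch offset metadata and
    // issue a single readFromLog for just that subset.
    Map<TopicIdPartition, FetchRequest.PartitionData> missingMetadata = new LinkedHashMap<>();
    for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
        SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), entry.getKey());
        if (sharePartition != null && sharePartition.latestFetchOffsetMetadata() == null) {
            missingMetadata.put(entry.getKey(), entry.getValue());
        }
    }
    if (!missingMetadata.isEmpty()) {
        Map<TopicIdPartition, FetchPartitionOffsetData> readResponse = readFromLog(missingMetadata);
        // ...update latestFetchOffsetMetadata only for the partitions that were just read.
    }
```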



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+            if (sharePartition == null) {
+                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                continue;
+            }
+            if (sharePartition.latestFetchOffsetMetadata() == null) {
+                if (replicaManagerReadResponseData == null) {
+                    replicaManagerReadResponseData = readFromLog(topicPartitionData);
+                }
+                FetchPartitionOffsetData fetchPartitionOffsetData = replicaManagerReadResponseData.get(topicIdPartition);
+                if (fetchPartitionOffsetData == null) {
+                    log.debug("Replica manager read log result {} does not contain topic partition {}",
+                        replicaManagerReadResponseData, topicIdPartition);
+                    continue;
+                }
+                sharePartition.updateLatestFetchOffsetMetadata(fetchPartitionOffsetData.logOffsetMetadata());
+            }
+        }
+    }
+
+    private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        AtomicLong accumulatedSize = new AtomicLong(0);

Review Comment:
   Could this just be a `long`?
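   i.e. a plain local would do, e.g. (sketch; `minBytes` and the per-partition size are placeholders for what the method already computes):

```java
    // Sketch: a plain long is enough because tryComplete walks the partitions on a
    // single thread; AtomicLong only matters if the sum is mutated from a lambda or
    // another thread.
    long accumulatedSize = 0;
    for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
        long availableBytes = 0; // placeholder: the per-partition estimate computed in this method
        accumulatedSize += availableBytes;
        if (accumulatedSize >= minBytes) {
            return true;
        }
    }
    return false;
```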



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -204,7 +202,117 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
+    // In case, fetch offset metadata doesn't exist for any topic partition in the list of topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private void maybeUpdateFetchOffsetMetadataForTopicPartitions(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchPartitionOffsetData> replicaManagerReadResponseData = null;
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitionManager.sharePartition(shareFetchData.groupId(), topicIdPartition);
+            if (sharePartition == null) {
+                log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), topicIdPartition);
+                continue;

Review Comment:
   In DelayedFetch, if a partition no longer exists, we complete the operation 
immediately.
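   In tryComplete-style code that would look roughly like the sketch below (relies on `DelayedOperation.forceComplete()`; the log line is illustrative):

```java
    if (sharePartition == null) {
        // Mirror DelayedFetch: the partition no longer exists, so don't leave the
        // request waiting on it; complete the delayed share fetch right away and let
        // the response carry the per-partition error.
        log.debug("Share partition for groupId={}, topicIdPartition={} no longer exists; completing the delayed share fetch.",
            shareFetchData.groupId(), topicIdPartition);
        return forceComplete();
    }
```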



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -277,6 +279,12 @@ public static RecordState forId(byte id) {
      */
     private final ReplicaManager replicaManager;
 
+    /**
+     * We maintain the latest fetch offset metadata in order to know the last segment position that has been fetched

Review Comment:
   1. Perhaps "We maintain the latest fetch offset metadata to estimate the 
minBytes requirement more efficiently."? 
   2. Also, could we keep it together with `endOffset` since they are related.
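   i.e. roughly (sketch of the suggested wording and placement; the `endOffset` declaration shown here is just a placeholder):

```java
    // Placeholder for the existing endOffset field and its javadoc.
    private long endOffset;

    /**
     * We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently.
     * It must be reset whenever endOffset changes.
     */
    private LogOffsetMetadata latestFetchOffsetMetadata;
```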



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
