junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1826157479


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,34 @@ private Optional<Throwable> acknowledgeCompleteBatch(
         return Optional.empty();
     }
 
+    protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset, Optional<LogOffsetMetadata> fetchOffsetMetadata) {

Review Comment:
   There is no need to pass in `fetchOffsetMetadata` since it's always empty.
   
   updateEndOffsetAndFetchOffsetMetadata => updateEndOffsetAndResetFetchOffsetMetadata?
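   
   For illustration, a rough sketch of the suggested shape (just a sketch; `nextOffset` below is a placeholder for whatever offset the call sites currently pass):
   ```
   // Resets the metadata itself instead of taking it as a parameter.
   protected void updateEndOffsetAndResetFetchOffsetMetadata(long endOffset) {
       // ... same body as in the PR, with the cached metadata reset to Optional.empty()
   }
   
   // Call sites then simply become:
   updateEndOffsetAndResetFetchOffsetMetadata(nextOffset);
   ```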



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {

Review Comment:
   This problem is still there?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLogAndUpdateFetchOffsetMetadata(topicPartitionData);
+                if (anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    if (!replicaManagerReadResponse.isEmpty())
+                        partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());

Review Comment:
   Should we reset `partitionsToComplete` and `partitionsAlreadyFetched` too when we release the locks?
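   
   E.g. something along these lines (sketch only), so that later completion paths don't see partitions whose locks were already released:
   ```
   if (!completedByMe) {
       releasePartitionLocks(partitionsToComplete.keySet());
       // Reset the bookkeeping after releasing the locks.
       partitionsToComplete = Collections.emptyMap();
       partitionsAlreadyFetched = Collections.emptyMap();
   }
   ```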



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,34 @@ private Optional<Throwable> acknowledgeCompleteBatch(
         return Optional.empty();
     }
 
+    protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset, Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+        lock.writeLock().lock();
+        try {
+            this.endOffset = endOffset;

Review Comment:
   Let's be consistent with the usage of `this`. Most other places don't use `this`.
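   
   Note that in this particular assignment the parameter shadows the field, so dropping `this.` would also mean renaming the parameter, e.g. (sketch):
   ```
   protected void updateEndOffsetAndFetchOffsetMetadata(long updatedEndOffset, Optional<LogOffsetMetadata> fetchOffsetMetadata) {
       // ...
       endOffset = updatedEndOffset;
       // ...
   }
   ```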



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,34 @@ private Optional<Throwable> acknowledgeCompleteBatch(
         return Optional.empty();
     }
 
+    protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset, Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+        lock.writeLock().lock();

Review Comment:
   All callers hold the lock. So we could remove the locking here and add a comment that the caller is expected to hold the lock when calling this method.
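   
   i.e. roughly (a sketch):
   ```
   // Note: the caller is expected to hold lock.writeLock() when calling this method.
   protected void updateEndOffsetAndFetchOffsetMetadata(long endOffset, Optional<LogOffsetMetadata> fetchOffsetMetadata) {
       this.endOffset = endOffset;
       // ... rest of the existing body, without the lock()/try/finally wrapper
   }
   ```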



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLogAndUpdateFetchOffsetMetadata(topicPartitionData);
+                if (anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    if (!replicaManagerReadResponse.isEmpty())

Review Comment:
   This check is unnecessary since partitionsAlreadyFetched initializes to empty.
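   
   i.e. this could just be (sketch):
   ```
   partitionsToComplete = topicPartitionData;
   partitionsAlreadyFetched = replicaManagerReadResponse;
   boolean completedByMe = forceComplete();
   ```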



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +239,159 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
+    // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLogAndUpdateFetchOffsetMetadata(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, entry.getValue());
+            }
+        }
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData = readFromLog(missingFetchOffsetMetadataTopicPartitions);
+        return updateFetchOffsetMetadataForMissingTopicPartitions(replicaManagerReadResponseData);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadataForMissingTopicPartitions(

Review Comment:
   updateFetchOffsetMetadataForMissingTopicPartitions => updateFetchOffsetMetadata?



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +239,159 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
+    // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLogAndUpdateFetchOffsetMetadata(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, entry.getValue());
+            }

Review Comment:
   This code can be a bit simpler.
   
   ```
   topicPartitionData.forEach((topicIdPartition, partitionData) -> {
       SharePartition sharePartition = sharePartitions.get(topicIdPartition);
       if (sharePartition.fetchOffsetMetadata().isEmpty()) {
           missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
       }
   });
   ```



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();

Review Comment:
   Should we just assign the return value to partitionsToComplete directly? We already acquired the locks for those partitions and partitionsToComplete is the only place to track them for releasing.
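   
   e.g. (sketch):
   ```
   // Track the acquired partitions directly in the field used for releasing locks;
   // the rest of tryComplete then uses partitionsToComplete instead of the local topicPartitionData.
   partitionsToComplete = acquirablePartitions();
   ```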



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +239,159 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
+    // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+    // replicaManager.readFromLog to populate the offset metadata.
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLogAndUpdateFetchOffsetMetadata(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
+            TopicIdPartition topicIdPartition = entry.getKey();
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, entry.getValue());
+            }
+        }
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
+        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData = readFromLog(missingFetchOffsetMetadataTopicPartitions);
+        return updateFetchOffsetMetadataForMissingTopicPartitions(replicaManagerReadResponseData);

Review Comment:
   Perhaps do this in the caller? Then the purpose of the method is simpler and the method name can just be `maybeReadFromLog`.
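   
   i.e. something like (sketch; `updateFetchOffsetMetadata` here assumes the rename suggested above):
   ```
   // maybeReadFromLog only returns the read results ...
   Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
   // ... and the caller updates the cached fetch offset metadata itself.
   updateFetchOffsetMetadata(replicaManagerReadResponse);
   ```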


