apoorvmittal10 commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1819931435


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -576,21 +605,24 @@ void processShareFetch(ShareFetchData shareFetchData) {
             addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this),
                 delayedShareFetchWatchKeys);
         } catch (Exception e) {
-            // In case exception occurs then release the locks so queue can be further processed.
-            log.error("Error processing fetch queue for share partitions", e);
-            if (!shareFetchData.future().isDone()) {
-                shareFetchData.future().completeExceptionally(e);
-            }
+            // Complete the whole fetch request with an exception if there is an error processing.
+            // The exception currently can be thrown only if there is an error while initializing
+            // the share partition. But skip the processing for other share partitions in the request
+            // as this situation is not expected.
+            log.error("Error processing share fetch request", e);
+            maybeCompleteShareFetchWithException(shareFetchData.future(), shareFetchData.partitionMaxBytes().keySet(), e);
         }
     }
 
     private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
         return partitionCacheMap.computeIfAbsent(sharePartitionKey,
                 k -> {
                     long start = time.hiResClockMs();
+                    int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
                     SharePartition partition = new SharePartition(
                             sharePartitionKey.groupId(),
                             sharePartitionKey.topicIdPartition(),
+                            leaderEpoch,

Review Comment:
   Handled at the partition level. I have added a couple of TODOs in the PR so
the remaining changes can be taken up in a follow-up, as this PR is getting
large. I am of the opinion that we can make the changes incrementally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

