junrao commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828254033


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1576,6 +1582,30 @@ private Optional<Throwable> acknowledgeCompleteBatch(
         return Optional.empty();
     }
 
+    // The caller of this function is expected to hold lock.writeLock() when calling this method.
+    protected void updateEndOffsetAndResetFetchOffsetMetadata(long updatedEndOffset) {
+        endOffset = updatedEndOffset;
+        fetchOffsetMetadata = Optional.empty();
+    }
+
+    protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+        lock.writeLock().lock();
+        try {
+            this.fetchOffsetMetadata = fetchOffsetMetadata;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
+        lock.readLock().lock();
+        try {
+            return fetchOffsetMetadata;

Review Comment:
   We need to return an empty `fetchOffsetMetadata` whenever the value returned by `nextFetchOffset` changes. If `findNextFetchOffset` is false, `nextFetchOffset` returns a value based on `endOffset`; this case is already covered in this PR. If `findNextFetchOffset` is true, `nextFetchOffset` returns a value that does not depend on `endOffset`. So we should return empty here if `findNextFetchOffset` is true.
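A minimal sketch of the suggested getter, assuming a trimmed-down model of `SharePartition`. `SharePartitionSketch`, `OffsetMetadataStub` and `setFindNextFetchOffset` are hypothetical names, and the real class tracks far more state. The cached metadata is returned only while `findNextFetchOffset` is false, so once the next fetch offset stops being derived from `endOffset`, callers see `Optional.empty()` and fall back to reading the log.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for Kafka's LogOffsetMetadata; only the message offset is modeled.
final class OffsetMetadataStub {
    final long messageOffset;

    OffsetMetadataStub(long messageOffset) {
        this.messageOffset = messageOffset;
    }
}

// Hypothetical, trimmed-down model of the relevant SharePartition state.
class SharePartitionSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Assumption: when true, nextFetchOffset() no longer depends on endOffset.
    private boolean findNextFetchOffset = false;
    private Optional<OffsetMetadataStub> fetchOffsetMetadata = Optional.empty();

    void setFindNextFetchOffset(boolean value) {
        lock.writeLock().lock();
        try {
            findNextFetchOffset = value;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void updateFetchOffsetMetadata(Optional<OffsetMetadataStub> metadata) {
        lock.writeLock().lock();
        try {
            fetchOffsetMetadata = metadata;
        } finally {
            lock.writeLock().unlock();
        }
    }

    Optional<OffsetMetadataStub> fetchOffsetMetadata() {
        lock.readLock().lock();
        try {
            // The cached metadata only describes an endOffset-based fetch position,
            // so it is hidden while the next fetch offset is computed another way.
            if (findNextFetchOffset) {
                return Optional.empty();
            }
            return fetchOffsetMetadata;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```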



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,125 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
 
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
 
         // Since there is no partition that can be acquired, tryComplete should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testDelayedShareFetchTryCompleteReturnsFalseDueToMinBytesOnFirstSharePartitionFetch() {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        ShareFetchData shareFetchData = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), anyInt(), any())).thenReturn(
+            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // We are testing the case when the share partition is getting fetched for the first time, hence we are using 1
+        // as the file position, so it doesn't satisfy the minBytes(2).
+        when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
+        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
+
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .build());
+        assertFalse(delayedShareFetch.isCompleted());
+
+        // Since sp1 can be acquired, tryComplete should return true.

Review Comment:
   This comment doesn't match the code.
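For reference, the minBytes arithmetic this test depends on can be sketched as below. It is a simplified model, assumed to mirror the same-segment branch of the existing `DelayedFetch` minBytes check; `OffsetPosition` and `MinBytesSketch` are hypothetical names. With the fetch position at `LogOffsetMetadata(0, 1, 0)` and the high watermark at `LogOffsetMetadata(1, 1, 1)`, only 1 byte is available against `minBytes = 2`, so, consistent with the test name, a comment stating that `tryComplete` should return false would match the setup.

```java
// Hypothetical stand-in for LogOffsetMetadata(messageOffset, segmentBaseOffset, relativePositionInSegment).
final class OffsetPosition {
    final long messageOffset;
    final long segmentBaseOffset;
    final int relativePositionInSegment;

    OffsetPosition(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {
        this.messageOffset = messageOffset;
        this.segmentBaseOffset = segmentBaseOffset;
        this.relativePositionInSegment = relativePositionInSegment;
    }
}

final class MinBytesSketch {
    // Bytes readable from fetchPosition up to endPosition, capped at partitionMaxBytes.
    // Only the same-segment case is modeled; segment rolls are out of scope for this sketch.
    static int bytesAvailable(OffsetPosition fetchPosition, OffsetPosition endPosition, int partitionMaxBytes) {
        if (fetchPosition.segmentBaseOffset != endPosition.segmentBaseOffset) {
            throw new IllegalArgumentException("sketch only handles positions on the same segment");
        }
        if (endPosition.messageOffset <= fetchPosition.messageOffset) {
            return 0; // nothing new to read
        }
        int positionDiff = endPosition.relativePositionInSegment - fetchPosition.relativePositionInSegment;
        return Math.min(positionDiff, partitionMaxBytes);
    }

    static boolean isMinBytesSatisfied(int accumulatedBytes, int minBytes) {
        return accumulatedBytes >= minBytes;
    }
}
```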



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,47 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {

Review Comment:
   It seems that `sharePartitions` is always a subset of `shareFetchData.partitionMaxBytes()`? If that's the case, I agree that we don't need `anySharePartitionNoLongerExists`. However, it would be useful to make sure that the caller passes in `sharePartitions` and `shareFetchData.partitionMaxBytes()` with the same set of partition keys.
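One way to make that guarantee explicit, sketched under the assumption that the check runs when `DelayedShareFetch` is constructed (for example in its constructor or builder). `PartitionKeyValidation.requireSameKeys` is a hypothetical helper, not an existing Kafka API; it simply fails fast when the two maps disagree on their partition keys.

```java
import java.util.Map;
import java.util.Objects;

final class PartitionKeyValidation {
    // Hypothetical helper: fail fast if the two maps do not cover exactly the same partitions.
    static <K> void requireSameKeys(Map<K, ?> sharePartitions, Map<K, ?> partitionMaxBytes) {
        Objects.requireNonNull(sharePartitions, "sharePartitions");
        Objects.requireNonNull(partitionMaxBytes, "partitionMaxBytes");
        if (!sharePartitions.keySet().equals(partitionMaxBytes.keySet())) {
            throw new IllegalArgumentException("sharePartitions keys " + sharePartitions.keySet()
                + " do not match partitionMaxBytes keys " + partitionMaxBytes.keySet());
        }
    }
}
```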



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -385,6 +492,108 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
         assertFalse(delayedShareFetch1.isCompleted());
         Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
         Mockito.verify(replicaManager, times(0)).tryCompleteActions();
+        Mockito.verify(delayedShareFetch2, times(1)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testCombineLogReadResponse() {
+        String groupId = "grp";
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+        ShareFetchData shareFetchData = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            future, partitionMaxBytes, MAX_FETCH_RECORDS);
+
+        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData)
+            .withReplicaManager(replicaManager)
+            .withSharePartitions(sharePartitions)
+            .build();
+
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new HashMap<>();
+        topicPartitionData.put(tp0, mock(FetchRequest.PartitionData.class));
+        topicPartitionData.put(tp1, mock(FetchRequest.PartitionData.class));
+
+        // Case 1 - logReadResponse contains tp0.
+
+        Map<TopicIdPartition, LogReadResult> logReadResponse = Collections.singletonMap(
+            tp0, mock(LogReadResult.class));
+        delayedShareFetch.updateLogReadResponse(logReadResponse);
+
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+        Map<TopicIdPartition, LogReadResult> combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData);

Review Comment:
   Could we change `combineLogReadResponse` to also take `partitionsAlreadyFetched`? This way, we can get rid of `delayedShareFetch.updateLogReadResponse`.
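A minimal sketch of the suggested signature, written generically so it compiles without Kafka on the classpath. `CombineSketch` and the `readFromLog` function parameter are hypothetical stand-ins for `DelayedShareFetch` and `replicaManager.readFromLog`. Because the already-fetched results arrive as an argument, only the missing partitions are read and no `updateLogReadResponse` setter is needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class CombineSketch {
    // Hypothetical signature: already-fetched results are passed in instead of being
    // stored on the instance through a separate setter.
    static <K, P, R> Map<K, R> combineLogReadResponse(
            Map<K, P> topicPartitionData,
            Map<K, R> partitionsAlreadyFetched,
            Function<Map<K, P>, Map<K, R>> readFromLog) {
        // Collect only the partitions that still need a log read.
        Map<K, P> missing = new LinkedHashMap<>();
        topicPartitionData.forEach((tp, data) -> {
            if (!partitionsAlreadyFetched.containsKey(tp)) {
                missing.put(tp, data);
            }
        });
        Map<K, R> combined = new LinkedHashMap<>(partitionsAlreadyFetched);
        if (!missing.isEmpty()) {
            combined.putAll(readFromLog.apply(missing));
        }
        return combined;
    }
}
```

In the test above, the call would then become something like `delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse)`.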



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -146,18 +149,51 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        topicPartitionDataFromTryComplete = acquirablePartitions();
+        if (anySharePartitionNoLongerExists()) {
+            return forceComplete();
+        }
+        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
-        if (!topicPartitionDataFromTryComplete.isEmpty()) {
+        try {
+            if (!topicPartitionData.isEmpty()) {
+                // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
+                // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
+                // those topic partitions.
+                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
+                if (anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
+                    partitionsToComplete = topicPartitionData;
+                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then that means the request is already completed
+                    // hence release the acquired locks.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsToComplete.keySet());
+                        partitionsAlreadyFetched.clear();
+                        partitionsToComplete.clear();
+                    }
+                    return completedByMe;
+                } else {
+                    log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
+                            "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+                        shareFetchData.partitionMaxBytes().keySet());
+                    releasePartitionLocks(topicPartitionData.keySet());
+                }
+            } else {
+                log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
+                        "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
+                    shareFetchData.partitionMaxBytes().keySet());
+            }
+            return false;
+        } catch (Exception e) {
+            log.error("Error processing delayed share fetch request", e);
             boolean completedByMe = forceComplete();
             // If invocation of forceComplete is not successful, then that means the request is already completed
             // hence release the acquired locks.
             if (!completedByMe) {
-                releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet());
+                releasePartitionLocks(partitionsToComplete.keySet());

Review Comment:
   Hmm, when we hit an exception, do we guarantee that `partitionsToComplete` has been set?
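To illustrate the concern, here is a hedged model of the error path. `TryCompleteSketch` and its constructor flags are hypothetical, and the success path is heavily simplified (no minBytes check). Because `topicPartitionData` is a local assigned before the `try` block, the `catch` block can always release exactly the locks this invocation acquired, whereas `partitionsToComplete` is only assigned on the success path and may not yet be set when the exception is thrown.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of the tryComplete() error path; names mirror the PR but none are Kafka APIs.
final class TryCompleteSketch {
    private final Set<String> acquirablePartitions;
    private final boolean failDuringRead;
    private final boolean forceCompleteWins;
    // Mirrors the PR field: only assigned on the success path, so it can still be empty in the catch.
    private Set<String> partitionsToComplete = Collections.emptySet();
    final Set<String> releasedLocks = new LinkedHashSet<>();

    TryCompleteSketch(Set<String> acquirablePartitions, boolean failDuringRead, boolean forceCompleteWins) {
        this.acquirablePartitions = acquirablePartitions;
        this.failDuringRead = failDuringRead;
        this.forceCompleteWins = forceCompleteWins;
    }

    Set<String> partitionsToComplete() {
        return partitionsToComplete;
    }

    boolean tryComplete() {
        // Assigned before the try block, so the catch block always knows which locks this call holds.
        Set<String> topicPartitionData = new LinkedHashSet<>(acquirablePartitions);
        try {
            if (failDuringRead) {
                throw new IllegalStateException("simulated failure while reading from the log");
            }
            partitionsToComplete = topicPartitionData;
            return forceCompleteWins; // success path heavily simplified
        } catch (RuntimeException e) {
            boolean completedByMe = forceCompleteWins;
            if (!completedByMe) {
                // Release the locally tracked partitions rather than partitionsToComplete.
                releasePartitionLocks(topicPartitionData);
            }
            return completedByMe;
        }
    }

    private void releasePartitionLocks(Set<String> partitions) {
        releasedLocks.addAll(partitions);
    }
}
```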



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,125 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
 
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
 
         // Since there is no partition that can be acquired, tryComplete should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testDelayedShareFetchTryCompleteReturnsFalseDueToMinBytesOnFirstSharePartitionFetch() {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        ShareFetchData shareFetchData = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), anyInt(), any())).thenReturn(
+            ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
+
+        // We are testing the case when the share partition is getting fetched for the first time, hence we are using 1
+        // as the file position, so it doesn't satisfy the minBytes(2).
+        when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));

Review Comment:
   Hmm, not sure how this test is different from the next one. If this is testing fetching for the first time, `sp0.fetchOffsetMetadata()` should return empty, right?
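The distinction can be illustrated with a hedged model of the selection step in `maybeReadFromLog`: only partitions whose `fetchOffsetMetadata()` is empty are passed to `readFromLog`. A first-fetch test would therefore stub `sp0.fetchOffsetMetadata()` to return `Optional.empty()`; with `Optional.of(...)`, as above, that read is skipped. `FirstFetchSketch` is a hypothetical name and partitions are modeled as strings.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class FirstFetchSketch {
    // Selects the partitions whose cached fetch offset metadata is absent; in the PR these are
    // the only partitions that maybeReadFromLog hands to readFromLog.
    static <K, M> Set<K> partitionsNeedingLogRead(Map<K, Optional<M>> fetchOffsetMetadataByPartition) {
        Set<K> missing = new LinkedHashSet<>();
        fetchOffsetMetadataByPartition.forEach((partition, metadata) -> {
            if (!metadata.isPresent()) {
                missing.add(partition);
            }
        });
        return missing;
    }
}
```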



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -207,13 +243,155 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private void releasePartitionLocks(String groupId, Set<TopicIdPartition> topicIdPartitions) {
-        topicIdPartitions.forEach(tp -> {
-            SharePartition sharePartition = sharePartitionManager.sharePartition(groupId, tp);
+    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        Map<TopicIdPartition, FetchRequest.PartitionData> missingFetchOffsetMetadataTopicPartitions = new LinkedHashMap<>();
+        topicPartitionData.forEach((topicIdPartition, partitionData) -> {
+            SharePartition sharePartition = sharePartitions.get(topicIdPartition);
+            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
+                missingFetchOffsetMetadataTopicPartitions.put(topicIdPartition, partitionData);
+            }
+        });
+        if (missingFetchOffsetMetadataTopicPartitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
+        return readFromLog(missingFetchOffsetMetadataTopicPartitions);
+    }
+
+    private Map<TopicIdPartition, LogReadResult> updateFetchOffsetMetadata(

Review Comment:
   It's kind of weird for this method to return the input. It's more natural for this method to return nothing.
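A minimal sketch of the suggested shape, assuming simplified types: `ReadResultStub` and `FetchOffsetMetadataUpdater` are hypothetical stand-ins for `LogReadResult` and the per-partition state in `SharePartition`. The method only records metadata as a side effect and returns `void`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a LogReadResult; it only carries the offset metadata found by the read.
final class ReadResultStub {
    final long offsetMetadata;

    ReadResultStub(long offsetMetadata) {
        this.offsetMetadata = offsetMetadata;
    }
}

final class FetchOffsetMetadataUpdater {
    // Stand-in for the per-partition state that SharePartition.updateFetchOffsetMetadata would write.
    final Map<String, Optional<Long>> fetchOffsetMetadata = new HashMap<>();

    // Side effect only: record the metadata and return nothing; the caller keeps its own reference.
    void updateFetchOffsetMetadata(Map<String, ReadResultStub> replicaManagerReadResponse) {
        replicaManagerReadResponse.forEach((partition, result) ->
            fetchOffsetMetadata.put(partition, Optional.of(result.offsetMetadata)));
    }
}
```

The caller would then keep its own reference, e.g. `Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);` followed by `updateFetchOffsetMetadata(replicaManagerReadResponse);`.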



##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -54,18 +59,23 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
 
-    private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;
+    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsToComplete;

Review Comment:
   shareFetchData => partitionsToComplete ?
   partitionsToComplete => partitionsAcquired ?



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,125 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        SharePartitionManager sharePartitionManager = mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
 
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
 
         // Since there is no partition that can be acquired, tryComplete should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void testDelayedShareFetchTryCompleteReturnsFalseDueToMinBytesOnFirstSharePartitionFetch() {

Review Comment:
   Quite a long name. How about something like `testTryCompleteReturnsFalseWhenMinBytesNotSatisfied`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
