adixitconfluent commented on code in PR #17539:
URL: https://github.com/apache/kafka/pull/17539#discussion_r1828758874


##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -107,23 +109,125 @@ public void 
testDelayedShareFetchTryCompleteReturnsFalse() {
         when(sp0.maybeAcquireFetchLock()).thenReturn(true);
         when(sp1.maybeAcquireFetchLock()).thenReturn(true);
 
-        SharePartitionManager sharePartitionManager = 
mock(SharePartitionManager.class);
-        when(sharePartitionManager.sharePartition(groupId, 
tp0)).thenReturn(sp0);
-        when(sharePartitionManager.sharePartition(groupId, 
tp1)).thenReturn(sp1);
+        Map<TopicIdPartition, SharePartition> sharePartitions = new 
HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
 
         ShareFetchData shareFetchData = new ShareFetchData(FETCH_PARAMS, 
groupId, Uuid.randomUuid().toString(),
             new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
 
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        DelayedShareFetch delayedShareFetch = 
DelayedShareFetchBuilder.builder()
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
-            .withSharePartitionManager(sharePartitionManager)
-            .build();
+            .withSharePartitions(sharePartitions)
+            .build());
 
         // Since there is no partition that can be acquired, tryComplete 
should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
+        Mockito.verify(delayedShareFetch, 
times(0)).releasePartitionLocks(any());
+    }
+
+    @Test
+    public void 
testDelayedShareFetchTryCompleteReturnsFalseDueToMinBytesOnFirstSharePartitionFetch()
 {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new 
TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+
+        Map<TopicIdPartition, SharePartition> sharePartitions = new 
HashMap<>();
+        sharePartitions.put(tp0, sp0);
+        sharePartitions.put(tp1, sp1);
+
+        ShareFetchData shareFetchData = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                2, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes, MAX_FETCH_RECORDS);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), anyInt(), any())).thenReturn(
+            ShareAcquiredRecords.fromAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+
+        // We are testing the case when the share partition is getting fetched 
for the first time, hence we are using 1
+        // as the file position, so it doesn't satisfy the minBytes(2).
+        when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
+        LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
+        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
+
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .build());
+        assertFalse(delayedShareFetch.isCompleted());
+
+        // Since sp1 cannot be acquired and sp0 does not satisfy minBytes, 
tryComplete should return false.

Review Comment:
   my bad, I've corrected it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to