chirag-wadhwa5 commented on code in PR #20280:
URL: https://github.com/apache/kafka/pull/20280#discussion_r2262861759


##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1288,6 +1357,62 @@ public void 
testRemoteStorageFetchTryCompleteReturnsFalse() {
         delayedShareFetch.lock().unlock();
     }
 
+    @Test
+    public void testRemoteStorageFetchPartitionLeaderChanged() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+
+        SharePartition sp0 = mock(SharePartition.class);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", 
Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, 
MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+
+        // Fetch offset does not match with the cached entry for sp0, hence, a 
replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), 
Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete 
within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), 
any())).thenReturn(mock(Future.class));
+        
when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(false);
+
+        
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+
+        Uuid fetchId = Uuid.randomUuid();
+        DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
+            .withFetchId(fetchId)
+            .build());
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
+
+        assertFalse(delayedShareFetch.isCompleted());

Review Comment:
   Thanks for the review. The DelayedShareFetch should complete, but only after 
tryComplete is called. I have added an assertTrue for this after tryComplete is 
called to make the test better. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to