This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77aff85b3e9 KAFKA-19268 Missing mocks for SharePartitionManagerTest tests (#19786)
77aff85b3e9 is described below

commit 77aff85b3e92c94f922b56aa2aeb8e8265f5ed75
Author: Ji-Seung Ryu <[email protected]>
AuthorDate: Tue May 27 06:17:44 2025 +0900

    KAFKA-19268 Missing mocks for SharePartitionManagerTest tests (#19786)
    
    Added missing mocks for SharePartitionManagerTests.
    
    Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../server/share/SharePartitionManagerTest.java    | 31 ++++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)

diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d4c68fe555b..f74a231e98d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedStateEpochException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.ShareFetchResponse;
 import org.apache.kafka.common.requests.ShareRequestMetadata;
@@ -778,9 +780,6 @@ public class SharePartitionManagerTest {
     @Test
     public void testCachedTopicPartitionsForValidShareSessions() {
         ShareSessionCache cache = new ShareSessionCache(10);
-        sharePartitionManager = SharePartitionManagerBuilder.builder()
-            .withCache(cache)
-            .build();
 
         Uuid tpId0 = Uuid.randomUuid();
         Uuid tpId1 = Uuid.randomUuid();
@@ -791,6 +790,24 @@ public class SharePartitionManagerTest {
         String groupId = "grp";
         Uuid memberId1 = Uuid.randomUuid();
         Uuid memberId2 = Uuid.randomUuid();
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+
+        
when(sp0.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
+        
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
+        
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
+
+        SharePartitionCache partitionCache = new SharePartitionCache();
+        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
+
+        sharePartitionManager = SharePartitionManagerBuilder.builder()
+                .withCache(cache)
+                .withPartitionCache(partitionCache)
+                .build();
+
 
         // Create a new share session with an initial share fetch request.
         List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
@@ -1724,7 +1741,7 @@ public class SharePartitionManagerTest {
         // Since acquisition lock for sp1 and sp2 cannot be acquired, we 
should have 2 watched keys.
         assertEquals(2, delayedShareFetchPurgatory.watched());
 
-        doAnswer(invocation -> 
buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
+        doAnswer(invocation -> 
buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
         Map<TopicIdPartition, List<ShareAcknowledgementBatch>> 
acknowledgeTopics = new HashMap<>();
         acknowledgeTopics.put(tp1, List.of(
@@ -1868,6 +1885,7 @@ public class SharePartitionManagerTest {
 
         SharePartition sp1 = mock(SharePartition.class);
         SharePartition sp2 = mock(SharePartition.class);
+        SharePartition sp3 = mock(SharePartition.class);
 
         ShareSessionCache cache = mock(ShareSessionCache.class);
         ShareSession shareSession = mock(ShareSession.class);
@@ -1882,10 +1900,12 @@ public class SharePartitionManagerTest {
             when(sp2.canAcquireRecords()).thenReturn(true);
             return CompletableFuture.completedFuture(Optional.empty());
         }).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
+        
when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));
 
         SharePartitionCache partitionCache = new SharePartitionCache();
         partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
         partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
+        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
 
         ShareFetch shareFetch = new ShareFetch(
             FETCH_PARAMS,
@@ -3146,7 +3166,8 @@ public class SharePartitionManagerTest {
     static Seq<Tuple2<TopicIdPartition, LogReadResult>> 
buildLogReadResult(List<TopicIdPartition> topicIdPartitions) {
         List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new 
ArrayList<>();
         topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new 
Tuple2<>(topicIdPartition, new LogReadResult(
-            new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), 
MemoryRecords.EMPTY),
+            new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), 
MemoryRecords.withRecords(
+                    Compression.NONE, new SimpleRecord("test-key".getBytes(), 
"test-value".getBytes()))),
             Option.empty(),
             -1L,
             -1L,

Reply via email to