This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77aff85b3e9 KAFKA-19268 Missing mocks for SharePartitionManagerTest
tests (#19786)
77aff85b3e9 is described below
commit 77aff85b3e92c94f922b56aa2aeb8e8265f5ed75
Author: Ji-Seung Ryu <[email protected]>
AuthorDate: Tue May 27 06:17:44 2025 +0900
KAFKA-19268 Missing mocks for SharePartitionManagerTest tests (#19786)
Added missing mocks for SharePartitionManagerTest tests.
Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield
<[email protected]>
---
.../server/share/SharePartitionManagerTest.java | 31 ++++++++++++++++++----
1 file changed, 26 insertions(+), 5 deletions(-)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d4c68fe555b..f74a231e98d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
@@ -778,9 +780,6 @@ public class SharePartitionManagerTest {
@Test
public void testCachedTopicPartitionsForValidShareSessions() {
ShareSessionCache cache = new ShareSessionCache(10);
- sharePartitionManager = SharePartitionManagerBuilder.builder()
- .withCache(cache)
- .build();
Uuid tpId0 = Uuid.randomUuid();
Uuid tpId1 = Uuid.randomUuid();
@@ -791,6 +790,24 @@ public class SharePartitionManagerTest {
String groupId = "grp";
Uuid memberId1 = Uuid.randomUuid();
Uuid memberId2 = Uuid.randomUuid();
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ SharePartition sp2 = mock(SharePartition.class);
+
+
when(sp0.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
+
when(sp1.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
+
when(sp2.releaseAcquiredRecords(ArgumentMatchers.eq(String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
+
+ SharePartitionCache partitionCache = new SharePartitionCache();
+ partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
+ partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
+ partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
+
+ sharePartitionManager = SharePartitionManagerBuilder.builder()
+ .withCache(cache)
+ .withPartitionCache(partitionCache)
+ .build();
+
// Create a new share session with an initial share fetch request.
List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
@@ -1724,7 +1741,7 @@ public class SharePartitionManagerTest {
// Since acquisition lock for sp1 and sp2 cannot be acquired, we
should have 2 watched keys.
assertEquals(2, delayedShareFetchPurgatory.watched());
- doAnswer(invocation ->
buildLogReadResult(topicIdPartitions)).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
+ doAnswer(invocation ->
buildLogReadResult(List.of(tp1))).when(mockReplicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
Map<TopicIdPartition, List<ShareAcknowledgementBatch>>
acknowledgeTopics = new HashMap<>();
acknowledgeTopics.put(tp1, List.of(
@@ -1868,6 +1885,7 @@ public class SharePartitionManagerTest {
SharePartition sp1 = mock(SharePartition.class);
SharePartition sp2 = mock(SharePartition.class);
+ SharePartition sp3 = mock(SharePartition.class);
ShareSessionCache cache = mock(ShareSessionCache.class);
ShareSession shareSession = mock(ShareSession.class);
@@ -1882,10 +1900,12 @@ public class SharePartitionManagerTest {
when(sp2.canAcquireRecords()).thenReturn(true);
return CompletableFuture.completedFuture(Optional.empty());
}).when(sp2).releaseAcquiredRecords(ArgumentMatchers.eq(memberId));
+
when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));
SharePartitionCache partitionCache = new SharePartitionCache();
partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
+ partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
ShareFetch shareFetch = new ShareFetch(
FETCH_PARAMS,
@@ -3146,7 +3166,8 @@ public class SharePartitionManagerTest {
static Seq<Tuple2<TopicIdPartition, LogReadResult>>
buildLogReadResult(List<TopicIdPartition> topicIdPartitions) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new
ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new
Tuple2<>(topicIdPartition, new LogReadResult(
- new FetchDataInfo(new LogOffsetMetadata(0, 0, 0),
MemoryRecords.EMPTY),
+ new FetchDataInfo(new LogOffsetMetadata(0, 0, 0),
MemoryRecords.withRecords(
+ Compression.NONE, new SimpleRecord("test-key".getBytes(),
"test-value".getBytes()))),
Option.empty(),
-1L,
-1L,