apoorvmittal10 commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1823155007


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2157,19 +2153,203 @@ public void testShareFetchProcessingExceptions() 
throws Exception {
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error creating 
instance");
+        validateShareFetchFutureException(future, tp0, 
Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
+    }
+
+    @Test
+    public void testSharePartitionInitializationFailure() throws Exception {
+        String groupId = "grp";
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = 
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        // Send map to check no share partition is created.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        // Validate when partition is not the leader.
+        Partition partition = mock(Partition.class);
+        when(partition.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        // First check should throw KafkaStorageException, second check should 
return partition which
+        // is not leader.
+        when(replicaManager.getPartitionOrException(any()))
+            .thenThrow(new KafkaStorageException("Exception"))
+            .thenReturn(partition);
+        SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
 
-        // Throw exception from share partition for second fetch request.
-        when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error 
initializing instance"));
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        TestUtils.waitForCondition(
+            future::isDone,
+            DELAYED_SHARE_FETCH_TIMEOUT_MS,
+            () -> "Processing for delayed share fetch request not finished.");
+        validateShareFetchFutureException(future, tp0, 
Errors.KAFKA_STORAGE_ERROR, "Exception");
+        assertTrue(partitionCacheMap.isEmpty());
 
+        // Validate when partition is not leader.
         future = sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error initializing 
instance");
+        validateShareFetchFutureException(future, tp0, 
Errors.NOT_LEADER_OR_FOLLOWER);
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testSharePartitionPartialInitializationFailure() throws 
Exception {
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new 
TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, 
PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
+
+        // Mark partition1 as not the leader.
+        Partition partition1 = mock(Partition.class);
+        when(partition1.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(any()))
+            .thenReturn(partition1);
+
+        SharePartition sp1 = mock(SharePartition.class);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        
when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new 
ShareAcquiredRecords(Collections.emptyList(), 0));
+
+        DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, 
delayedShareFetchPurgatory);
+
+        doAnswer(invocation -> 
buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, 
Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+
+        Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
+        // For now only 1 successful partition is included, this will be fixed 
in subsequents PRs.
+        assertEquals(1, partitionDataMap.size());
+        assertTrue(partitionDataMap.containsKey(tp1));
+        assertEquals(Errors.NONE.code(), 
partitionDataMap.get(tp1).errorCode());
+
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+    }
+
+    @Test
+    public void testReplicaManagerFetchException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = 
Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new 
HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+        DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, 
delayedShareFetchPurgatory);
+
+        doThrow(new 
RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), 
FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, 
Errors.UNKNOWN_SERVER_ERROR, "Exception");
+        // Verify that the share partition is still in the cache on exception.
+        assertEquals(1, partitionCacheMap.size());
+
+        // Throw NotLeaderOrFollowerException from replica manager fetch which 
should evict instance from the cache.
+        doThrow(new NotLeaderOrFollowerException("Leader 
exception")).when(mockReplicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        future = sharePartitionManager.fetchMessages(groupId, 
memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, 
Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testReplicaManagerFetchMultipleSharePartitionsException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("bar", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        
when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        SharePartition sp1 = mock(SharePartition.class);
+        // Do not make the share partition acquirable hence it shouldn't be 
removed from the cache,

Review Comment:
   Thanks, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to