junrao commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1821630956


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -490,6 +497,30 @@ public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMet
         }
     }
 
+    /**
+     * The handleFetchException method is used to handle the exception that occurred while reading from the log.
+     * The method will handle the exception for each topic-partition in the request. The share partition
+     * might get removed from the cache.
+     * <p>
+     * The replica read request might error out for one share partition, but as we cannot determine which
+     * share partition errored out, we might remove all the share partitions in the request.
+     *
+     * @param groupId The group id in the share fetch request.
+     * @param topicIdPartitions The topic-partitions in the replica read request.
+     * @param future The future to complete with the exception.
+     * @param throwable The exception that occurred while fetching messages.
+     */
+    public void handleFetchException(
+        String groupId,
+        Set<TopicIdPartition> topicIdPartitions,
+        CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Throwable throwable
+    ) {
+        topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable));

Review Comment:
   This is weird. We actually don't know which partition caused the throwable. Ideally, we should just set a top-level error instead of applying it to each partition. We probably shouldn't remove the SharePartition here since we are not sure which partition to remove.
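   For illustration, a minimal sketch of that direction (assuming the request layer maps an exceptionally completed future to the response's top-level error code; not necessarily the final shape):
   ```java
   // Since we cannot tell which partition caused the failure, surface a single
   // request-level error and leave the partition cache untouched.
   public void handleFetchException(
       String groupId,
       Set<TopicIdPartition> topicIdPartitions,
       CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
       Throwable throwable
   ) {
       log.error("Error processing fetch request for group {}", groupId, throwable);
       if (!future.isDone()) {
           future.completeExceptionally(throwable);
       }
   }
   ```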



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -617,22 +667,44 @@ private void maybeCompleteInitializationWithException(
             return;
         }
 
-        if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException) {
+        // Remove the partition from the cache as it failed to initialize.
+        partitionCacheMap.remove(sharePartitionKey);
+        // The partition initialization failed, so complete the request with the exception.
+        // The broker should not be in this state; log the error on the broker, surface the same
+        // to the client, and investigate the root cause of the error.
+        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
+        maybeCompleteShareFetchWithException(future, Collections.singletonList(sharePartitionKey.topicIdPartition()), throwable);
+    }
+
+    private void handleFencedSharePartitionException(
+        SharePartitionKey sharePartitionKey,
+        Throwable throwable
+    ) {
+        if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException ||
+            throwable instanceof GroupIdNotFoundException || throwable instanceof UnknownTopicOrPartitionException) {
             log.info("The share partition with key {} is fenced: {}", sharePartitionKey, throwable.getMessage());
             // The share partition is fenced hence remove the partition from map and let the client retry.
             // But surface the error to the client so client might take some action i.e. re-fetch
             // the metadata and retry the fetch on new leader.
-            partitionCacheMap.remove(sharePartitionKey);
-            future.completeExceptionally(throwable);
-            return;
+            SharePartition sharePartition = partitionCacheMap.remove(sharePartitionKey);
+            if (sharePartition != null) {
+                sharePartition.markFenced();
+            }
         }
+    }
 
-        // The partition initialization failed, so complete the request with the exception.
-        // The server should not be in this state, so log the error on broker and surface the same
-        // to the client. As of now this state is in-recoverable for the broker, and we should
-        // investigate the root cause of the error.
-        log.error("Error initializing share partition with key {}", sharePartitionKey, throwable);
-        future.completeExceptionally(throwable);
+    private void maybeCompleteShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Collection<TopicIdPartition> topicIdPartitions, Throwable throwable) {
+        if (!future.isDone()) {
+            future.complete(topicIdPartitions.stream().collect(Collectors.toMap(
+                tp -> tp, tp -> new PartitionData().setErrorCode(Errors.forException(throwable).code()).setErrorMessage(throwable.getMessage()))));
+        }
+    }
+
+    private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
+        Map<TopicIdPartition, Throwable> erroneous) {
+        future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(
+            Map.Entry::getKey, entry -> new PartitionData().setErrorCode(Errors.forException(entry.getValue()).code()).setErrorMessage(entry.getValue().getMessage()))));

Review Comment:
   The line is getting too long. Could we avoid calling `entry.getValue()` twice?
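   For example, something along these lines (just a sketch):
   ```java
   private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future,
       Map<TopicIdPartition, Throwable> erroneous) {
       future.complete(erroneous.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
           Throwable t = entry.getValue();
           return new PartitionData()
               .setErrorCode(Errors.forException(t).code())
               .setErrorMessage(t.getMessage());
       })));
   }
   ```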



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -540,57 +571,76 @@ void processShareFetch(ShareFetchData shareFetchData) {
             return;
         }
 
-        try {
-            shareFetchData.partitionMaxBytes().keySet().forEach(topicIdPartition -> {
-                SharePartitionKey sharePartitionKey = sharePartitionKey(
-                    shareFetchData.groupId(),
-                    topicIdPartition
-                );
-                SharePartition sharePartition = getOrCreateSharePartition(sharePartitionKey);
+        // Initialize lazily, if required.
+        Map<TopicIdPartition, Throwable> erroneous = null;
+        Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
+        for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
+            SharePartitionKey sharePartitionKey = sharePartitionKey(
+                shareFetchData.groupId(),
+                topicIdPartition
+            );
+
+            SharePartition sharePartition;
+            try {
+                sharePartition = getOrCreateSharePartition(sharePartitionKey);
+            } catch (Exception e) {
+                // Complete the whole fetch request with an exception if there is an error processing.
+                // The exception currently can be thrown only if there is an error while initializing
+                // the share partition. But skip the processing for other share partitions in the request
+                // as this situation is not expected.
+                log.error("Error processing share fetch request", e);
+                if (erroneous == null) {
+                    erroneous = new HashMap<>();

Review Comment:
   It seems more intuitive to initialize `erroneous` as an empty map so that we don't need to deal with it being `null`.
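   Roughly (a sketch; assuming the loop records the failure in `erroneous` and continues, as the rest of the diff suggests):
   ```java
   // Start with an empty map so the later checks can use erroneous.isEmpty()
   // instead of a null check.
   Map<TopicIdPartition, Throwable> erroneous = new HashMap<>();
   Set<DelayedShareFetchKey> delayedShareFetchWatchKeys = new HashSet<>();
   for (TopicIdPartition topicIdPartition : shareFetchData.partitionMaxBytes().keySet()) {
       SharePartitionKey sharePartitionKey = sharePartitionKey(shareFetchData.groupId(), topicIdPartition);
       SharePartition sharePartition;
       try {
           sharePartition = getOrCreateSharePartition(sharePartitionKey);
       } catch (Exception e) {
           log.error("Error processing share fetch request", e);
           erroneous.put(topicIdPartition, e);
           continue;
       }
       // ... rest of the loop unchanged ...
   }
   ```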



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2157,19 +2153,203 @@ public void testShareFetchProcessingExceptions() throws Exception {
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error creating instance");
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
+    }
+
+    @Test
+    public void testSharePartitionInitializationFailure() throws Exception {
+        String groupId = "grp";
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        // Send map to check no share partition is created.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        // Validate when partition is not the leader.
+        Partition partition = mock(Partition.class);
+        when(partition.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        // First check should throw KafkaStorageException, second check should return partition which
+        // is not leader.
+        when(replicaManager.getPartitionOrException(any()))
+            .thenThrow(new KafkaStorageException("Exception"))
+            .thenReturn(partition);
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
 
-        // Throw exception from share partition for second fetch request.
-        when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error initializing instance"));
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        TestUtils.waitForCondition(
+            future::isDone,
+            DELAYED_SHARE_FETCH_TIMEOUT_MS,
+            () -> "Processing for delayed share fetch request not finished.");
+        validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
+        assertTrue(partitionCacheMap.isEmpty());
 
+        // Validate when partition is not leader.
         future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error initializing instance");
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testSharePartitionPartialInitializationFailure() throws Exception {
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
+
+        // Mark partition1 as not the leader.
+        Partition partition1 = mock(Partition.class);
+        when(partition1.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(any()))
+            .thenReturn(partition1);
+
+        SharePartition sp1 = mock(SharePartition.class);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
+
+        doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+
+        Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
+        // For now only 1 successful partition is included, this will be fixed in subsequent PRs.
+        assertEquals(1, partitionDataMap.size());
+        assertTrue(partitionDataMap.containsKey(tp1));
+        assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode());
+
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+    }
+
+    @Test
+    public void testReplicaManagerFetchException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
+
+        doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
+        // Verify that the share partition is still in the cache on exception.
+        assertEquals(1, partitionCacheMap.size());
+
+        // Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache.
+        doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testReplicaManagerFetchMultipleSharePartitionsException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        SharePartition sp1 = mock(SharePartition.class);
+        // Do not make the share partition acquirable hence it shouldn't be removed from the cache,
+        // as it won't be part of replica manager readFromLog request.
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
+
+        // Throw FencedStateEpochException from replica manager fetch which should evict instance from the cache.
+        doThrow(new FencedStateEpochException("Fenced exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");

Review Comment:
   Hmm, why does the completed future have only 1 partition?



##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -2157,19 +2153,203 @@ public void testShareFetchProcessingExceptions() throws Exception {
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error creating instance");
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
+    }
+
+    @Test
+    public void testSharePartitionInitializationFailure() throws Exception {
+        String groupId = "grp";
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        // Send map to check no share partition is created.
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        // Validate when partition is not the leader.
+        Partition partition = mock(Partition.class);
+        when(partition.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        // First check should throw KafkaStorageException, second check should return partition which
+        // is not leader.
+        when(replicaManager.getPartitionOrException(any()))
+            .thenThrow(new KafkaStorageException("Exception"))
+            .thenReturn(partition);
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
 
-        // Throw exception from share partition for second fetch request.
-        when(sp0.maybeInitialize()).thenThrow(new RuntimeException("Error initializing instance"));
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        TestUtils.waitForCondition(
+            future::isDone,
+            DELAYED_SHARE_FETCH_TIMEOUT_MS,
+            () -> "Processing for delayed share fetch request not finished.");
+        validateShareFetchFutureException(future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
+        assertTrue(partitionCacheMap.isEmpty());
 
+        // Validate when partition is not leader.
         future = sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
         TestUtils.waitForCondition(
             future::isDone,
             DELAYED_SHARE_FETCH_TIMEOUT_MS,
             () -> "Processing for delayed share fetch request not finished.");
-        assertTrue(future.isCompletedExceptionally());
-        assertFutureThrows(future, RuntimeException.class, "Error initializing instance");
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testSharePartitionPartialInitializationFailure() throws Exception {
+        String groupId = "grp";
+        Uuid memberId1 = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Map.of(tp0, PARTITION_MAX_BYTES, tp1, PARTITION_MAX_BYTES);
+
+        // Mark partition1 as not the leader.
+        Partition partition1 = mock(Partition.class);
+        when(partition1.isLeader()).thenReturn(false);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(any()))
+            .thenReturn(partition1);
+
+        SharePartition sp1 = mock(SharePartition.class);
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        when(sp1.acquire(anyString(), anyInt(), any())).thenReturn(new ShareAcquiredRecords(Collections.emptyList(), 0));
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
+
+        doAnswer(invocation -> buildLogReadResult(partitionMaxBytes.keySet())).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // Validate when exception is thrown.
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, partitionMaxBytes);
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+
+        Map<TopicIdPartition, PartitionData> partitionDataMap = future.get();
+        // For now only 1 successful partition is included, this will be fixed in subsequent PRs.
+        assertEquals(1, partitionDataMap.size());
+        assertTrue(partitionDataMap.containsKey(tp1));
+        assertEquals(Errors.NONE.code(), partitionDataMap.get(tp1).errorCode());
+
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+    }
+
+    @Test
+    public void testReplicaManagerFetchException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+        mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
+
+        doThrow(new RuntimeException("Exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
+            .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(mockReplicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future =
+            sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
+        // Verify that the share partition is still in the cache on exception.
+        assertEquals(1, partitionCacheMap.size());
+
+        // Throw NotLeaderOrFollowerException from replica manager fetch which should evict instance from the cache.
+        doThrow(new NotLeaderOrFollowerException("Leader exception")).when(mockReplicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        future = sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes);
+        validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
+        assertTrue(partitionCacheMap.isEmpty());
+    }
+
+    @Test
+    public void testReplicaManagerFetchMultipleSharePartitionsException() {
+        String groupId = "grp";
+        Uuid memberId = Uuid.randomUuid();
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
+        partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        when(sp0.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp0.canAcquireRecords()).thenReturn(true);
+        when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
+
+        SharePartition sp1 = mock(SharePartition.class);
+        // Do not make the share partition acquirable hence it shouldn't be removed from the cache,

Review Comment:
   Hmm, should we explicitly mock `sp1.maybeAcquireFetchLock` to false?



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1053,6 +1085,22 @@ void releaseFetchLock() {
         fetchLock.set(false);
     }
 
+    /**
+     * Marks the share partition as fenced.
+     */
+    void markFenced() {
+        lock.writeLock().lock();
+        try {
+            partitionState = SharePartitionState.FENCED;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private boolean stateNotActive() {
+        return partitionState() != SharePartitionState.ACTIVE;

Review Comment:
   We probably should throw a fenced exception and let the caller handle it. This can be done in a separate PR.
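   Roughly something like the following for the follow-up (a sketch; `ensureActive` is a hypothetical helper, and callers would need to handle the exception):
   ```java
   // Fail fast when the partition is fenced so the caller can surface
   // FENCED_STATE_EPOCH to the client instead of silently skipping work.
   private void ensureActive() {
       if (partitionState() == SharePartitionState.FENCED) {
           throw new FencedStateEpochException("Share partition is fenced.");
       }
   }
   ```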


