This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 802431901ab MINOR: Correcting exception codes in Share Partition
(#20028)
802431901ab is described below
commit 802431901ab0356fe65a077130bfe7d6ef249c16
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Jun 24 12:46:27 2025 +0100
MINOR: Correcting exception codes in Share Partition (#20028)
This PR cherry-picks the commit below:
```
commit 1ca8779bee9cdbd53b0cb4536270461a6ed0b6d2
Author: Apoorv Mittal <[email protected]>
Date: Tue Jun 24 09:46:14 2025 +0100
MINOR: Correcting client error for fenced share partition (#20023)
Correct the error when SharePartition is fenced.
Reviewers: Abhinav Dixit <[email protected]>, Sushant Mahajan
<[email protected]>, Andrew Schofield <[email protected]>
```
It also includes selected changes from the following commit, which are critical:
```
commit 3d4407ff9daeb3586b049bfd4a70bd374130616d
Author: Sushant Mahajan <[email protected]>
Date: Mon Jun 23 23:57:15 2025 +0530
MINOR: Change exceptions for few error codes in SharePartition.
(#20020)
* The `SharePartition` class wraps the errors received from
  `PersisterStateManager` to be sent to the client.
* In this PR, we are categorizing the errors a bit better.
* Some exception messages in `PersisterStateManager` have been updated
  to show the share partition key.
* Tests have been updated wherever needed.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>
```
Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 7 ++---
.../kafka/server/share/SharePartitionTest.java | 33 ++++++++++++++++++++--
2 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 91a11d488f4..09385d6c48c 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -1490,7 +1489,7 @@ public class SharePartition {
String.format("Share partition failed to load %s-%s", groupId,
topicIdPartition));
case INITIALIZING -> throw new LeaderNotAvailableException(
String.format("Share partition is already initializing %s-%s",
groupId, topicIdPartition));
- case FENCED -> throw new FencedStateEpochException(
+ case FENCED -> throw new LeaderNotAvailableException(
String.format("Share partition is fenced %s-%s", groupId,
topicIdPartition));
case EMPTY ->
// The share partition is not yet initialized.
@@ -2343,9 +2342,7 @@ public class SharePartition {
new GroupIdNotFoundException(errorMessage);
case UNKNOWN_TOPIC_OR_PARTITION ->
new UnknownTopicOrPartitionException(errorMessage);
- case FENCED_STATE_EPOCH ->
- new FencedStateEpochException(errorMessage);
- case FENCED_LEADER_EPOCH ->
+ case FENCED_LEADER_EPOCH, FENCED_STATE_EPOCH ->
new NotLeaderOrFollowerException(errorMessage);
default ->
new UnknownServerException(errorMessage);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index ac8e9ba0fd6..0cc7ad4d9a1 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -28,10 +28,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -758,7 +758,7 @@ public class SharePartitionTest {
result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertTrue(result.isCompletedExceptionally());
- assertFutureThrows(FencedStateEpochException.class, result);
+ assertFutureThrows(NotLeaderOrFollowerException.class, result);
assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
// Mock FENCED_LEADER_EPOCH error.
@@ -788,6 +788,20 @@ public class SharePartitionTest {
assertTrue(result.isCompletedExceptionally());
assertFutureThrows(UnknownServerException.class, result);
assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
+
+ // Mock NETWORK_EXCEPTION error.
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 5, 10L,
Errors.NETWORK_EXCEPTION.code(), Errors.NETWORK_EXCEPTION.message(),
+ List.of())))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(UnknownServerException.class, result);
+ assertEquals(SharePartitionState.FAILED,
sharePartition.partitionState());
}
@Test
@@ -935,6 +949,19 @@ public class SharePartitionTest {
assertThrows(RuntimeException.class, sharePartition2::maybeInitialize);
}
+ @Test
+ public void testMaybeInitializeFencedSharePartition() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().build();
+ // Mark the share partition as fenced.
+ sharePartition.markFenced();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ assertFutureThrows(LeaderNotAvailableException.class, result);
+ assertEquals(SharePartitionState.FENCED,
sharePartition.partitionState());
+ }
+
@Test
public void testMaybeInitializeStateBatchesWithGapAtBeginning() {
Persister persister = Mockito.mock(Persister.class);
@@ -5564,7 +5591,7 @@ public class SharePartitionTest {
result = sharePartition.writeShareGroupState(anyList());
assertTrue(result.isCompletedExceptionally());
- assertFutureThrows(FencedStateEpochException.class, result);
+ assertFutureThrows(NotLeaderOrFollowerException.class, result);
// Mock Write state RPC to return error response, FENCED_LEADER_EPOCH.
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(