This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fedbb90c128 KAFKA-19232: Handle Share session limit reached exception
in clients. (#19619)
fedbb90c128 is described below
commit fedbb90c12887a693dfa333e8c7c7cd970f4fd33
Author: Shivsundar R <[email protected]>
AuthorDate: Sun May 4 14:59:40 2025 -0400
KAFKA-19232: Handle Share session limit reached exception in clients.
(#19619)
Handle the new `ShareSessionLimitReachedException` in
`ShareSessionHandler` in the client to reset the ShareSession. Added a
unit test verifying the change.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/internals/ShareSessionHandler.java | 3 ++-
.../clients/consumer/internals/ShareSessionHandlerTest.java | 9 ++++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 34a109944be..634a9839c5d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -219,7 +219,8 @@ public class ShareSessionHandler {
*/
public boolean handleResponse(ShareFetchResponse response, short version) {
if ((response.error() == Errors.SHARE_SESSION_NOT_FOUND) ||
- (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH)) {
+ (response.error() == Errors.INVALID_SHARE_SESSION_EPOCH) ||
+ (response.error() == Errors.SHARE_SESSION_LIMIT_REACHED)) {
log.info("Node {} was unable to process the ShareFetch request
with {}: {}.",
node, nextMetadata, response.error());
nextMetadata = nextMetadata.nextCloseExistingAttemptNew();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 5a52b7bc35f..aa5ebb80380 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -31,6 +31,8 @@ import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.Collections;
@@ -149,8 +151,9 @@ public class ShareSessionHandlerTest {
return topicIdPartitionToPartition;
}
- @Test
- public void testShareSession() {
+ @ParameterizedTest
+ @EnumSource(value = Errors.class, names = {"INVALID_SHARE_SESSION_EPOCH",
"SHARE_SESSION_NOT_FOUND", "SHARE_SESSION_LIMIT_REACHED"})
+ public void testShareSession(Errors error) {
String groupId = "G1";
Uuid memberId = Uuid.randomUuid();
ShareSessionHandler handler = new ShareSessionHandler(LOG_CONTEXT, 1,
memberId);
@@ -199,7 +202,7 @@ public class ShareSessionHandlerTest {
handler.handleResponse(resp2, ApiKeys.SHARE_FETCH.latestVersion(true));
// A top-level error code will reset the session epoch
- ShareFetchResponse resp3 =
ShareFetchResponse.of(Errors.INVALID_SHARE_SESSION_EPOCH, 0, new
LinkedHashMap<>(), List.of(), 0);
+ ShareFetchResponse resp3 = ShareFetchResponse.of(error, 0, new
LinkedHashMap<>(), List.of(), 0);
handler.handleResponse(resp3, ApiKeys.SHARE_FETCH.latestVersion(true));
ShareFetchRequestData requestData4 =
handler.newShareFetchBuilder(groupId, fetchConfig).build().data();