This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 80b9abebade KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
80b9abebade is described below
commit 80b9abebade920148b4295a1bccf84005c5989c4
Author: Ismael Juma <[email protected]>
AuthorDate: Mon Jul 7 08:44:24 2025 -0700
KAFKA-19444: Add back JoinGroup v0 & v1 (#20116)
This fixes librdkafka older than the recently released 2.11.0 with
Kerberos authentication and Apache Kafka 4.x.
Even though this is a bug in librdkafka, a key goal of KIP-896 is not to
break the popular client libraries listed in it. Adding back JoinGroup
v0 & v1 is a very small change and worth it from that perspective.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../resources/common/message/JoinGroupRequest.json | 4 +---
.../resources/common/message/JoinGroupResponse.json | 4 +---
.../kafka/common/requests/JoinGroupRequestTest.java | 19 +++++++++++++++++++
.../kafka/common/requests/RequestResponseTest.java | 8 ++++++++
4 files changed, 29 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index 41d7c1acbae..31afdb1a32a 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -18,8 +18,6 @@
"type": "request",
"listeners": ["broker"],
"name": "JoinGroupRequest",
- // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
- //
// Version 1 adds RebalanceTimeoutMs. Version 2 and 3 are the same as version 1.
//
// Starting from version 4, the client needs to issue a second request to join group
@@ -34,7 +32,7 @@
// Version 8 adds the Reason field (KIP-800).
//
// Version 9 is the same as version 8.
- "validVersions": "2-9",
+ "validVersions": "0-9",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
diff --git a/clients/src/main/resources/common/message/JoinGroupResponse.json b/clients/src/main/resources/common/message/JoinGroupResponse.json
index 364309596eb..d2f016f62f6 100644
--- a/clients/src/main/resources/common/message/JoinGroupResponse.json
+++ b/clients/src/main/resources/common/message/JoinGroupResponse.json
@@ -17,8 +17,6 @@
"apiKey": 11,
"type": "response",
"name": "JoinGroupResponse",
- // Versions 0-1 were removed in Apache Kafka 4.0, Version 2 is the new baseline.
- //
// Version 1 is the same as version 0.
//
// Version 2 adds throttle time.
@@ -37,7 +35,7 @@
// Version 8 is the same as version 7.
//
// Version 9 adds the SkipAssignment field.
- "validVersions": "2-9",
+ "validVersions": "0-9",
"flexibleVersions": "6+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
index 60d10a68939..a3301908957 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/JoinGroupRequestTest.java
@@ -19,12 +19,15 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
@@ -65,4 +68,20 @@ public class JoinGroupRequestTest {
.setProtocolType("consumer")
).build((short) 4));
}
+
+ @Test
+ public void testRebalanceTimeoutDefaultsToSessionTimeoutV0() {
+ int sessionTimeoutMs = 30000;
+ short version = 0;
+
+ ByteBuffer buffer = MessageUtil.toByteBuffer(new JoinGroupRequestData()
+ .setGroupId("groupId")
+ .setMemberId("consumerId")
+ .setProtocolType("consumer")
+ .setSessionTimeoutMs(sessionTimeoutMs), version);
+
+ JoinGroupRequest request = JoinGroupRequest.parse(buffer, version);
+ assertEquals(sessionTimeoutMs, request.data().sessionTimeoutMs());
+ assertEquals(sessionTimeoutMs, request.data().rebalanceTimeoutMs());
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6578302e81e..ad52482a3b3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -665,6 +665,14 @@ public class RequestResponseTest {
assertEquals(request.isolationLevel(), deserialized.isolationLevel());
}
+ @Test
+ public void testJoinGroupRequestV0RebalanceTimeout() {
+ final short version = 0;
+ JoinGroupRequest jgr = createJoinGroupRequest(version);
+ JoinGroupRequest jgr2 = JoinGroupRequest.parse(jgr.serialize(), version);
+ assertEquals(jgr2.data().rebalanceTimeoutMs(), jgr.data().rebalanceTimeoutMs());
+ }
+
@Test
public void testSerializeWithHeader() {
CreatableTopicCollection topicsToCreate = new CreatableTopicCollection(1);