This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2267902b401 MINOR: Mark streams RPCs as unstable (#19292)
2267902b401 is described below
commit 2267902b40135076ba732779191d7e39b6b55935
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Mar 27 14:22:01 2025 +0100
MINOR: Mark streams RPCs as unstable (#19292)
Streams groups RPCs are not enabled by default, but they should also be
marked as unstable.
Reviewers: Bruno Cadonna <[email protected]>
---
.../consumer/internals/StreamsGroupHeartbeatRequestManager.java | 2 +-
.../main/resources/common/message/StreamsGroupDescribeRequest.json | 4 ++++
.../main/resources/common/message/StreamsGroupHeartbeatRequest.json | 4 ++++
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
4 files changed, 10 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 08aa6b6927f..319a708e216 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -476,7 +476,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final
long currentTimeMs) {
NetworkClientDelegate.UnsentRequest request = new
NetworkClientDelegate.UnsentRequest(
- new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+ new
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData(),
true),
coordinatorRequestManager.coordinator()
);
heartbeatRequestState.onSendAttempt(currentTimeMs);
diff --git
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index 6e36479043a..e3dad6fa8ec 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -20,6 +20,10 @@
"name": "StreamsGroupDescribeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
+ // The StreamsGroupDescribeRequest API is added as part of KIP-1071 and is
still under
+ // development. Hence, the API is not exposed by default by brokers unless
+ // explicitly enabled.
+ "latestVersionUnstable": true,
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType":
"groupId",
"about": "The ids of the groups to describe" },
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index 3395688983b..6af7fad4d2b 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -20,6 +20,10 @@
"name": "StreamsGroupHeartbeatRequest",
"validVersions": "0",
"flexibleVersions": "0+",
+ // The StreamsGroupHeartbeatRequest API is added as part of KIP-1071 and is
still under
+ // development. Hence, the API is not exposed by default by brokers unless
+ // explicitly enabled.
+ "latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The group identifier." },
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d5910a25520..b5e019fdffd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9652,7 +9652,7 @@ class KafkaApisTest extends Logging {
def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val streamsGroupHeartbeatRequest = new
StreamsGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
+ val requestChannelRequest = buildRequest(new
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest,
true).build())
metadataCache = {
val cache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_1)
val delta = new MetadataDelta(MetadataImage.EMPTY);