This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2267902b401 MINOR: Mark streams RPCs as unstable (#19292)
2267902b401 is described below

commit 2267902b40135076ba732779191d7e39b6b55935
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Mar 27 14:22:01 2025 +0100

    MINOR: Mark streams RPCs as unstable (#19292)
    
    Streams groups RPCs are not enabled by default, but they should also be
    marked as unstable.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../consumer/internals/StreamsGroupHeartbeatRequestManager.java       | 2 +-
 .../main/resources/common/message/StreamsGroupDescribeRequest.json    | 4 ++++
 .../main/resources/common/message/StreamsGroupHeartbeatRequest.json   | 4 ++++
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala             | 2 +-
 4 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 08aa6b6927f..319a708e216 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -476,7 +476,7 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
 
     private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final 
long currentTimeMs) {
         NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
-            new 
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()),
+            new 
StreamsGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData(), 
true),
             coordinatorRequestManager.coordinator()
         );
         heartbeatRequestState.onSendAttempt(currentTimeMs);
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
index 6e36479043a..e3dad6fa8ec 100644
--- a/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
+++ b/clients/src/main/resources/common/message/StreamsGroupDescribeRequest.json
@@ -20,6 +20,10 @@
   "name": "StreamsGroupDescribeRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
+  // The StreamsGroupDescribeRequest API is added as part of KIP-1071 and is still under
+  // development. Hence, the API is not exposed by default by brokers unless
+  // explicitly enabled.
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": 
"groupId",
       "about": "The ids of the groups to describe" },
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index 3395688983b..6af7fad4d2b 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -20,6 +20,10 @@
   "name": "StreamsGroupHeartbeatRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
+  // The StreamsGroupHeartbeatRequest API is added as part of KIP-1071 and is still under
+  // development. Hence, the API is not exposed by default by brokers unless
+  // explicitly enabled.
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The group identifier." },
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d5910a25520..b5e019fdffd 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9652,7 +9652,7 @@ class KafkaApisTest extends Logging {
   def testStreamsGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
     val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest).build())
+    val requestChannelRequest = buildRequest(new 
StreamsGroupHeartbeatRequest.Builder(streamsGroupHeartbeatRequest, 
true).build())
     metadataCache = {
       val cache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_1)
       val delta = new MetadataDelta(MetadataImage.EMPTY);

Reply via email to