This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new 9dd4f438b67 MINOR: add endpoint to sync RPCs with KIP (#18035)
9dd4f438b67 is described below

commit 9dd4f438b67f3ffef31da7e2f01f48682ae8e29b
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Dec 4 16:06:22 2024 +0100

    MINOR: add endpoint to sync RPCs with KIP (#18035)
---
 .../common/message/StreamsGroupDescribeResponse.json | 20 +++++++++++++++-----
 .../common/message/StreamsGroupHeartbeatRequest.json | 10 +++++-----
 .../group/streams/StreamsGroupMember.java            | 11 ++++++++---
 .../message/StreamsGroupMemberMetadataValue.json     |  4 ++--
 .../group/streams/StreamsGroupMemberTest.java        |  8 ++++++--
 5 files changed, 36 insertions(+), 17 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
index 61dd4a38dae..dbd582e1f41 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupDescribeResponse.json
@@ -46,13 +46,13 @@
         { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
           "about": "The assignment epoch." },
 
-        { "name":  "Topology", "type": "Topology", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-          "about": "The topology metadata currently initialized for the 
streams application. Can be null in case of a describe error.",
+        { "name":  "Topology", "type": "Topology", "versions": "0+",
+          "about": "The topology metadata currently initialized for the 
streams application.",
           "fields": [
             { "name": "Epoch", "type": "int32", "versions": "0+",
               "about": "The epoch of the currently initialized topology for 
this group." },
             { "name": "Subtopologies", "type": "[]Subtopology", "versions": 
"0+", "nullableVersions": "0+", "default": "null",
-              "about": "The subtopologies of the streams application. This 
contains the configured subtopologies, where the number of partitions are set 
and any regular expressions are resolved to actual topics. Null if the group is 
uninitialized, source topics are missing or inconsistent.",
+              "about": "The subtopologies of the streams application. This 
contains the configured subtopologies, where the number of partitions are set 
and any regular expressions are resolved to actual topics. Null if the group is 
uninitialized, source topics are missing or incorrectly partitioned.",
               "fields": [
                 { "name": "SubtopologyId", "type": "string", "versions": "0+",
                   "about": "String to uniquely identify the subtopology." },
@@ -88,6 +88,8 @@
 
             { "name": "ProcessId", "type": "string", "versions": "0+",
               "about": "Identity of the streams instance that may have 
multiple clients. " },
+            { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+              "about": "User-defined endpoint for Interactive Queries. Null if 
not defined for this client." },
             { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
               "about": "Used for rack-aware assignment algorithm." },
             { "name": "TaskOffsets", "type": "[]TaskOffset", "versions": "0+",
@@ -98,7 +100,9 @@
             { "name": "Assignment", "type": "Assignment", "versions": "0+",
               "about": "The current assignment." },
             { "name": "TargetAssignment", "type": "Assignment", "versions": 
"0+",
-              "about": "The target assignment." }
+              "about": "The target assignment." },
+            { "name": "IsClassic", "type": "bool", "versions": "0+",
+              "about": "True for classic members that have not been upgraded 
yet." }
           ]},
         { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", 
"default": "-2147483648",
           "about": "32-bit bitfield to represent authorized operations for 
this group." }
@@ -106,6 +110,12 @@
     }
   ],
   "commonStructs": [
+    { "name": "Endpoint", "versions": "0+", "fields": [
+      { "name": "Host", "type": "string", "versions": "0+",
+        "about": "host of the endpoint" },
+      { "name": "Port", "type": "uint16", "versions": "0+",
+        "about": "port of the endpoint" }
+    ]},
     { "name": "TaskOffset", "versions": "0+", "fields": [
       { "name": "SubtopologyId", "type": "string", "versions": "0+",
         "about": "The subtopology identifier." },
@@ -154,4 +164,4 @@
       }
     ]}
   ]
-}
+}
\ No newline at end of file
diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
index d63ee0f8fd9..b031d2da07d 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatRequest.json
@@ -30,15 +30,15 @@
     { "name": "InstanceId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "null if not provided or if it didn't change since the last 
heartbeat; the instance ID for static membership otherwise." },
     { "name": "RackId", "type": "string", "versions": "0+",  
"nullableVersions": "0+", "default": "null",
-      "about": "null if not provided or if it didn't change since the last 
heartbeat; the rack ID of consumer otherwise." },
+      "about": "null if not provided or if it didn't change since the last 
heartbeat; the rack ID of the member otherwise." },
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
-      "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
+      "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
tasks otherwise." },
 
     { "name": "Topology", "type": "Topology", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "The topology metadata of the streams application. Used to 
initialize the topology of the group and to check if the topology corresponds 
to the topology initialized for the group. Only sent when memberEpoch = 0, must 
be non-empty. Null otherwise.",
       "fields": [
         { "name": "Epoch", "type": "int32", "versions": "0+",
-          "about": "The epoch of the topology. Used to check if the topology 
corresponds to the topology initialized on the brokers. Must be non-zero." },
+          "about": "The epoch of the topology. Used to check if the topology 
corresponds to the topology initialized on the brokers." },
         { "name": "Subtopologies", "type": "[]Subtopology", "versions": "0+",
           "about": "The sub-topologies of the streams application.",
           "fields": [
@@ -78,7 +78,7 @@
     { "name": "ProcessId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Identity of the streams instance that may have multiple 
consumers. Null if unchanged since last heartbeat." },
     { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "User-defined endpoint for Interactive Queries. Null if 
unchanged since last heartbeat." },
+      "about": "User-defined endpoint for Interactive Queries. Null if 
unchanged since last heartbeat, or if not defined on the client." },
     { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Used for rack-aware assignment algorithm. Null if unchanged 
since last heartbeat." },
 
@@ -129,4 +129,4 @@
         "about": "The partitions of the input topics processed by this 
member." }
     ]}
   ]
-}
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 144d1198063..b73f1ce5459 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -621,9 +621,14 @@ public class StreamsGroupMember {
                     .setValue(entry.getValue())
             ).collect(Collectors.toList()))
             .setProcessId(processId)
-            .setTopologyEpoch(topologyEpoch);
-        // TODO: TaskOffset and TaskEndOffset are missing.
-
+            .setTopologyEpoch(topologyEpoch)
+            .setUserEndpoint(
+                userEndpoint == null ? null :
+                    new StreamsGroupDescribeResponseData.Endpoint()
+                        .setHost(userEndpoint.host())
+                        .setPort(userEndpoint.port())
+            );
+        // TODO: TaskOffset, TaskEndOffset, IsClassic are to be implemented.
     }
 
     private static List<StreamsGroupDescribeResponseData.TaskIds> 
taskIdsFromMap(
diff --git 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
index 07ef8d4c252..0725ccb1cbc 100644
--- 
a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
+++ 
b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json
@@ -21,7 +21,7 @@
   "flexibleVersions": "0+",
   "fields": [
     { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-      "about": "The (optional) instance id." },
+      "about": "The (optional) instance ID for static membership." },
     { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": 
"string",
       "about": "The (optional) rack id." },
     { "name": "ClientId", "versions": "0+", "type": "string",
@@ -36,7 +36,7 @@
 
     { "name": "ProcessId", "type": "string", "versions": "0+",
       "about": "Identity of the streams instance that may have multiple 
consumers." },
-    { "name": "UserEndpoint", "type": "Endpoint", "nullableVersions": "0+", 
"versions": "0+",
+    { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "User-defined endpoint for running interactive queries on this 
instance." },
     { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+",
       "about": "Used for rack-aware assignment algorithm." }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
index 5af3a225577..0120e6f11ec 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMemberTest.java
@@ -340,6 +340,9 @@ public class StreamsGroupMemberTest {
             .setAssignedWarmupTasks(
                 mkMap(mkEntry(subTopology3, new HashSet<>(assignedTasks3)))
             )
+            .setUserEndpoint(
+                new 
StreamsGroupMemberMetadataValue.Endpoint().setHost("host").setPort(9090)
+            )
             .build();
 
         StreamsGroupDescribeResponseData.Member actual = 
member.asStreamsGroupDescribeMember(targetAssignment);
@@ -376,8 +379,9 @@ public class StreamsGroupMemberTest {
                     .setWarmupTasks(Collections.singletonList(new 
StreamsGroupDescribeResponseData.TaskIds()
                         .setSubtopologyId(subTopology3)
                         .setPartitions(assignedTasks1)))
-            );
-        // TODO: Add TaskOffsets
+            )
+            .setUserEndpoint(new 
StreamsGroupDescribeResponseData.Endpoint().setHost("host").setPort(9090));
+        // TODO: TaskOffset, TaskEndOffset, IsClassic are to be implemented.
 
         assertEquals(expected, actual);
     }

Reply via email to