This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9ea30fb5eefce71ed79a4843845d3fe84b920d47
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Dec 5 14:40:57 2025 +0100

    KAFKA-19945: Always set status field in StreamsGroupHeartbeat response 
(#21031)
    
    The status field should always set in the response, even when empty, to
    ensure that the status gets reset on the client.
    
    We are not doing this due to an oversight - it is defined as nullable,
    and it is null when not set. So status does not clear correctly on the
    client, which ignores the field if it's null.
    
    This can cause the streams application to incorrectly timeout, if the
    source topic does not exist when the application is first started, but
    it is created after the application started. Otherwise, there is no
    noticable difference.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../message/StreamsGroupHeartbeatResponse.json     |   4 +-
 .../server/StreamsGroupHeartbeatRequestTest.scala  |   2 +-
 .../coordinator/group/GroupMetadataManager.java    |   7 +-
 .../group/GroupMetadataManagerTest.java            | 165 +++++++++++++++++----
 4 files changed, 139 insertions(+), 39 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index efeaf452571..220c9704a23 100644
--- 
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -53,8 +53,8 @@
     { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
       "about": "The interval in which the task changelog offsets on a client 
are updated on the broker. The offsets are sent with the next heartbeat after 
this time has passed." },
 
-    { "name": "Status", "type":  "[]Status", "versions":  "0+",  
"nullableVersions": "0+", "default": "null",
-      "about": "Indicate zero or more status for the group.  Null if unchanged 
since last heartbeat." },
+    { "name": "Status", "type":  "[]Status", "versions":  "0+",  
"nullableVersions": "0+",
+      "about": "Indicate zero or more status for the group." },
 
     // The streams app knows which partitions to fetch from given this 
information
     { "name": "ActiveTasks", "type": "[]TaskIds", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
diff --git 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 2d7f0649333..252ee82fb14 100644
--- 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -209,7 +209,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       assertNotNull(streamsGroupHeartbeatResponse, 
"StreamsGroupHeartbeatResponse should not be null")
       assertEquals(memberId, streamsGroupHeartbeatResponse.memberId())
       assertEquals(3, streamsGroupHeartbeatResponse.memberEpoch())
-      assertEquals(null, streamsGroupHeartbeatResponse.status())
+      assertEquals(List.empty.asJava, streamsGroupHeartbeatResponse.status())
       val expectedActiveTasks = List(
         new StreamsGroupHeartbeatResponseData.TaskIds()
           .setSubtopologyId("subtopology-1")
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 6bf963c968e..6e6eadecd1e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2155,9 +2155,7 @@ public class GroupMetadataManager {
                 )
         ));
 
-        if (!returnedStatus.isEmpty()) {
-            response.setStatus(returnedStatus);
-        }
+        response.setStatus(returnedStatus);
         return new CoordinatorResult<>(records, new 
StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
     }
 
@@ -4217,7 +4215,8 @@ public class GroupMetadataManager {
         }
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(memberEpoch);
+            .setMemberEpoch(memberEpoch)
+            .setStatus(List.of());
 
         if (instanceId == null) {
             StreamsGroupMember member = group.getMemberOrThrow(memberId);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b8beedca616..3f24ba06f4e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16326,7 +16326,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -16411,7 +16412,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -16946,7 +16948,8 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setStatus(List.of()),
             result1.response().data()
         );
         assertRecordsEquals(
@@ -17056,7 +17059,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17176,7 +17180,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17315,7 +17320,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
 
             result.response().data()
         );
@@ -17379,7 +17385,8 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
-                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17441,7 +17448,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17456,7 +17464,77 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId)
                 .setMemberEpoch(2)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
+            result.response().data()
+        );
+    }
+
+    @Test
+    public void testStreamsGroupHeartbeatAlwaysSetsStatus() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 2)
+                .buildCoordinatorMetadataImage())
+            
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 0)
+            .build();
+
+        // Prepare new assignment for the group.
+        assignor.prepareGroupAssignment(
+            Map.of(memberId, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, 
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+        // Heartbeat with no errors should still have status field set to 
empty list.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        // Verify that status is always set, even when empty.
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1))))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
+            result.response().data()
+        );
+
+        // Verify status field is present in subsequent heartbeats as well.
+        result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().data().memberEpoch()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -17503,7 +17581,8 @@ public class GroupMetadataManagerTest {
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
                 .setPartitionsByUserEndpoint(null)
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(0)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17531,7 +17610,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -17639,7 +17719,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17680,7 +17761,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17725,7 +17807,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17758,7 +17841,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17797,7 +17881,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17826,7 +17911,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
                 .setMemberEpoch(10)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17852,7 +17938,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology2)
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17882,7 +17969,8 @@ public class GroupMetadataManagerTest {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
-                .setHeartbeatIntervalMs(5000),
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17924,7 +18012,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(2))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17970,7 +18059,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology2)
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18180,7 +18270,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18309,7 +18400,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2, 3, 4, 5))
                 ))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18535,7 +18627,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18573,7 +18666,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18601,7 +18695,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18632,7 +18727,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId1)
                 .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(0),
+                .setEndpointInformationEpoch(0)
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18696,7 +18792,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18734,7 +18831,8 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18761,7 +18859,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18943,7 +19042,8 @@ public class GroupMetadataManagerTest {
                                         .setPartitions(List.of(0, 1))))
                         .setStandbyTasks(List.of())
                         .setWarmupTasks(List.of())
-                        
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions)),
+                        
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
+                        .setStatus(List.of()),
                 result.response().data()
         );
 
@@ -19119,7 +19219,8 @@ public class GroupMetadataManagerTest {
                         .setSubtopologyId(subtopology1)
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
-                .setWarmupTasks(List.of()),
+                .setWarmupTasks(List.of())
+                .setStatus(List.of()),
             result.response().data()
         );
 

Reply via email to