This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 391b258d50a KAFKA-19077: Propagate shutdownRequested field (#19359)
391b258d50a is described below

commit 391b258d50ab3d2edc636bf6f56f51c6e37a98a5
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Apr 7 15:11:59 2025 +0200

    KAFKA-19077: Propagate shutdownRequested field (#19359)
    
    When a member indicates that the application should shut down, set a
    soft-state flag on the streams group and return the status
    `SHUTDOWN_APPLICATION` to every member in each heartbeat response until
    the group becomes empty, at which point the flag is cleared.
    
    Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai
    <[email protected]>, Jeff Kim <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 19 +++--
 .../coordinator/group/streams/StreamsGroup.java    | 26 ++++++
 .../group/GroupMetadataManagerTest.java            | 92 ++++++++++++++++++++++
 .../group/streams/StreamsGroupTest.java            | 34 +++++++-
 4 files changed, 163 insertions(+), 8 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index c51dea94e96..c1e1fe3760d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -61,7 +61,6 @@ import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Endpoint;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.KeyValue;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskIds;
-import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskOffset;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Topology;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
 import 
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData.Status;
@@ -2068,8 +2067,6 @@ public class GroupMetadataManager {
      * @param ownedWarmupTasks    The list of owned warmup tasks from the 
request or null.
      * @param userEndpoint        User-defined endpoint for Interactive 
Queries, or null.
      * @param clientTags          Used for rack-aware assignment algorithm, or 
null.
-     * @param taskEndOffsets      Cumulative changelog offsets for tasks, or 
null.
-     * @param taskOffsets         Cumulative changelog end-offsets for tasks, 
or null.
      * @param shutdownApplication Whether all Streams clients in the group 
should shut down.
      * @return A result containing the StreamsGroupHeartbeat response and a 
list of records to update the state machine.
      */
@@ -2089,8 +2086,6 @@ public class GroupMetadataManager {
         String processId,
         Endpoint userEndpoint,
         List<KeyValue> clientTags,
-        List<TaskOffset> taskOffsets,
-        List<TaskOffset> taskEndOffsets,
         boolean shutdownApplication
     ) throws ApiException {
         final long currentTimeMs = time.milliseconds();
@@ -2224,6 +2219,9 @@ public class GroupMetadataManager {
         );
 
         scheduleStreamsGroupSessionTimeout(groupId, memberId);
+        if (shutdownApplication) {
+            group.setShutdownRequestMemberId(memberId);
+        }
 
         // Prepare the response.
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
@@ -2251,6 +2249,15 @@ public class GroupMetadataManager {
             );
         }
 
+        group.getShutdownRequestMemberId().ifPresent(requestingMemberId -> 
returnedStatus.add(
+            new Status()
+                
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+                .setStatusDetail(
+                    String.format("Streams group member %s encountered a fatal 
error and requested a shutdown for the entire application.",
+                        requestingMemberId)
+                )
+        ));
+
         if (!returnedStatus.isEmpty()) {
             response.setStatus(returnedStatus);
         }
@@ -4843,8 +4850,6 @@ public class GroupMetadataManager {
                 request.processId(),
                 request.userEndpoint(),
                 request.clientTags(),
-                request.taskOffsets(),
-                request.taskEndOffsets(),
                 request.shutdownApplication()
             );
         }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index dacb041d93b..3ace0d1c5b6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -197,6 +197,14 @@ public class StreamsGroup implements Group {
      */
     private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
 
+    /**
+     * Keeps a member ID that requested a shutdown for this group.
+     * This has no direct effect inside the group coordinator, but is 
propagated to old and new members of the group.
+     * It is cleared once the group is empty.
+     * This is not persisted in the log, as the shutdown request is 
best-effort.
+     */
+    private Optional<String> shutdownRequestMemberId = Optional.empty();
+
     public StreamsGroup(
         LogContext logContext,
         SnapshotRegistry snapshotRegistry,
@@ -824,6 +832,7 @@ public class StreamsGroup implements Group {
         StreamsGroupState newState = STABLE;
         if (members.isEmpty()) {
             newState = EMPTY;
+            clearShutdownRequestMemberId();
         } else if (topology().isEmpty() || configuredTopology().isEmpty() || 
!configuredTopology().get().isReady()) {
             newState = NOT_READY;
         } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
@@ -1049,4 +1058,21 @@ public class StreamsGroup implements Group {
         return describedGroup;
     }
 
+    public void setShutdownRequestMemberId(final String memberId) {
+        if (shutdownRequestMemberId.isEmpty()) {
+            log.info("[GroupId {}][MemberId {}] Shutdown requested for the 
streams application.", groupId, memberId);
+            shutdownRequestMemberId = Optional.of(memberId);
+        }
+    }
+
+    public Optional<String> getShutdownRequestMemberId() {
+        return shutdownRequestMemberId;
+    }
+
+    private void clearShutdownRequestMemberId() {
+        if (shutdownRequestMemberId.isPresent()) {
+            log.info("[GroupId {}] Clearing shutdown requested for the streams 
application.", groupId);
+            shutdownRequestMemberId = Optional.empty();
+        }
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2014b900626..95e247beef7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16278,6 +16278,98 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberRequestingShutdownApplication() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+                    
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+                    .build())
+                .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+                    
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                        TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+                    .build())
+                .withTargetAssignment(memberId1, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+                .withTargetAssignment(memberId2, 
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+                .withTargetAssignmentEpoch(10)
+                .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+                .withPartitionMetadata(Map.of(
+                    fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
+                ))
+            )
+            .build();
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result1 = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setShutdownApplication(true)
+        );
+
+        String statusDetail = String.format("Streams group member %s 
encountered a fatal error and requested a shutdown for the entire 
application.", memberId1);
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of(
+                    new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+                        .setStatusDetail(statusDetail)
+                )),
+            result1.response().data()
+        );
+        assertRecordsEquals(List.of(), result1.records());
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result2 = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+        );
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setStatus(List.of(
+                    new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+                        .setStatusDetail(statusDetail)
+                )),
+            result2.response().data()
+        );
+
+        assertRecordsEquals(List.of(), result2.records());
+    }
+
     @Test
     public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment() 
{
         String groupId = "fooup";
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index fe8161df615..e9038b33402 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -1106,4 +1106,36 @@ public class StreamsGroupTest {
         assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
         assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testShutdownRequestedMethods() {
+        String memberId1 = "test-member-id1";
+        String memberId2 = "test-member-id2";
+        LogContext logContext = new LogContext();
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+        StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, "test-group", metricsShard);
+
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+        
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+        // Initially, shutdown should not be requested
+        assertTrue(streamsGroup.getShutdownRequestMemberId().isEmpty());
+
+        // Set shutdown requested
+        streamsGroup.setShutdownRequestMemberId(memberId1);
+        assertEquals(Optional.of(memberId1), 
streamsGroup.getShutdownRequestMemberId());
+
+        // Setting shutdown requested again will be ignored
+        streamsGroup.setShutdownRequestMemberId(memberId2);
+        assertEquals(Optional.of(memberId1), 
streamsGroup.getShutdownRequestMemberId());
+
+        // As long as group not empty, remain in shutdown requested state
+        streamsGroup.removeMember(memberId1);
+        assertEquals(Optional.of(memberId1), 
streamsGroup.getShutdownRequestMemberId());
+
+        // As soon as the group is empty, clear the shutdown requested state
+        streamsGroup.removeMember(memberId2);
+        assertEquals(Optional.empty(), 
streamsGroup.getShutdownRequestMemberId());
+    }
+}

Reply via email to