This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 391b258d50a KAFKA-19077: Propagate shutdownRequested field (#19359)
391b258d50a is described below
commit 391b258d50ab3d2edc636bf6f56f51c6e37a98a5
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Apr 7 15:11:59 2025 +0200
KAFKA-19077: Propagate shutdownRequested field (#19359)
When a member indicates that the application should shut down, set a
soft-state flag (not persisted to the log) on the streams group and return
the status `SHUTDOWN_APPLICATION` to every member on each heartbeat until
the group becomes empty, at which point the flag is reset.
Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai
<[email protected]>, Jeff Kim <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 19 +++--
.../coordinator/group/streams/StreamsGroup.java | 26 ++++++
.../group/GroupMetadataManagerTest.java | 92 ++++++++++++++++++++++
.../group/streams/StreamsGroupTest.java | 34 +++++++-
4 files changed, 163 insertions(+), 8 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index c51dea94e96..c1e1fe3760d 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -61,7 +61,6 @@ import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Endpoint;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.KeyValue;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskIds;
-import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.TaskOffset;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData.Topology;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import
org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData.Status;
@@ -2068,8 +2067,6 @@ public class GroupMetadataManager {
* @param ownedWarmupTasks The list of owned warmup tasks from the
request or null.
* @param userEndpoint User-defined endpoint for Interactive
Queries, or null.
* @param clientTags Used for rack-aware assignment algorithm, or
null.
- * @param taskEndOffsets Cumulative changelog offsets for tasks, or
null.
- * @param taskOffsets Cumulative changelog end-offsets for tasks,
or null.
* @param shutdownApplication Whether all Streams clients in the group
should shut down.
* @return A result containing the StreamsGroupHeartbeat response and a
list of records to update the state machine.
*/
@@ -2089,8 +2086,6 @@ public class GroupMetadataManager {
String processId,
Endpoint userEndpoint,
List<KeyValue> clientTags,
- List<TaskOffset> taskOffsets,
- List<TaskOffset> taskEndOffsets,
boolean shutdownApplication
) throws ApiException {
final long currentTimeMs = time.milliseconds();
@@ -2224,6 +2219,9 @@ public class GroupMetadataManager {
);
scheduleStreamsGroupSessionTimeout(groupId, memberId);
+ if (shutdownApplication) {
+ group.setShutdownRequestMemberId(memberId);
+ }
// Prepare the response.
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
@@ -2251,6 +2249,15 @@ public class GroupMetadataManager {
);
}
+ group.getShutdownRequestMemberId().ifPresent(requestingMemberId ->
returnedStatus.add(
+ new Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+ .setStatusDetail(
+ String.format("Streams group member %s encountered a fatal
error and requested a shutdown for the entire application.",
+ requestingMemberId)
+ )
+ ));
+
if (!returnedStatus.isEmpty()) {
response.setStatus(returnedStatus);
}
@@ -4843,8 +4850,6 @@ public class GroupMetadataManager {
request.processId(),
request.userEndpoint(),
request.clientTags(),
- request.taskOffsets(),
- request.taskEndOffsets(),
request.shutdownApplication()
);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index dacb041d93b..3ace0d1c5b6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -197,6 +197,14 @@ public class StreamsGroup implements Group {
*/
private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ /**
+ * Keeps a member ID that requested a shutdown for this group.
+ * This has no direct effect inside the group coordinator, but is
propagated to old and new members of the group.
+ * It is cleared once the group is empty.
+ * This is not persisted in the log, as the shutdown request is
best-effort.
+ */
+ private Optional<String> shutdownRequestMemberId = Optional.empty();
+
public StreamsGroup(
LogContext logContext,
SnapshotRegistry snapshotRegistry,
@@ -824,6 +832,7 @@ public class StreamsGroup implements Group {
StreamsGroupState newState = STABLE;
if (members.isEmpty()) {
newState = EMPTY;
+ clearShutdownRequestMemberId();
} else if (topology().isEmpty() || configuredTopology().isEmpty() ||
!configuredTopology().get().isReady()) {
newState = NOT_READY;
} else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
@@ -1049,4 +1058,21 @@ public class StreamsGroup implements Group {
return describedGroup;
}
+ public void setShutdownRequestMemberId(final String memberId) {
+ if (shutdownRequestMemberId.isEmpty()) {
+ log.info("[GroupId {}][MemberId {}] Shutdown requested for the
streams application.", groupId, memberId);
+ shutdownRequestMemberId = Optional.of(memberId);
+ }
+ }
+
+ public Optional<String> getShutdownRequestMemberId() {
+ return shutdownRequestMemberId;
+ }
+
+ private void clearShutdownRequestMemberId() {
+ if (shutdownRequestMemberId.isPresent()) {
+ log.info("[GroupId {}] Clearing shutdown requested for the streams
application.", groupId);
+ shutdownRequestMemberId = Optional.empty();
+ }
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2014b900626..95e247beef7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -16278,6 +16278,98 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testStreamsGroupMemberRequestingShutdownApplication() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+ .withMember(streamsGroupMemberBuilderWithDefaults(memberId1)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+ .build())
+ .withMember(streamsGroupMemberBuilderWithDefaults(memberId2)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+ .build())
+ .withTargetAssignment(memberId1,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)))
+ .withTargetAssignment(memberId2,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 3, 4, 5)))
+ .withTargetAssignmentEpoch(10)
+ .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+ .withPartitionMetadata(Map.of(
+ fooTopicName, new
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId,
fooTopicName, 6)
+ ))
+ )
+ .build();
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result1 = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(10)
+ .setShutdownApplication(true)
+ );
+
+ String statusDetail = String.format("Streams group member %s
encountered a fatal error and requested a shutdown for the entire
application.", memberId1);
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setStatus(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+ .setStatusDetail(statusDetail)
+ )),
+ result1.response().data()
+ );
+ assertRecordsEquals(List.of(), result1.records());
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result2 = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ );
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(10)
+ .setHeartbeatIntervalMs(5000)
+ .setStatus(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
+ .setStatusDetail(statusDetail)
+ )),
+ result2.response().data()
+ );
+
+ assertRecordsEquals(List.of(), result2.records());
+ }
+
@Test
public void testStreamsUpdatingMemberMetadataTriggersNewTargetAssignment()
{
String groupId = "fooup";
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index fe8161df615..e9038b33402 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -1106,4 +1106,36 @@ public class StreamsGroupTest {
assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
}
-}
\ No newline at end of file
+
+ @Test
+ public void testShutdownRequestedMethods() {
+ String memberId1 = "test-member-id1";
+ String memberId2 = "test-member-id2";
+ LogContext logContext = new LogContext();
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+ GroupCoordinatorMetricsShard metricsShard =
mock(GroupCoordinatorMetricsShard.class);
+ StreamsGroup streamsGroup = new StreamsGroup(logContext,
snapshotRegistry, "test-group", metricsShard);
+
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+ // Initially, shutdown should not be requested
+ assertTrue(streamsGroup.getShutdownRequestMemberId().isEmpty());
+
+ // Set shutdown requested
+ streamsGroup.setShutdownRequestMemberId(memberId1);
+ assertEquals(Optional.of(memberId1),
streamsGroup.getShutdownRequestMemberId());
+
+ // Setting shutdown requested again will be ignored
+ streamsGroup.setShutdownRequestMemberId(memberId2);
+ assertEquals(Optional.of(memberId1),
streamsGroup.getShutdownRequestMemberId());
+
+ // As long as group not empty, remain in shutdown requested state
+ streamsGroup.removeMember(memberId1);
+ assertEquals(Optional.of(memberId1),
streamsGroup.getShutdownRequestMemberId());
+
+ // As soon as the group is empty, clear the shutdown requested state
+ streamsGroup.removeMember(memberId2);
+ assertEquals(Optional.empty(),
streamsGroup.getShutdownRequestMemberId());
+ }
+}