This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new aa0d1f50003 MINOR: Reject requests using unsupported features in
KIP-1071 (#20031)
aa0d1f50003 is described below
commit aa0d1f500039c57892518f6ca012cb85385d52e6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Jun 25 14:48:56 2025 +0200
MINOR: Reject requests using unsupported features in KIP-1071 (#20031)
KIP-1071 does not currently support all features planned in the KIP. We
should reject any requests that are using features that are currently
not implemented.
Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
<[email protected]>, Bill Bejeck <[email protected]>
---
.../coordinator/group/GroupCoordinatorService.java | 22 +++++
.../org/apache/kafka/coordinator/group/Utils.java | 16 ++++
.../group/GroupCoordinatorServiceTest.java | 105 +++++++++++++++++----
3 files changed, 124 insertions(+), 19 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 94ef75c846e..ab7ede49cfe 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -141,6 +141,7 @@ import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx
import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString;
import static
org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection;
import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull;
+import static org.apache.kafka.coordinator.group.Utils.throwIfNotNullOrEmpty;
import static org.apache.kafka.coordinator.group.Utils.throwIfNull;
/**
@@ -540,6 +541,26 @@ public class GroupCoordinatorService implements
GroupCoordinator {
}
}
+ /**
+ * Rejects a streams group heartbeat request that uses a feature which is not
supported yet: instance id, task offsets, task end offsets, warm-up tasks or source topic regular expressions.
+ *
+ * @param request The request to validate.
+ * @throws InvalidRequestException if the request uses one of these features.
+ */
+ private static void
throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(
+ StreamsGroupHeartbeatRequestData request
+ ) throws InvalidRequestException {
+ throwIfNotNull(request.instanceId(), "Static membership is not yet
supported.");
+ throwIfNotNull(request.taskOffsets(), "TaskOffsets are not supported
yet.");
+ throwIfNotNull(request.taskEndOffsets(), "TaskEndOffsets are not
supported yet.");
+ throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not
supported yet.");
+ if (request.topology() != null) {
+ for (StreamsGroupHeartbeatRequestData.Subtopology subtopology :
request.topology().subtopologies()) {
+ throwIfNotEmptyCollection(subtopology.sourceTopicRegex(),
"Regular expressions for source topics are not supported yet.");
+ }
+ }
+ }
+
/**
* See
* {@link
GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext,
StreamsGroupHeartbeatRequestData)}.
@@ -559,6 +580,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
}
try {
+
throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(request);
throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
} catch (Throwable ex) {
ApiError apiError = ApiError.fromThrowable(ex);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
index d614123d2a7..02b0ed28e6e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java
@@ -280,6 +280,22 @@ public class Utils {
}
}
+ /**
+ * Throws an InvalidRequestException if the collection is neither null nor
empty.
+ *
+ * @param value The collection to check; null is accepted.
+ * @param error The error message.
+ * @throws InvalidRequestException if the collection has at least one element.
+ */
+ static void throwIfNotNullOrEmpty(
+ Collection<?> value,
+ String error
+ ) throws InvalidRequestException {
+ if (value != null && !value.isEmpty()) {
+ throw new InvalidRequestException(error);
+ }
+ }
+
/**
* Throws an InvalidRequestException if the value is non-null.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 1c54abaa40a..01c87696053 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -574,6 +574,91 @@ public class GroupCoordinatorServiceTest {
future.get(5, TimeUnit.SECONDS)
);
}
+ @Test
+ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws
Exception {
+
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(mockRuntime())
+ .build(true);
+
+ AuthorizableRequestContext context =
mock(AuthorizableRequestContext.class);
+ when(context.requestVersion()).thenReturn((int)
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
+ // Setting an instance id (static membership) is rejected.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Static membership is not yet
supported."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setInstanceId(Uuid.randomUuid().toString())
+ ).get(5, TimeUnit.SECONDS)
+ );
+ // Setting task offsets is rejected.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("TaskOffsets are not supported yet."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setTaskOffsets(List.of(new
StreamsGroupHeartbeatRequestData.TaskOffset()))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ // Setting task end offsets is rejected.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("TaskEndOffsets are not supported yet."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setTaskEndOffsets(List.of(new
StreamsGroupHeartbeatRequestData.TaskOffset()))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ // Setting warm-up tasks is rejected.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("WarmupTasks are not supported yet."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setWarmupTasks(List.of(new
StreamsGroupHeartbeatRequestData.TaskIds()))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ // A subtopology with source topic regular expressions is rejected.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Regular expressions for source topics
are not supported yet."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setTopology(new
StreamsGroupHeartbeatRequestData.Topology()
+ .setSubtopologies(List.of(new
StreamsGroupHeartbeatRequestData.Subtopology()
+ .setSourceTopicRegex(List.of("foo.*"))
+ ))
+ )
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
@SuppressWarnings("MethodLength")
@Test
@@ -584,7 +669,7 @@ public class GroupCoordinatorServiceTest {
.build(true);
AuthorizableRequestContext context =
mock(AuthorizableRequestContext.class);
- when(context.requestVersion()).thenReturn((int)
ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion());
+ when(context.requestVersion()).thenReturn((int)
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
String memberId = Uuid.randomUuid().toString();
@@ -743,24 +828,6 @@ public class GroupCoordinatorServiceTest {
).get(5, TimeUnit.SECONDS)
);
- // InstanceId must be non-empty if provided in all requests.
- assertEquals(
- new StreamsGroupHeartbeatResult(
- new StreamsGroupHeartbeatResponseData()
- .setErrorCode(Errors.INVALID_REQUEST.code())
- .setErrorMessage("InstanceId can't be empty."),
- Map.of()
- ),
- service.streamsGroupHeartbeat(
- context,
- new StreamsGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setInstanceId("")
- ).get(5, TimeUnit.SECONDS)
- );
-
// RackId must be non-empty if provided in all requests.
assertEquals(
new StreamsGroupHeartbeatResult(