This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a9681651f1 MINOR: Move request static validations to
GroupCoordinatorService (#19556)
5a9681651f1 is described below
commit 5a9681651f14929211f8e09f952525cefec813d7
Author: David Jacot <[email protected]>
AuthorDate: Fri Apr 25 17:32:33 2025 +0200
MINOR: Move request static validations to GroupCoordinatorService (#19556)
This patch moves the static request validations from the
`GroupMetadataManager` to the `GroupCoordinatorService`. We already had
static validation in the service for other requests, so it makes sense to
consolidate all the static validations in the same place. Moreover, it
also prevents faulty requests from unnecessarily using the group
coordinator's resources.
Reviewers: Lucas Brutschy <[email protected]>, Andrew Schofield
<[email protected]>
---
.../coordinator/group/GroupCoordinatorService.java | 186 +++++++
.../coordinator/group/GroupMetadataManager.java | 144 -----
.../group/GroupCoordinatorServiceTest.java | 607 ++++++++++++++++++++-
.../group/GroupMetadataManagerTest.java | 311 -----------
4 files changed, 787 insertions(+), 461 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 035dfdb1d52..f92a43679a9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -20,7 +20,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
@@ -67,6 +70,7 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
@@ -83,6 +87,7 @@ import
org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import
org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
+import
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
@@ -126,7 +131,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
+import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString;
+import static
org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection;
+import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull;
+import static org.apache.kafka.coordinator.group.Utils.throwIfNull;
/**
* The group coordinator service.
@@ -298,6 +310,11 @@ public class GroupCoordinatorService implements
GroupCoordinator {
*/
private final AtomicBoolean isActive = new AtomicBoolean(false);
+ /**
+ * The set of supported consumer group assignors.
+ */
+ private final Set<String> consumerGroupAssignors;
+
/**
* The number of partitions of the __consumer_offsets topics. This is
provided
* when the component is started.
@@ -336,6 +353,11 @@ public class GroupCoordinatorService implements
GroupCoordinator {
this.groupConfigManager = groupConfigManager;
this.persister = persister;
this.timer = timer;
+ this.consumerGroupAssignors = config
+ .consumerGroupAssignors()
+ .stream()
+ .map(ConsumerGroupPartitionAssignor::name)
+ .collect(Collectors.toSet());
}
/**
@@ -367,6 +389,55 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return Utils.abs(groupId.hashCode()) % numPartitions;
}
+ /**
+ * Validates the request.
+ *
+ * @param request The request to validate.
+ * @param apiVersion The version of ConsumerGroupHeartbeat RPC
+ * @throws InvalidRequestException if the request is not valid.
+ * @throws UnsupportedAssignorException if the assignor is not supported.
+ */
+ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
+ ConsumerGroupHeartbeatRequestData request,
+ int apiVersion
+ ) throws InvalidRequestException, UnsupportedAssignorException {
+ if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
+ request.memberEpoch() > 0 ||
+ request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH
+ ) {
+ throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+ }
+
+ throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+ throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
+ throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+
+ if (request.memberEpoch() == 0) {
+ if (request.rebalanceTimeoutMs() == -1) {
+ throw new InvalidRequestException("RebalanceTimeoutMs must be
provided in first request.");
+ }
+ if (request.topicPartitions() == null ||
!request.topicPartitions().isEmpty()) {
+ throw new InvalidRequestException("TopicPartitions must be
empty when (re-)joining.");
+ }
+ // We accept members joining with an empty list of names or an
empty regex. It basically
+ // means that they are not subscribed to any topics, but they are
part of the group.
+ if (request.subscribedTopicNames() == null &&
request.subscribedTopicRegex() == null) {
+ throw new InvalidRequestException("Either SubscribedTopicNames
or SubscribedTopicRegex must" +
+ " be non-null when (re-)joining.");
+ }
+ } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ throwIfNull(request.instanceId(), "InstanceId can't be null.");
+ } else if (request.memberEpoch() < LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ throw new InvalidRequestException("MemberEpoch is invalid.");
+ }
+
+ if (request.serverAssignor() != null &&
!consumerGroupAssignors.contains(request.serverAssignor())) {
+ throw new UnsupportedAssignorException("ServerAssignor " +
request.serverAssignor()
+ + " is not supported. Supported assignors: " + String.join(",
", consumerGroupAssignors)
+ + ".");
+ }
+ }
+
/**
* See {@link
GroupCoordinator#consumerGroupHeartbeat(AuthorizableRequestContext,
ConsumerGroupHeartbeatRequestData)}.
*/
@@ -381,6 +452,16 @@ public class GroupCoordinatorService implements
GroupCoordinator {
);
}
+ try {
+ throwIfConsumerGroupHeartbeatRequestIsInvalid(request,
context.requestVersion());
+ } catch (Throwable ex) {
+ ApiError apiError = ApiError.fromThrowable(ex);
+ return CompletableFuture.completedFuture(new
ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message())
+ );
+ }
+
return runtime.scheduleWriteOperation(
"consumer-group-heartbeat",
topicPartitionFor(request.groupId()),
@@ -397,6 +478,65 @@ public class GroupCoordinatorService implements
GroupCoordinator {
));
}
+ private static void throwIfInvalidTopology(
+ StreamsGroupHeartbeatRequestData.Topology topology
+ ) throws StreamsInvalidTopologyException {
+ for (StreamsGroupHeartbeatRequestData.Subtopology subtopology:
topology.subtopologies()) {
+ for (StreamsGroupHeartbeatRequestData.TopicInfo topicInfo:
subtopology.stateChangelogTopics()) {
+ if (topicInfo.partitions() != 0) {
+ throw new StreamsInvalidTopologyException(String.format(
+ "Changelog topic %s must have an undefined partition
count, but it is set to %d.",
+ topicInfo.name(), topicInfo.partitions()
+ ));
+ }
+ }
+ }
+ }
+
+ /**
+ * Validates the request.
+ *
+ * @param request The request to validate.
+ * @throws InvalidRequestException if the request is not valid.
+ * @throws StreamsInvalidTopologyException if the topology provided when (re-)joining is invalid.
+ */
+ private static void throwIfStreamsGroupHeartbeatRequestIsInvalid(
+ StreamsGroupHeartbeatRequestData request
+ ) throws InvalidRequestException {
+ throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+ throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+ throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
+ throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+
+ if (request.memberEpoch() == 0) {
+ if (request.rebalanceTimeoutMs() == -1) {
+ throw new InvalidRequestException("RebalanceTimeoutMs must be
provided in first request.");
+ }
+ throwIfNotEmptyCollection(request.activeTasks(), "ActiveTasks must
be empty when (re-)joining.");
+ throwIfNotEmptyCollection(request.standbyTasks(), "StandbyTasks
must be empty when (re-)joining.");
+ throwIfNotEmptyCollection(request.warmupTasks(), "WarmupTasks must
be empty when (re-)joining.");
+ throwIfNull(request.topology(), "Topology must be non-null when
(re-)joining.");
+ if (request.topology() != null) {
+ throwIfInvalidTopology(request.topology());
+ }
+ } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ throwIfNull(request.instanceId(), "InstanceId can't be null.");
+ } else if (request.memberEpoch() < LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+ throw new InvalidRequestException(String.format("MemberEpoch is
%d, but must be greater than or equal to -2.",
+ request.memberEpoch()));
+ }
+
+ if (request.activeTasks() != null || request.standbyTasks() != null ||
request.warmupTasks() != null) {
+ throwIfNull(request.activeTasks(), "If one task-type is non-null,
all must be non-null.");
+ throwIfNull(request.standbyTasks(), "If one task-type is non-null,
all must be non-null.");
+ throwIfNull(request.warmupTasks(), "If one task-type is non-null,
all must be non-null.");
+ }
+
+ if (request.memberEpoch() != 0) {
+ throwIfNotNull(request.topology(), "Topology can only be provided
when (re-)joining.");
+ }
+ }
+
/**
* See
* {@link
GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext,
StreamsGroupHeartbeatRequestData)}.
@@ -415,6 +555,20 @@ public class GroupCoordinatorService implements
GroupCoordinator {
);
}
+ try {
+ throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
+ } catch (Throwable ex) {
+ ApiError apiError = ApiError.fromThrowable(ex);
+ return CompletableFuture.completedFuture(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message()),
+ Map.of()
+ )
+ );
+ }
+
return runtime.scheduleWriteOperation(
"streams-group-heartbeat",
topicPartitionFor(request.groupId()),
@@ -435,6 +589,28 @@ public class GroupCoordinatorService implements
GroupCoordinator {
));
}
+ /**
+ * Validates the ShareGroupHeartbeat request.
+ *
+ * @param request The request to validate.
+ * @throws InvalidRequestException if the request is not valid.
+ */
+ private static void throwIfShareGroupHeartbeatRequestIsInvalid(
+ ShareGroupHeartbeatRequestData request
+ ) throws InvalidRequestException {
+ throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+ throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
+ throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+
+ if (request.memberEpoch() == 0) {
+ if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
+ throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
+ }
+ } else if (request.memberEpoch() <
ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
+ throw new InvalidRequestException("MemberEpoch is invalid.");
+ }
+ }
+
/**
* See {@link
GroupCoordinator#shareGroupHeartbeat(AuthorizableRequestContext,
ShareGroupHeartbeatRequestData)}.
*/
@@ -449,6 +625,16 @@ public class GroupCoordinatorService implements
GroupCoordinator {
);
}
+ try {
+ throwIfShareGroupHeartbeatRequestIsInvalid(request);
+ } catch (Throwable ex) {
+ ApiError apiError = ApiError.fromThrowable(ex);
+ return CompletableFuture.completedFuture(new
ShareGroupHeartbeatResponseData()
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message())
+ );
+ }
+
return runtime.scheduleWriteOperation(
"share-group-heartbeat",
topicPartitionFor(request.groupId()),
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index c94381ebf65..96fb8ff6550 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -30,11 +30,9 @@ import
org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
-import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
-import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
@@ -212,7 +210,6 @@ import static
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
-import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
@@ -240,10 +237,6 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
-import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString;
-import static
org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection;
-import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull;
-import static org.apache.kafka.coordinator.group.Utils.throwIfNull;
import static
org.apache.kafka.coordinator.group.Utils.throwIfRegularExpressionIsInvalid;
import static
org.apache.kafka.coordinator.group.Utils.toConsumerProtocolAssignment;
import static org.apache.kafka.coordinator.group.Utils.toTopicPartitions;
@@ -1418,137 +1411,6 @@ public class GroupMetadataManager {
groups.remove(groupId);
}
- private static void throwIfInvalidTopology(
- StreamsGroupHeartbeatRequestData.Topology topology
- ) throws StreamsInvalidTopologyException {
- for (StreamsGroupHeartbeatRequestData.Subtopology subtopology:
topology.subtopologies()) {
- for (StreamsGroupHeartbeatRequestData.TopicInfo topicInfo:
subtopology.stateChangelogTopics()) {
- if (topicInfo.partitions() != 0) {
- throw new StreamsInvalidTopologyException(String.format(
- "Changelog topic %s must have an undefined partition
count, but it is set to %d.",
- topicInfo.name(), topicInfo.partitions()
- ));
- }
- }
- }
- }
-
- /**
- * Validates the request.
- *
- * @param request The request to validate.
- * @param apiVersion The version of ConsumerGroupHeartbeat RPC
- * @throws InvalidRequestException if the request is not valid.
- * @throws UnsupportedAssignorException if the assignor is not supported.
- */
- private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
- ConsumerGroupHeartbeatRequestData request,
- int apiVersion
- ) throws InvalidRequestException, UnsupportedAssignorException {
- if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION ||
- request.memberEpoch() > 0 ||
- request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH
- ) {
- throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
- }
-
- throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
- throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
- throwIfEmptyString(request.rackId(), "RackId can't be empty.");
-
- if (request.memberEpoch() == 0) {
- if (request.rebalanceTimeoutMs() == -1) {
- throw new InvalidRequestException("RebalanceTimeoutMs must be
provided in first request.");
- }
- if (request.topicPartitions() == null ||
!request.topicPartitions().isEmpty()) {
- throw new InvalidRequestException("TopicPartitions must be
empty when (re-)joining.");
- }
- // We accept members joining with an empty list of names or an
empty regex. It basically
- // means that they are not subscribed to any topics, but they are
part of the group.
- if (request.subscribedTopicNames() == null &&
request.subscribedTopicRegex() == null) {
- throw new InvalidRequestException("Either SubscribedTopicNames
or SubscribedTopicRegex must" +
- " be non-null when (re-)joining.");
- }
- } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
- throwIfNull(request.instanceId(), "InstanceId can't be null.");
- } else if (request.memberEpoch() < LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
- throw new InvalidRequestException("MemberEpoch is invalid.");
- }
-
- if (request.serverAssignor() != null &&
!consumerGroupAssignors.containsKey(request.serverAssignor())) {
- throw new UnsupportedAssignorException("ServerAssignor " +
request.serverAssignor()
- + " is not supported. Supported assignors: " + String.join(",
", consumerGroupAssignors.keySet())
- + ".");
- }
- }
-
- /**
- * Validates the ShareGroupHeartbeat request.
- *
- * @param request The request to validate.
- * @throws InvalidRequestException if the request is not valid.
- * @throws UnsupportedAssignorException if the assignor is not supported.
- */
- private void throwIfShareGroupHeartbeatRequestIsInvalid(
- ShareGroupHeartbeatRequestData request
- ) throws InvalidRequestException, UnsupportedAssignorException {
- throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
- throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
- throwIfEmptyString(request.rackId(), "RackId can't be empty.");
-
- if (request.memberEpoch() == 0) {
- if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
- throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
- }
- } else if (request.memberEpoch() <
ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
- throw new InvalidRequestException("MemberEpoch is invalid.");
- }
- }
-
- /**
- * Validates the request.
- *
- * @param request The request to validate.
- * @throws InvalidRequestException if the request is not valid.
- * @throws UnsupportedAssignorException if the assignor is not supported.
- */
- private static void throwIfStreamsGroupHeartbeatRequestIsInvalid(
- StreamsGroupHeartbeatRequestData request
- ) throws InvalidRequestException {
- throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
- throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
- throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
- throwIfEmptyString(request.rackId(), "RackId can't be empty.");
-
- if (request.memberEpoch() == 0) {
- if (request.rebalanceTimeoutMs() == -1) {
- throw new InvalidRequestException("RebalanceTimeoutMs must be
provided in first request.");
- }
- throwIfNotEmptyCollection(request.activeTasks(), "ActiveTasks must
be empty when (re-)joining.");
- throwIfNotEmptyCollection(request.standbyTasks(), "StandbyTasks
must be empty when (re-)joining.");
- throwIfNotEmptyCollection(request.warmupTasks(), "WarmupTasks must
be empty when (re-)joining.");
- throwIfNull(request.topology(), "Topology must be non-null when
(re-)joining.");
- if (request.topology() != null) {
- throwIfInvalidTopology(request.topology());
- }
- } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
- throwIfNull(request.instanceId(), "InstanceId can't be null.");
- } else if (request.memberEpoch() < LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
- throw new InvalidRequestException(String.format("MemberEpoch is
%d, but must be greater than or equal to -2.",
- request.memberEpoch()));
- }
-
- if (request.activeTasks() != null || request.standbyTasks() != null ||
request.warmupTasks() != null) {
- throwIfNull(request.activeTasks(), "If one task-type is non-null,
all must be non-null.");
- throwIfNull(request.standbyTasks(), "If one task-type is non-null,
all must be non-null.");
- throwIfNull(request.warmupTasks(), "If one task-type is non-null,
all must be non-null.");
- }
-
- if (request.memberEpoch() != 0) {
- throwIfNotNull(request.topology(), "Topology can only be provided
when (re-)joining.");
- }
- }
-
/**
* Verifies that the partitions currently owned by the member (the ones
set in the
* request) matches the ones that the member should own. It matches if the
consumer
@@ -4687,8 +4549,6 @@ public class GroupMetadataManager {
AuthorizableRequestContext context,
ConsumerGroupHeartbeatRequestData request
) throws ApiException {
- throwIfConsumerGroupHeartbeatRequestIsInvalid(request,
context.requestVersion());
-
if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH ||
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
// -2 means that a static member wants to leave the group.
@@ -4729,8 +4589,6 @@ public class GroupMetadataManager {
AuthorizableRequestContext context,
StreamsGroupHeartbeatRequestData request
) throws ApiException {
- throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
-
if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH ||
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
// -2 means that a static member wants to leave the group.
@@ -4813,8 +4671,6 @@ public class GroupMetadataManager {
AuthorizableRequestContext context,
ShareGroupHeartbeatRequestData request
) throws ApiException {
- throwIfShareGroupHeartbeatRequestIsInvalid(request);
-
if (request.memberEpoch() ==
ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
CoordinatorResult<ShareGroupHeartbeatResponseData,
CoordinatorRecord> result = shareGroupLeave(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 56bc21ca2de..704914e3600 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -91,6 +91,7 @@ import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.persister.DefaultStatePersister;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
@@ -135,6 +136,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static
org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext;
import static
org.apache.kafka.coordinator.group.GroupConfigManagerTest.createConfigManager;
@@ -224,7 +226,12 @@ public class GroupCoordinatorServiceTest {
.build(true);
ConsumerGroupHeartbeatRequestData request = new
ConsumerGroupHeartbeatRequestData()
- .setGroupId("foo");
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo", "bar"))
+ .setTopicPartitions(List.of());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("consumer-group-heartbeat"),
@@ -257,7 +264,12 @@ public class GroupCoordinatorServiceTest {
.build(true);
ConsumerGroupHeartbeatRequestData request = new
ConsumerGroupHeartbeatRequestData()
- .setGroupId("foo");
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo", "bar"))
+ .setTopicPartitions(List.of());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("consumer-group-heartbeat"),
@@ -279,6 +291,162 @@ public class GroupCoordinatorServiceTest {
);
}
+ @Test
+ public void testConsumerHeartbeatRequestValidation() throws
ExecutionException, InterruptedException, TimeoutException {
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(mockRuntime())
+ .build(true);
+
+ AuthorizableRequestContext context =
mock(AuthorizableRequestContext.class);
+ when(context.requestVersion()).thenReturn((int)
ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion());
+
+ String memberId = Uuid.randomUuid().toString();
+
+ // MemberId must be present in all requests.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberId can't be empty."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // GroupId must be present in all requests.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // GroupId can't be all whitespaces.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId(" ")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // RebalanceTimeoutMs must be present in the first request (epoch ==
0).
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("RebalanceTimeoutMs must be provided in first
request."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // TopicPartitions must be present and empty in the first request
(epoch == 0).
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("TopicPartitions must be empty when
(re-)joining."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // SubscribedTopicNames or SubscribedTopicRegex must be present in the
first request (epoch == 0).
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Either SubscribedTopicNames or
SubscribedTopicRegex must be non-null when (re-)joining."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setTopicPartitions(List.of())
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // InstanceId must be non-empty if provided in all requests.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("InstanceId can't be empty."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setInstanceId("")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // RackId must be non-empty if provided in all requests.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("RackId can't be empty."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRackId("")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // ServerAssignor must exist if provided in all requests.
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.UNSUPPORTED_ASSIGNOR.code())
+ .setErrorMessage("ServerAssignor bar is not supported.
Supported assignors: range."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setServerAssignor("bar")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // InstanceId must be non-null when a static member leaves the group (epoch == -2).
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("InstanceId can't be null."),
+ service.consumerGroupHeartbeat(
+ context,
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo", "bar"))
+ .setTopicPartitions(List.of())
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testStreamsGroupHeartbeatWhenNotStarted() throws
ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -313,7 +481,14 @@ public class GroupCoordinatorServiceTest {
.build(true);
StreamsGroupHeartbeatRequestData request = new
StreamsGroupHeartbeatRequestData()
- .setGroupId("foo");
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setTopology(new StreamsGroupHeartbeatRequestData.Topology());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("streams-group-heartbeat"),
@@ -366,7 +541,14 @@ public class GroupCoordinatorServiceTest {
.build(true);
StreamsGroupHeartbeatRequestData request = new
StreamsGroupHeartbeatRequestData()
- .setGroupId("foo");
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setTopology(new StreamsGroupHeartbeatRequestData.Topology());
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("streams-group-heartbeat"),
@@ -391,6 +573,306 @@ public class GroupCoordinatorServiceTest {
);
}
+ @SuppressWarnings("MethodLength")
+ @Test
+ public void testStreamsHeartbeatRequestValidation() throws
ExecutionException, InterruptedException, TimeoutException {
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(mockRuntime())
+ .build(true);
+
+ AuthorizableRequestContext context =
mock(AuthorizableRequestContext.class);
+ when(context.requestVersion()).thenReturn((int)
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
+
+ String memberId = Uuid.randomUuid().toString();
+
+ // MemberId must be present in all requests.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberId can't be empty."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // MemberId can't be all whitespaces.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberId can't be empty."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(" ")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // GroupId must be present in all requests.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // GroupId can't be all whitespaces.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId(" ")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // RebalanceTimeoutMs must be present in the first request (epoch ==
0).
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("RebalanceTimeoutMs must be provided in
first request."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // ActiveTasks must be present and empty in the first request (epoch
== 0).
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("ActiveTasks must be empty when
(re-)joining."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // StandbyTasks must be present and empty in the first request (epoch
== 0).
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("StandbyTasks must be empty when
(re-)joining."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // WarmupTasks must be present and empty in the first request (epoch
== 0).
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("WarmupTasks must be empty when
(re-)joining."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // Topology must be present in the first request (epoch == 0).
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Topology must be non-null when
(re-)joining."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // InstanceId must be non-empty if provided in all requests.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("InstanceId can't be empty."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setInstanceId("")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // RackId must be non-empty if provided in all requests.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("RackId can't be empty."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setRackId("")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // InstanceId can't be null when leaving with LEAVE_GROUP_STATIC_MEMBER_EPOCH.
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("InstanceId can't be null."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberId(memberId)
+ .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(new
StreamsGroupHeartbeatRequestData.Topology())
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // Member epoch cannot be < -2
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberEpoch is -3, but must be greater
than or equal to -2."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(-3)
+ .setRebalanceTimeoutMs(1500)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // Topology must not be present in the later requests (epoch != 0).
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Topology can only be provided when
(re-)joining."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(1)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setTopology(new
StreamsGroupHeartbeatRequestData.Topology())
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // Topology must not contain changelog topics with fixed partition
numbers
+ assertEquals(
+ new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData()
+ .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code())
+ .setErrorMessage("Changelog topic
changelog_topic_with_fixed_partition must have an undefined partition count,
but it is set to 3."),
+ Map.of()
+ ),
+ service.streamsGroupHeartbeat(
+ context,
+ new StreamsGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+ .setTopology(new
StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.Subtopology()
+ .setStateChangelogTopics(
+ List.of(
+ new
StreamsGroupHeartbeatRequestData.TopicInfo()
+
.setName("changelog_topic_with_fixed_partition")
+ .setPartitions(3)
+ )
+ )
+ )
+ ))
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
@Test
public void testPartitionFor() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -2658,7 +3140,10 @@ public class GroupCoordinatorServiceTest {
.build(true);
ShareGroupHeartbeatRequestData request = new
ShareGroupHeartbeatRequestData()
- .setGroupId("foo");
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of("foo", "bar"));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-heartbeat"),
@@ -2694,7 +3179,10 @@ public class GroupCoordinatorServiceTest {
.build(true);
ShareGroupHeartbeatRequestData request = new
ShareGroupHeartbeatRequestData()
- .setGroupId("foo");
+ .setGroupId("foo")
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of("foo", "bar"));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("share-group-heartbeat"),
@@ -2716,6 +3204,113 @@ public class GroupCoordinatorServiceTest {
);
}
+ @Test
+ public void testShareGroupHeartbeatRequestValidation() throws
ExecutionException, InterruptedException, TimeoutException {
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(mockRuntime())
+ .build(true);
+
+ AuthorizableRequestContext context =
mock(AuthorizableRequestContext.class);
+ when(context.requestVersion()).thenReturn((int)
ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion());
+
+ String memberId = Uuid.randomUuid().toString();
+
+ // MemberId must be present in all requests.
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberId can't be empty."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // GroupId must be present in all requests.
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // GroupId can't be all whitespaces.
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("GroupId can't be empty."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId(" ")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // SubscribedTopicNames must be set in the first request
(epoch == 0).
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("SubscribedTopicNames must be set in first
request."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(0)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // MemberId must be non-empty in later requests as well
+ // (epoch != 0).
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberId can't be empty."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId("foo")
+ .setMemberEpoch(1)
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // RackId must be non-empty if provided in all requests.
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("RackId can't be empty."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(1)
+ .setRackId("")
+ ).get(5, TimeUnit.SECONDS)
+ );
+
+ // A MemberEpoch of -10 is rejected as invalid.
+ assertEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("MemberEpoch is invalid."),
+ service.shareGroupHeartbeat(
+ context,
+ new ShareGroupHeartbeatRequestData()
+ .setMemberId(memberId)
+ .setGroupId("foo")
+ .setMemberEpoch(-10)
+ ).get(5, TimeUnit.SECONDS)
+ );
+ }
+
+
@Test
public void testShareGroupDescribe() throws InterruptedException,
ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b3a51d26447..e88f440aa23 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -28,15 +28,12 @@ import
org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
-import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
-import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
-import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -216,99 +213,6 @@ import static org.mockito.Mockito.when;
public class GroupMetadataManagerTest {
- @Test
- public void testConsumerHeartbeatRequestValidation() {
- MockPartitionAssignor assignor = new MockPartitionAssignor("range");
- String memberId = Uuid.randomUuid().toString();
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
-
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
- .build();
- Exception ex;
-
- // MemberId must be present in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()));
- assertEquals("MemberId can't be empty.", ex.getMessage());
-
- // GroupId must be present in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setMemberId(memberId)));
- assertEquals("GroupId can't be empty.", ex.getMessage());
-
- // GroupId can't be all whitespaces.
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId(" ")));
- assertEquals("GroupId can't be empty.", ex.getMessage());
-
- // RebalanceTimeoutMs must be present in the first request (epoch ==
0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)));
- assertEquals("RebalanceTimeoutMs must be provided in first request.",
ex.getMessage());
-
- // TopicPartitions must be present and empty in the first request
(epoch == 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5000)));
- assertEquals("TopicPartitions must be empty when (re-)joining.",
ex.getMessage());
-
- // SubscribedTopicNames or SubscribedTopicRegex must be present in the
first request (epoch == 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(5000)
- .setTopicPartitions(List.of())));
- assertEquals("Either SubscribedTopicNames or SubscribedTopicRegex must
be non-null when (re-)joining.", ex.getMessage());
-
- // InstanceId must be non-empty if provided in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setInstanceId("")));
- assertEquals("InstanceId can't be empty.", ex.getMessage());
-
- // RackId must be non-empty if provided in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setRackId("")));
- assertEquals("RackId can't be empty.", ex.getMessage());
-
- // ServerAssignor must exist if provided in all requests.
- ex = assertThrows(UnsupportedAssignorException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setServerAssignor("bar")));
- assertEquals("ServerAssignor bar is not supported. Supported
assignors: range.", ex.getMessage());
-
- ex = assertThrows(InvalidRequestException.class, () ->
context.consumerGroupHeartbeat(
- new ConsumerGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
- .setRebalanceTimeoutMs(5000)
- .setSubscribedTopicNames(List.of("foo", "bar"))
- .setTopicPartitions(List.of())));
-
- assertEquals("InstanceId can't be null.", ex.getMessage());
- }
-
@Test
public void testConsumerHeartbeatRegexValidation() {
String memberId = Uuid.randomUuid().toString();
@@ -14837,59 +14741,6 @@ public class GroupMetadataManagerTest {
assertEquals(Group.GroupType.CONSUMER,
context.groupMetadataManager.group(groupId).type());
}
- @Test
- public void testShareGroupHeartbeatRequestValidation() {
- MockPartitionAssignor assignor = new MockPartitionAssignor("share");
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .withShareGroupAssignor(assignor)
- .build();
- Exception ex;
-
- // MemberId must be present in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.shareGroupHeartbeat(
- new ShareGroupHeartbeatRequestData()));
- assertEquals("MemberId can't be empty.", ex.getMessage());
-
- // GroupId must be present in all requests.
- String memberId = Uuid.randomUuid().toString();
- ex = assertThrows(InvalidRequestException.class, () ->
context.shareGroupHeartbeat(
- new ShareGroupHeartbeatRequestData()
- .setMemberId(memberId)));
- assertEquals("GroupId can't be empty.", ex.getMessage());
-
- // GroupId can't be all whitespaces.
- ex = assertThrows(InvalidRequestException.class, () ->
context.shareGroupHeartbeat(
- new ShareGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId(" ")));
- assertEquals("GroupId can't be empty.", ex.getMessage());
-
- // SubscribedTopicNames must be present and empty in the first request
(epoch == 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.shareGroupHeartbeat(
- new ShareGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)));
- assertEquals("SubscribedTopicNames must be set in first request.",
ex.getMessage());
-
- // MemberId must be non-empty in all requests except for the first one
where it
- // could be empty (epoch != 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.shareGroupHeartbeat(
- new ShareGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberEpoch(1)));
- assertEquals("MemberId can't be empty.", ex.getMessage());
-
- // RackId must be non-empty if provided in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.shareGroupHeartbeat(
- new ShareGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(1)
- .setRackId("")));
- assertEquals("RackId can't be empty.", ex.getMessage());
- }
-
@Test
public void testShareGroupDescribeRequest() {
GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
@@ -15488,168 +15339,6 @@ public class GroupMetadataManagerTest {
assertEquals(ShareGroup.ShareGroupState.STABLE,
context.shareGroupState(groupId));
}
- @Test
- public void testStreamsHeartbeatRequestValidation() {
- String memberId = Uuid.randomUuid().toString();
- GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
- .build();
- Exception ex;
-
- // MemberId must be present in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()));
- assertEquals("MemberId can't be empty.", ex.getMessage());
-
- // MemberId can't be all whitespaces.
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(" ")));
- assertEquals("MemberId can't be empty.", ex.getMessage());
-
- // GroupId must be present in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)));
- assertEquals("GroupId can't be empty.", ex.getMessage());
-
- // GroupId can't be all whitespaces.
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId(" ")));
- assertEquals("GroupId can't be empty.", ex.getMessage());
-
- // RebalanceTimeoutMs must be present in the first request (epoch ==
0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)));
- assertEquals("RebalanceTimeoutMs must be provided in first request.",
ex.getMessage());
-
- // ActiveTasks must be present and empty in the first request (epoch
== 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(1500)));
- assertEquals("ActiveTasks must be empty when (re-)joining.",
ex.getMessage());
-
- // StandbyTasks must be present and empty in the first request (epoch
== 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(1500)
- .setActiveTasks(List.of())));
- assertEquals("StandbyTasks must be empty when (re-)joining.",
ex.getMessage());
-
- // WarmupTasks must be present and empty in the first request (epoch
== 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(1500)
- .setActiveTasks(List.of())
- .setStandbyTasks(List.of())));
- assertEquals("WarmupTasks must be empty when (re-)joining.",
ex.getMessage());
-
- // Topology must be present in the first request (epoch == 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(1500)
- .setActiveTasks(List.of())
- .setStandbyTasks(List.of())
- .setWarmupTasks(List.of())));
- assertEquals("Topology must be non-null when (re-)joining.",
ex.getMessage());
-
- // InstanceId must be non-empty if provided in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setInstanceId("")));
- assertEquals("InstanceId can't be empty.", ex.getMessage());
-
- // RackId must be non-empty if provided in all requests.
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(1)
- .setRackId("")));
- assertEquals("RackId can't be empty.", ex.getMessage());
-
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setGroupId("foo")
- .setMemberId(memberId)
- .setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
- .setRebalanceTimeoutMs(1500)
- .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
- .setActiveTasks(List.of())
- .setStandbyTasks(List.of())
- .setWarmupTasks(List.of())));
- assertEquals("InstanceId can't be null.", ex.getMessage());
-
- // Member epoch cannot be < -2
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(-3)
- .setRebalanceTimeoutMs(1500)
- ));
- assertEquals("MemberEpoch is -3, but must be greater than or equal to
-2.", ex.getMessage());
-
- // Topology must not be present in the later requests (epoch != 0).
- ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(1)
- .setRebalanceTimeoutMs(1500)
- .setActiveTasks(List.of())
- .setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setTopology(new StreamsGroupHeartbeatRequestData.Topology())
- ));
- assertEquals("Topology can only be provided when (re-)joining.",
ex.getMessage());
-
- // Topology must not contain changelog topics with fixed partition
numbers
- StreamsInvalidTopologyException topoEx =
assertThrows(StreamsInvalidTopologyException.class, () ->
context.streamsGroupHeartbeat(
- new StreamsGroupHeartbeatRequestData()
- .setMemberId(memberId)
- .setGroupId("foo")
- .setMemberEpoch(0)
- .setRebalanceTimeoutMs(1500)
- .setActiveTasks(List.of())
- .setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
- .setTopology(new
StreamsGroupHeartbeatRequestData.Topology().setSubtopologies(
- List.of(
- new StreamsGroupHeartbeatRequestData.Subtopology()
- .setStateChangelogTopics(
- List.of(
- new
StreamsGroupHeartbeatRequestData.TopicInfo()
-
.setName("changelog_topic_with_fixed_partition")
- .setPartitions(3)
- )
- )
- )
- ))
- ));
- assertEquals("Changelog topic changelog_topic_with_fixed_partition
must have an undefined partition count, but it is set to 3.",
- topoEx.getMessage());
- }
-
@Test
public void testUnknownStreamsGroupId() {
String groupId = "fooup";