AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1995963682
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int
partition) {
return Set.of();
}
+ /**
+ * Returns a set of assignable partitions from the topic metadata.
+ * If the allowed partition map is null, all the partitions in the
corresponding
+ * topic metadata are returned for the argument topic id. If allowed map
is empty,
+ * empty set is returned.
+ *
+ * @param topicId The uuid of the topic
+ * @return Set of integers if assignable partitions available, empty
otherwise.
+ * @throws UnknownTopicIdException if the topicId is not found in the
metadata.
Review Comment:
This method does not throw this exception :)
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java:
##########
@@ -287,6 +287,11 @@ protected MemberSubscriptionAndAssignmentImpl
newMemberSubscriptionAndAssignment
*/
private Map<String, String> staticMembers = new HashMap<>();
+ /**
+ * Topic partition allow map.
+ */
+ private Map<Uuid, Set<Integer>> topicPartitionAllowedMap = new HashMap<>();
Review Comment:
I would call it the `topicAssignablePartitionsMap` I think. "Allow" sounds
like there's an authorization aspect to it, I think.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -74,6 +83,30 @@ public Set<String> racksForPartition(Uuid topicId, int
partition) {
return Set.of();
}
+ /**
+ * Returns a set of assignable partitions from the topic metadata.
+ * If the allowed partition map is null, all the partitions in the
corresponding
+ * topic metadata are returned for the argument topic id. If allowed map
is empty,
+ * empty set is returned.
+ *
+ * @param topicId The uuid of the topic
+ * @return Set of integers if assignable partitions available, empty
otherwise.
+ * @throws UnknownTopicIdException if the topicId is not found in the
metadata.
Review Comment:
Actually, you've used `Set.of()` in both cases, which is fine. Just an
observation. I can imagine that returning a `null` might be distasteful to some
people and I won't push it if you prefer it this way.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -443,7 +454,19 @@ public CompletableFuture<ShareGroupHeartbeatResponseData>
shareGroupHeartbeat(
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.shareGroupHeartbeat(context, request)
- ).exceptionally(exception -> handleOperationException(
+ ).thenCompose(result -> {
+ // This ensures that the previous group write has completed
successfully
+ // before we start the persister initialize phase.
+ if (result.getValue().isPresent()) {
Review Comment:
So, the obvious question here is, what happens if the previous group write
has NOT completed successfully and the result is not yet present?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2323,10 +2346,115 @@ private
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
// 2. The member's assignment has been updated.
boolean isFullRequest = subscribedTopicNames != null;
if (memberEpoch == 0 || isFullRequest ||
hasAssignedPartitionsChanged(member, updatedMember)) {
-
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
+ ShareGroupHeartbeatResponseData.Assignment assignment =
createShareGroupResponseAssignment(updatedMember);
+ response.setAssignment(assignment);
}
- return new CoordinatorResult<>(records, response);
+ return new CoordinatorResult<>(
+ records,
+ Map.entry(
+ response,
+ maybeCreateInitializeShareGroupStateRequest(groupId,
groupEpoch, subscriptionMetadata)
+ )
+ );
+ }
+
+ private boolean initializedAssignmentPending(ShareGroup group) {
+ if (!shareGroupPartitionMetadata.containsKey(group.groupId())) {
+ // No initialized share partitions for the group
Review Comment:
nit: Comment could easily be on a single longer line. General point for this
method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]