lucasbru commented on code in PR #22245:
URL: https://github.com/apache/kafka/pull/22245#discussion_r3272902827
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2148,20 +2234,28 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
// The assignment is only provided in the following cases:
// 1. The member is joining.
// 2. The member's assignment has been updated.
- if (memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember)) {
+ boolean newlyJoinOrAssignmentChanged = memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember);
+ boolean hasReplacedStaticMember = maybeOldMember != null && !maybeOldMember.memberId().equals(updatedMember.memberId());
+ boolean userEndpointChanged = hasUserEndpointChanged(maybeOldMember, updatedMember);
+ if (newlyJoinOrAssignmentChanged) {
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
+ }
+
+ if (newlyJoinOrAssignmentChanged || hasReplacedStaticMember || userEndpointChanged) {
group.invalidateCachedEndpointToPartitions(updatedMember.memberId());
- if (updatedMember.userEndpoint().isPresent()) {
- // If no user endpoint is defined, there is no change in the endpoint information.
- // Otherwise, bump the endpoint information epoch
- group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
+ if (hasReplacedStaticMember) {
+ group.invalidateCachedEndpointToPartitions(maybeOldMember.memberId());
}
}
+ if (userEndpointChanged || (hasAssignedTasksChanged(member, updatedMember) && updatedMember.userEndpoint().isPresent())) {
Review Comment:
We already compute hasAssignedTasksChanged(member, updatedMember) as part of
newlyJoinOrAssignmentChanged at line 2237. Could we hoist it into a local and
reuse it here? Otherwise the full TasksTupleWithEpochs.equals runs twice on
every steady-state heartbeat.
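
Roughly what I have in mind, as a standalone sketch rather than a patch. The
class name, the Set<String> task stand-ins, the call counter, and the
secondCondition name are all assumptions made for illustration; only the two
boolean expressions mirror the diff above.

```java
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified model of the two conditions in the heartbeat path.
class HoistedTaskComparison {
    // Counts how often the (potentially expensive) comparison runs.
    static int comparisons = 0;

    // Stand-in for hasAssignedTasksChanged(member, updatedMember);
    // the real method compares the members' assigned tasks.
    static boolean hasAssignedTasksChanged(Set<String> oldTasks, Set<String> newTasks) {
        comparisons++;
        return !Objects.equals(oldTasks, newTasks);
    }

    // Returns {newlyJoinOrAssignmentChanged, secondCondition}.
    // secondCondition mirrors the last `if` in the hunk; what it guards is not shown there.
    static boolean[] evaluate(int memberEpoch, Set<String> oldTasks, Set<String> newTasks,
                              boolean userEndpointChanged, boolean userEndpointPresent) {
        // Hoisted: compare once, then reuse the result in both conditions.
        boolean assignedTasksChanged = hasAssignedTasksChanged(oldTasks, newTasks);
        boolean newlyJoinOrAssignmentChanged = memberEpoch == 0 || assignedTasksChanged;
        boolean secondCondition =
            userEndpointChanged || (assignedTasksChanged && userEndpointPresent);
        return new boolean[] {newlyJoinOrAssignmentChanged, secondCondition};
    }

    public static void main(String[] args) {
        boolean[] flags = evaluate(5, Set.of("0_0"), Set.of("0_0"), false, true);
        System.out.println(flags[0] + " " + flags[1] + " comparisons=" + comparisons);
    }
}
```

One trade-off: with the hoisted local the comparison also runs when
memberEpoch == 0, where the original short-circuits past it. That only
affects joining members, so the steady-state path still goes from two
comparisons to one.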
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -311,7 +313,7 @@ private static UpdateTargetAssignmentResult<TasksTuple> fromLastTargetAssignment
if (member.isPresent()) {
return new UpdateTargetAssignmentResult<>(
group.assignmentEpoch(),
- group.targetAssignment(member.get().memberId())
+ group.targetAssignment(member.get().memberId(), member.get().instanceId())
Review Comment:
member.get().memberId() is already in hand, so passing instanceId here
triggers a redundant staticMember lookup inside targetAssignment().
Passing Optional.empty() would avoid it. This matters because the code runs
on every steady-state heartbeat.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4485,6 +4723,42 @@ private void replaceMember(
));
}
+ /**
+ * Write records to replace the old member by the new member.
+ *
+ * @param records The list of records to append to.
+ * @param group The streams group.
+ * @param oldMember The old member.
+ * @param newMember The new member.
+ */
+ private void replaceStreamsMember(
+ List<CoordinatorRecord> records,
+ StreamsGroup group,
+ StreamsGroupMember oldMember,
+ StreamsGroupMember newMember
+ ) {
+ String groupId = group.groupId();
+
+ // Remove the member without canceling its timers in case the change is reverted. If the
+ // change is not reverted, the group validation will fail and the timer will do nothing.
+ records.addAll(removeStreamsMember(groupId, oldMember.memberId()));
+
+ // Generate records.
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(
+ groupId,
+ newMember
+ ));
+ records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+ groupId,
+ newMember.memberId(),
+ group.targetAssignment(oldMember.memberId(), oldMember.instanceId())
Review Comment:
Same as in fromLastTargetAssignment: oldMember.memberId() is already the
key, so passing instanceId just does another staticMember lookup.
Optional.empty() works here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2148,20 +2234,28 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
// The assignment is only provided in the following cases:
// 1. The member is joining.
// 2. The member's assignment has been updated.
- if (memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember)) {
+ boolean newlyJoinOrAssignmentChanged = memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember);
Review Comment:
Could we just reuse isJoining here? It is the same condition, and the literal
0 reads a bit jarring since it was swapped for JOIN_GROUP_MEMBER_EPOCH a few
lines up.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8966,6 +9240,35 @@ private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
));
}
+ private boolean hasUserEndpointChanged(StreamsGroupMember maybeOldMember, StreamsGroupMember updatedMember) {
Review Comment:
The body of this method is equivalent to a single expression:
return !Objects.equals(maybeOldMember == null ? null : maybeOldMember.userEndpoint().orElse(null), updatedMember.userEndpoint().orElse(null));
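
For reference, a minimal standalone sketch of that one-liner. The Member
class is a hypothetical stand-in for StreamsGroupMember, and the endpoint is
modelled as a String for brevity, so treat the types as assumptions; only the
return expression is taken from the suggestion above.

```java
import java.util.Objects;
import java.util.Optional;

class UserEndpointChange {
    // Hypothetical stand-in for StreamsGroupMember; only the endpoint matters here.
    static final class Member {
        private final String endpoint; // null means "no user endpoint configured"

        Member(String endpoint) {
            this.endpoint = endpoint;
        }

        Optional<String> userEndpoint() {
            return Optional.ofNullable(endpoint);
        }
    }

    // A missing old member and a member without an endpoint both collapse to null,
    // so Objects.equals covers every null/absent combination in one comparison.
    static boolean hasUserEndpointChanged(Member maybeOldMember, Member updatedMember) {
        return !Objects.equals(
            maybeOldMember == null ? null : maybeOldMember.userEndpoint().orElse(null),
            updatedMember.userEndpoint().orElse(null));
    }

    public static void main(String[] args) {
        System.out.println(hasUserEndpointChanged(null, new Member("host:9090")));
        System.out.println(hasUserEndpointChanged(new Member("host:9090"), new Member("host:9090")));
    }
}
```

Note that this form treats "no old member" and "old member without an
endpoint" identically: if the new member also has no endpoint, nothing is
reported as changed.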