dajac commented on code in PR #15364: URL: https://github.com/apache/kafka/pull/15364#discussion_r1523572950
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }

         return new CoordinatorResult<>(records, response);
     }

+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the member. This
+     *                              is reported in the ConsumerGroupHeartbeat API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, " + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {

Review Comment:
   Yeah, this could potentially happen. That's the tradeoff we have to make here. However, I would like to point out that on the client, a partition is considered assigned even if the client may not be able to fetch from it (e.g. because of stale metadata), so the contract is loose here anyway. If it turns out to be an issue in the future, I think we could have a timer for individual partitions. I refrained from doing this because it brings a lot of complexity and does not seem worth it at the moment.
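
   For illustration only, here is a rough sketch of what "a timer for individual partitions" might look like. Nothing in it comes from the PR: `PartitionRevocationTimers`, its methods, and the `"<topicId>-<partition>"` key format are all hypothetical, and what each timer would actually bound is an assumption. It only shows the extra per-partition bookkeeping that a single per-member rebalance timeout avoids.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper, NOT part of Kafka: keeps one deadline per partition
// instead of one rebalance timeout per member.
final class PartitionRevocationTimers {
    // Key is "<topicId>-<partition>" (an assumed format); value is the
    // absolute deadline in milliseconds.
    private final Map<String, Long> deadlines = new HashMap<>();

    private static String key(String topicId, int partition) {
        return topicId + "-" + partition;
    }

    // Starts (or restarts) the timer for a single partition.
    void schedule(String topicId, int partition, long nowMs, long timeoutMs) {
        deadlines.put(key(topicId, partition), nowMs + timeoutMs);
    }

    // Cancels the timer once the pending transition for the partition completes.
    void cancel(String topicId, int partition) {
        deadlines.remove(key(topicId, partition));
    }

    // Removes and returns the keys whose deadline is at or before nowMs.
    List<String> pollExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    // Number of partitions that still have a running timer.
    int pending() {
        return deadlines.size();
    }
}
```

   Even in this toy form, every partition needs its own schedule/cancel path and its own expiry handling, which hints at the complexity mentioned above.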