dajac commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1492563558


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
         // 1. The member reported its owned partitions;
         // 2. The member just joined or rejoined to group (epoch equals to 
zero);
         // 3. The member's assignment has been updated.
-        if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+        if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
             response.setAssignment(createResponseAssignment(updatedMember));
         }
 
         return new CoordinatorResult<>(records, response);
     }
 
+    /**
+     * Reconciles the current assignment of the member if needed.
+     *
+     * @param groupId               The group id.
+     * @param member                The member to reconcile.
+     * @param currentPartitionEpoch The function returning the current epoch of
+     *                              a given partition.
+     * @param targetAssignmentEpoch The target assignment epoch.
+     * @param targetAssignment      The target assignment.
+     * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+     *                              is reported in the ConsumerGroupHeartbeat 
API and
+     *                              it could be null if not provided.
+     * @param records               The list to accumulate any new records.
+     * @return The received member if no changes have been made; or a new
+     *         member containing the new assignment.
+     */
+    private ConsumerGroupMember maybeReconcile(
+        String groupId,
+        ConsumerGroupMember member,
+        BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
+        int targetAssignmentEpoch,
+        Assignment targetAssignment,
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions,
+        List<Record> records
+    ) {
+        if (member.isReconciledTo(targetAssignmentEpoch)) {
+            return member;
+        }
+
+        ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+            .withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+            .withCurrentPartitionEpoch(currentPartitionEpoch)
+            .withOwnedTopicPartitions(ownedTopicPartitions)
+            .build();
+
+        if (!updatedMember.equals(member)) {
+            records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+            log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+                     + "assignedPartitions={} and revokedPartitions={}.",
+                groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+                formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+            if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+                scheduleConsumerGroupRebalanceTimeout(
+                    groupId,
+                    updatedMember.memberId(),
+                    updatedMember.memberEpoch(),
+                    updatedMember.rebalanceTimeoutMs()
+                );
+            } else {

Review Comment:
   The rebalance is not complete here because the member has to reach Stable 
for this. However, we only have a timeout on the revocation part. We basically 
want to member to not block revoked partitions forever.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to