jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520414314

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##########
@@ -170,72 +127,122 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
      * @return A new ConsumerGroupMember or the current one.
      */
     public ConsumerGroupMember build() {
-        // A new target assignment has been installed, we need to restart
-        // the reconciliation loop from the beginning.
-        if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-            return transitionToNewTargetAssignmentState();
-        }
-
         switch (member.state()) {
-            // Check if the partitions have been revoked by the member.
-            case REVOKING:
-                return maybeTransitionFromRevokingToAssigningOrStable();
+            case STABLE:
+                // When the member is in the STABLE state, we verify if a newer
+                // epoch (or target assignment) is available. If it is, we can
+                // reconcile the member towards it. Otherwise, we return.
+                if (member.memberEpoch() != targetAssignmentEpoch) {
+                    return computeNextAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
+                } else {
+                    return member;
+                }
-            // Check if pending partitions have been freed up.
-            case ASSIGNING:
-                return maybeTransitionFromAssigningToAssigningOrStable();
+            case UNREVOKED_PARTITIONS:
+                // When the member is in the UNREVOKED_PARTITIONS state, we wait
+                // until the member has revoked the necessary partitions. They are
+                // considered revoked when they are not anymore reported in the
+                // owned partitions set in the ConsumerGroupHeartbeat API.
-            // Nothing to do.
-            case STABLE:
-                return member;
+                // If the member does not provide its owned partitions. We cannot
+                // progress.
+                if (ownedTopicPartitions == null) {
+                    return member;
+                }
+
+                // If the member provides its owned partitions. We verify if it still
+                // owns any of the revoked partitions. If it does, we cannot progress.
+                for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
+                    for (Integer partitionId : topicPartitions.partitions()) {
+                        boolean stillHasRevokedPartition = member
+                            .partitionsPendingRevocation()
+                            .getOrDefault(topicPartitions.topicId(), Collections.emptySet())
+                            .contains(partitionId);
+                        if (stillHasRevokedPartition) {
+                            return member;
+                        }
+                    }
+                }
+
+                // When the member has revoked all the pending partitions, it can
+                // transition to the next epoch (current + 1) and we can reconcile
+                // its state towards the latest target assignment.
+                return computeNextAssignment(
+                    member.memberEpoch() + 1,
+                    member.assignedPartitions()
+                );
+
+            case UNRELEASED_PARTITIONS:
+                // When the member is in the UNRELEASED_PARTITIONS, we reconcile the
+                // member towards the latest target assignment. This will assign any
+                // of the unreleased partitions when they become available.
+                return computeNextAssignment(
+                    member.memberEpoch(),
+                    member.assignedPartitions()
+                );
+
+            case UNKNOWN:
+                // We could only end up in this state if a new state is added in the
+                // future and the group coordinator is downgraded. In this case, the
+                // best option is to fence the member to force it to rejoin the group
+                // without any partitions and to reconcile it again from scratch.
+                if (ownedTopicPartitions == null || !ownedTopicPartitions.isEmpty()) {
+                    throw new FencedMemberEpochException("The consumer group member is in a unknown state. " +
+                        "The member must abandon all its partitions and rejoin.");
+                }
+
+                return computeNextAssignment(

Review Comment:
   Is the idea that we only hit the case below on restart? I.e., we fence the member and force it out of the group, but it will still be in the UNKNOWN state on rejoining (just with no owned partitions).
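
   To make the question concrete, here is a minimal sketch of the guard in the `UNKNOWN` case, read directly from the condition in the diff. Everything in it is hypothetical and simplified: `UnknownStateSketch`, `FencedException`, `handleUnknown`, and `isFenced` are illustration-only names, partitions are plain integers, and the string return value stands in for the truncated `computeNextAssignment(` call. It does not model how the coordinator stores member state, so it does not by itself settle whether the member is still `UNKNOWN` after rejoining.

```java
import java.util.List;

/** Hypothetical, simplified stand-in for the UNKNOWN branch of build(). */
final class UnknownStateSketch {

    /** Illustration-only exception; not the real FencedMemberEpochException. */
    static final class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    /**
     * Mirrors the guard from the diff: a null list (owned partitions not
     * reported) or a non-empty list (member still owns partitions) is fenced.
     * Only an explicitly empty list falls through to reconciliation.
     */
    static String handleUnknown(List<Integer> ownedPartitions) {
        if (ownedPartitions == null || !ownedPartitions.isEmpty()) {
            throw new FencedException("The member must abandon all its partitions and rejoin.");
        }
        // Placeholder for the computeNextAssignment(...) call in the real code.
        return "reconcile";
    }

    /** Convenience wrapper: true if the guard would fence this heartbeat. */
    static boolean isFenced(List<Integer> ownedPartitions) {
        try {
            handleUnknown(ownedPartitions);
            return false;
        } catch (FencedException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Heartbeat that omits owned partitions.
        System.out.println("null owned partitions fenced: " + isFenced(null));
        // Heartbeat from a member that still owns partitions 0 and 1.
        System.out.println("non-empty owned partitions fenced: " + isFenced(List.of(0, 1)));
        // Heartbeat that reports an empty owned set, e.g. after rejoining.
        System.out.println("empty owned partitions fenced: " + isFenced(List.of()));
    }
}
```

   Under these assumptions, the code after the guard is only reachable by a heartbeat that explicitly reports an empty owned-partitions list.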