[ https://issues.apache.org/jira/browse/KAFKA-13581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477859#comment-17477859 ]
Kvicii.Yu commented on KAFKA-13581:
-----------------------------------

[~chenzy] thanks for reporting this. I agree with the suggestion, but what are the benefits of making this change? Please let me know.

> Error getting old protocol
> --------------------------
>
>                 Key: KAFKA-13581
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13581
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.5.0
>            Reporter: chenzhongyu
>            Priority: Minor
>             Fix For: 2.5.0
>
>
> In this case, oldProtocols will always equal protocols, because knownStaticMember has already been updated. So I think oldProtocols should be assigned before updateMember.
> {code:java}
>   private def updateStaticMemberAndRebalance(group: GroupMetadata,
>                                              newMemberId: String,
>                                              groupInstanceId: Option[String],
>                                              protocols: List[(String, Array[Byte])],
>                                              responseCallback: JoinCallback): Unit = {
>     val oldMemberId = group.getStaticMemberId(groupInstanceId)
>     info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " +
>       s"old member id $oldMemberId will be removed.")
>     val currentLeader = group.leaderOrNull
>     val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId)
>     // Heartbeat of old member id will expire without effect since the group no longer contains that member id.
>     // New heartbeat shall be scheduled with new member id.
>     completeAndScheduleNextHeartbeatExpiration(group, member)
>     val knownStaticMember = group.get(newMemberId)
>     group.updateMember(knownStaticMember, protocols, responseCallback)
>     val oldProtocols = knownStaticMember.supportedProtocols
>     group.currentState match {
>       case Stable =>
>         // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the
>         // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent
>         val selectedProtocolOfNextGeneration = group.selectProtocol
>         if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
>           info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.")
>           val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap
>           groupManager.storeGroup(group, groupAssignment, error => {
>             if (error != Errors.NONE) {
>               warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}")
>               // Failed to persist member.id of the given static member, revert the update of the static member in the group.
>               group.updateMember(knownStaticMember, oldProtocols, null)
>               val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId)
>               completeAndScheduleNextHeartbeatExpiration(group, oldMember)
>               responseCallback(JoinGroupResult(
>                 List.empty,
>                 memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
>                 generationId = group.generationId,
>                 protocolType = group.protocolType,
>                 protocolName = group.protocolName,
>                 leaderId = currentLeader,
>                 error = error
>               ))
>             } else {
>               group.maybeInvokeJoinCallback(member, JoinGroupResult(
>                 members = List.empty,
>                 memberId = newMemberId,
>                 generationId = group.generationId,
>                 protocolType = group.protocolType,
>                 protocolName = group.protocolName,
>                 // We want to avoid current leader performing trivial assignment while the group
>                 // is in stable stage, because the new assignment in leader's next sync call
>                 // won't be broadcast by a stable group. This could be guaranteed by
>                 // always returning the old leader id so that the current leader won't assume itself
>                 // as a leader based on the returned message, since the new member.id won't match
>                 // returned leader id, therefore no assignment will be performed.
>                 leaderId = currentLeader,
>                 error = Errors.NONE))
>             }
>           })
>         } else {
>           maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol")
>         }
>       case CompletingRebalance =>
>         // if the group is in after-sync stage, upon getting a new join-group of a known static member
>         // we should still trigger a new rebalance, since the old member may already be sent to the leader
>         // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id
>         // with the new replaced member id. As a result the new member id would not get any assignment.
>         prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId")
>       case Empty | Dead =>
>         throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " +
>           s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.")
>       case PreparingRebalance =>
>     }
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
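
As an illustration of the ordering problem described in the issue, here is a minimal, self-contained sketch. It is not Kafka code: `Member`, `updateMember`, `readAfterUpdate`, and `readBeforeUpdate` are hypothetical stand-ins, and the sketch assumes (as the report states) that `updateMember` overwrites the member's `supportedProtocols` in place. Under that assumption, reading the field after the update yields the new protocols, so a later "revert" would restore nothing.

```scala
// Hypothetical, simplified stand-in for a group member; NOT Kafka's MemberMetadata.
final class Member(var supportedProtocols: List[String])

object OldProtocolsOrdering {
  // Stand-in for group.updateMember; assumed to overwrite the protocols in place.
  def updateMember(member: Member, protocols: List[String]): Unit =
    member.supportedProtocols = protocols

  // Order quoted in the issue: the "old" value is read after the update,
  // so it is already the new protocol list.
  def readAfterUpdate(member: Member, protocols: List[String]): List[String] = {
    updateMember(member, protocols)
    member.supportedProtocols
  }

  // Order proposed by the reporter: capture the old value first, then update.
  def readBeforeUpdate(member: Member, protocols: List[String]): List[String] = {
    val oldProtocols = member.supportedProtocols
    updateMember(member, protocols)
    oldProtocols
  }

  def main(args: Array[String]): Unit = {
    val incoming = List("cooperative-sticky")
    println(readAfterUpdate(new Member(List("range")), incoming))  // List(cooperative-sticky)
    println(readBeforeUpdate(new Member(List("range")), incoming)) // List(range)
  }
}
```

With the second ordering, a failed persist could restore the member to `List("range")`; with the first, the rollback would reapply the protocols that were just written.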