[ 
https://issues.apache.org/jira/browse/KAFKA-13581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477859#comment-17477859
 ] 

Kvicii.Yu commented on KAFKA-13581:
-----------------------------------

[~chenzy] 
Thanks for reporting this.
I agree with this suggestion, but what are the benefits of doing this?
Please let me know.


> Error getting old protocol
> --------------------------
>
>                 Key: KAFKA-13581
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13581
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.5.0
>            Reporter: chenzhongyu
>            Priority: Minor
>             Fix For: 2.5.0
>
>
> In this case, oldProtocols will always equal the new protocols, because
> knownStaticMember is updated before oldProtocols is read. So I think
> oldProtocols should be assigned before calling updateMember.
> {code:java}
> private def updateStaticMemberAndRebalance(group: GroupMetadata,
>                                            newMemberId: String,
>                                            groupInstanceId: Option[String],
>                                            protocols: List[(String, 
> Array[Byte])],
>                                            responseCallback: JoinCallback): 
> Unit = {
>   val oldMemberId = group.getStaticMemberId(groupInstanceId)
>   info(s"Static member $groupInstanceId of group ${group.groupId} with 
> unknown member id rejoins, assigning new member id $newMemberId, while " +
>     s"old member id $oldMemberId will be removed.")
>   val currentLeader = group.leaderOrNull
>   val member = group.replaceGroupInstance(oldMemberId, newMemberId, 
> groupInstanceId)
>   // Heartbeat of old member id will expire without effect since the group no 
> longer contains that member id.
>   // New heartbeat shall be scheduled with new member id.
>   completeAndScheduleNextHeartbeatExpiration(group, member)
>   val knownStaticMember = group.get(newMemberId)
>   group.updateMember(knownStaticMember, protocols, responseCallback)
>   val oldProtocols = knownStaticMember.supportedProtocols
>   group.currentState match {
>     case Stable =>
>       // check if group's selectedProtocol of next generation will change, if 
> not, simply store group to persist the
>       // updated static member, if yes, rebalance should be triggered to let 
> the group's assignment and selectProtocol consistent
>       val selectedProtocolOfNextGeneration = group.selectProtocol
>       if (group.protocolName.contains(selectedProtocolOfNextGeneration)) {
>         info(s"Static member which joins during Stable stage and doesn't 
> affect selectProtocol will not trigger rebalance.")
>         val groupAssignment: Map[String, Array[Byte]] = 
> group.allMemberMetadata.map(member => member.memberId -> 
> member.assignment).toMap
>         groupManager.storeGroup(group, groupAssignment, error => {
>           if (error != Errors.NONE) {
>             warn(s"Failed to persist metadata for group ${group.groupId}: 
> ${error.message}")
>             // Failed to persist member.id of the given static member, revert 
> the update of the static member in the group.
>             group.updateMember(knownStaticMember, oldProtocols, null)
>             val oldMember = group.replaceGroupInstance(newMemberId, 
> oldMemberId, groupInstanceId)
>             completeAndScheduleNextHeartbeatExpiration(group, oldMember)
>             responseCallback(JoinGroupResult(
>               List.empty,
>               memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
>               generationId = group.generationId,
>               protocolType = group.protocolType,
>               protocolName = group.protocolName,
>               leaderId = currentLeader,
>               error = error
>             ))
>           } else {
>             group.maybeInvokeJoinCallback(member, JoinGroupResult(
>               members = List.empty,
>               memberId = newMemberId,
>               generationId = group.generationId,
>               protocolType = group.protocolType,
>               protocolName = group.protocolName,
>               // We want to avoid current leader performing trivial 
> assignment while the group
>               // is in stable stage, because the new assignment in leader's 
> next sync call
>               // won't be broadcast by a stable group. This could be 
> guaranteed by
>               // always returning the old leader id so that the current 
> leader won't assume itself
>               // as a leader based on the returned message, since the new 
> member.id won't match
>               // returned leader id, therefore no assignment will be 
> performed.
>               leaderId = currentLeader,
>               error = Errors.NONE))
>           }
>         })
>       } else {
>         maybePrepareRebalance(group, s"Group's selectedProtocol will change 
> because static member ${member.memberId} with instance id $groupInstanceId 
> joined with change of protocol")
>       }
>     case CompletingRebalance =>
>       // if the group is in after-sync stage, upon getting a new join-group 
> of a known static member
>       // we should still trigger a new rebalance, since the old member may 
> already be sent to the leader
>       // for assignment, and hence when the assignment gets back there would 
> be a mismatch of the old member id
>       // with the new replaced member id. As a result the new member id would 
> not get any assignment.
>       prepareRebalance(group, s"Updating metadata for static member 
> ${member.memberId} with instance id $groupInstanceId")
>     case Empty | Dead =>
>       throw new IllegalStateException(s"Group ${group.groupId} was not 
> supposed to be " +
>         s"in the state ${group.currentState} when the unknown static member 
> $groupInstanceId rejoins.")
>     case PreparingRebalance =>
>   }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to