zou shengfu created KAFKA-14832:
-----------------------------------
Summary: Thread unsafe for GroupMetadata
Key: KAFKA-14832
URL: https://issues.apache.org/jira/browse/KAFKA-14832
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu
groupManager.storeGroup(group, groupAssignment, error => {
if (error != Errors.NONE) {
warn(s"Failed to persist metadata for group ${group.groupId}:
${error.message}")
// Failed to persist member.id of the given static member, revert
// the update of the static member in the group.
group.updateMember(knownStaticMember, oldProtocols,
oldRebalanceTimeoutMs, oldSessionTimeoutMs, null)
val oldMember = group.replaceStaticMember(groupInstanceId,
newMemberId, oldMemberId)
completeAndScheduleNextHeartbeatExpiration(group, oldMember)
responseCallback(JoinGroupResult(
List.empty,
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = error
))
} else if (supportSkippingAssignment) {
// Starting from version 9 of the JoinGroup API, static members are able to
// skip running the assignor based on the `SkipAssignment` field. We leverage
// this to tell the leader that it is the leader of the group, but that it
// should skip running the assignor while the group is in the stable state.
// Notes:
// 1) This allows the leader to continue monitoring metadata changes for the
//    group. Note that any metadata changes happening while the static leader is
//    down won't be noticed.
// 2) The assignors are neither idempotent nor free from side effects. This is why
//    we entirely skip the assignment step, as it could generate a different group
//    assignment which would be ignored by the group coordinator because the group
//    is in the stable state.
val isLeader = group.isLeader(newMemberId)
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = if (isLeader) {
group.currentMemberMetadata
} else {
List.empty
},
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
skipAssignment = isLeader,
error = Errors.NONE
))
} else {
// Prior to version 9 of the JoinGroup API, we wanted to avoid the current leader
// performing a trivial assignment while the group is in the stable state, because
// the new assignment in the leader's next sync call won't be broadcast by a stable group.
// This could be guaranteed by always returning the old leader id so that the current
// leader won't assume itself to be the leader based on the returned message: since the new
// member.id won't match the returned leader id, no assignment will be performed.
group.maybeInvokeJoinCallback(member, JoinGroupResult(
members = List.empty,
memberId = newMemberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = currentLeader,
skipAssignment = false,
error = Errors.NONE
))
}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)