frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1845010100
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String
protocolType) {
*/
@Override
public final void close() {
- close(time.timer(0));
+ close(time.timer(0), GroupMembershipOperation.DEFAULT);
}
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- protected void close(Timer timer) {
+ protected void close(Timer timer, GroupMembershipOperation
membershipOperation) {
try {
closeHeartbeatThread();
} finally {
// Synchronize after closing the heartbeat thread since heartbeat
thread
// needs this lock to complete and terminate after close flag is
set.
synchronized (this) {
- if (rebalanceConfig.leaveGroupOnClose) {
+ // If membershipOperation is REMAIN_IN_GROUP, never send leave
group request.
+ // If membershipOperation is DEFAULT, leave group based on
rebalanceConfig.leaveGroupOnClose.
+ // Otherwise, leave group only if membershipOperation is
LEAVE_GROUP.
+ if (GroupMembershipOperation.REMAIN_IN_GROUP !=
membershipOperation &&
+ (GroupMembershipOperation.LEAVE_GROUP ==
membershipOperation || rebalanceConfig.leaveGroupOnClose)) {
Review Comment:
Am I correct in thinking that if we decide to move the `if` condition into
`maybeLeaveGroup`,
we should also move `onLeavePrepare` into `maybeLeaveGroup`?
If that’s the case, it might be tricky.
Currently, I assume that `onLeavePrepare` and `maybeLeaveGroup` should be
invoked consecutively,
since they share the same `if` condition. However, at the moment, there are
many places
in both test code and production code that only invoke `maybeLeaveGroup`.
If we proceed with this refactor, we will need to ensure that we do not
break the current behavior.
Here’s a rough idea:
```
public synchronized RequestFuture<Void>
maybeLeaveGroup(GroupMembershipOperation membershipOperation, String
leaveReason) {
// If membershipOperation is REMAIN_IN_GROUP, never send leave group
request.
if (GroupMembershipOperation.REMAIN_IN_GROUP == membershipOperation)
{
return null;
}
// If membershipOperation is DEFAULT, leave group based on
rebalanceConfig.leaveGroupOnClose.
// Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
if (GroupMembershipOperation.LEAVE_GROUP != membershipOperation &&
!rebalanceConfig.leaveGroupOnClose) {
return null;
}
onLeavePrepare();
return leaveGroup(membershipOperation, leaveReason);
}
public RequestFuture<Void> leaveGroup(GroupMembershipOperation
membershipOperation, String leaveReason) {
RequestFuture<Void> future = null;
if (shouldSendLeaveGroupRequest(membershipOperation)) {
log.info("Member {} sending LeaveGroup request to coordinator {}
due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new
LeaveGroupRequest.Builder(
rebalanceConfig.groupId,
List.of(new
MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
);
future = client.send(coordinator, request).compose(new
LeaveGroupResponseHandler(generation));
client.pollNoWakeup();
}
resetGenerationOnLeaveGroup();
return future;
}
```
We can move the `if` condition and `onLeavePrepare` into `maybeLeaveGroup`,
while extracting
the original logic of `maybeLeaveGroup` into a new public helper method.
Code that previously invoked `maybeLeaveGroup` directly will instead call
the new helper method.
WDYT ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]