frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1844979923
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
*/
@Override
public final void close() {
- close(time.timer(0));
+ close(time.timer(0), GroupMembershipOperation.DEFAULT);
}
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- protected void close(Timer timer) {
+ protected void close(Timer timer, GroupMembershipOperation membershipOperation) {
try {
closeHeartbeatThread();
} finally {
// Synchronize after closing the heartbeat thread since heartbeat thread
// needs this lock to complete and terminate after close flag is set.
synchronized (this) {
- if (rebalanceConfig.leaveGroupOnClose) {
+ // If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
+ // If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
+ // Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
+ if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
+ (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {
Review Comment:
I'm considering whether we could move the `if` statement into `maybeLeaveGroup`.
Currently, the `if` statement is wrapped in a `synchronized` block, and the
`maybeLeaveGroup` method is itself marked `synchronized`.
I tested moving the `if` statement into `maybeLeaveGroup`, but that change
causes the test case `testPrepareJoinAndRejoinAfterFailedRebalance` to fail.
https://github.com/apache/kafka/blob/a8f84cab958761434bcafca2c0fd90f53b52aacf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1129-L1133
https://github.com/apache/kafka/blob/a8f84cab958761434bcafca2c0fd90f53b52aacf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1164
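For reference, here is a minimal standalone sketch of the decision the new `if` encodes, which may help when reasoning about where the check should live. It is not the code in `AbstractCoordinator`: the class `LeaveGroupDecision`, the helper `shouldLeaveGroup`, and the nested enum are hypothetical stand-ins, and the `leaveGroupOnClose` parameter stands in for `rebalanceConfig.leaveGroupOnClose`.

```java
public class LeaveGroupDecision {

    // Hypothetical stand-in for the GroupMembershipOperation enum used in the PR.
    public enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    /**
     * Mirrors the condition in the diff:
     * REMAIN_IN_GROUP never leaves, LEAVE_GROUP always leaves,
     * and DEFAULT falls back to the leaveGroupOnClose flag.
     */
    public static boolean shouldLeaveGroup(GroupMembershipOperation operation,
                                           boolean leaveGroupOnClose) {
        return GroupMembershipOperation.REMAIN_IN_GROUP != operation
            && (GroupMembershipOperation.LEAVE_GROUP == operation || leaveGroupOnClose);
    }

    public static void main(String[] args) {
        // Print the full truth table for all operation/flag combinations.
        for (GroupMembershipOperation operation : GroupMembershipOperation.values()) {
            for (boolean flag : new boolean[] {false, true}) {
                System.out.println(operation + ", leaveGroupOnClose=" + flag
                    + " -> " + shouldLeaveGroup(operation, flag));
            }
        }
    }
}
```

If the check were moved into `maybeLeaveGroup`, every caller of that method would be subject to it, not only `close`. Whether that explains the test failure is an assumption I have not verified.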