ableegoldman commented on code in PR #17614: URL: https://github.com/apache/kafka/pull/17614#discussion_r1835276259
##########
clients/src/main/java/org/apache/kafka/clients/consumer/GroupMembershipOperation.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+/**
+ * Enum to specify the group membership operation upon leaving group.
+ * {@code LEAVE_GROUP} means the consumer will leave the group.
+ * {@code REMAIN_IN_GROUP} means the consumer will remain in the group.
+ * {@code DEFAULT} applies the default behavior, which may depend on whether the consumer is static or dynamic.

Review Comment:
   Should we clarify what the actual behavior is for static vs dynamic?

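   To make the question concrete, here is a minimal sketch of the semantics the Javadoc could spell out. It assumes `DEFAULT` keeps the behavior described in the comment this PR removes from `AbstractCoordinator#maybeLeaveGroup` (since 2.3, only dynamic members send a LeaveGroupRequest; static members rely on the session timeout). The class `LeaveGroupSketch` and the helper `shouldLeaveGroup` are hypothetical names for illustration, not code from the PR, and the enum is redeclared locally so the snippet stands alone.

```java
public class LeaveGroupSketch {

    // Local stand-in for the enum added in this PR (same three constants).
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    /**
     * Hypothetical helper: decides whether a closing consumer would send a LeaveGroupRequest.
     * Assumption: DEFAULT preserves the existing behavior, where dynamic members
     * (no group.instance.id) leave and static members stay until the session times out.
     */
    static boolean shouldLeaveGroup(GroupMembershipOperation operation, boolean isDynamicMember) {
        switch (operation) {
            case LEAVE_GROUP:
                return true;            // explicit request to leave, static or dynamic
            case REMAIN_IN_GROUP:
                return false;           // explicit request to stay, static or dynamic
            default:
                return isDynamicMember; // DEFAULT: depends on the membership type
        }
    }

    public static void main(String[] args) {
        // Print the decision for every operation and both membership types.
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            System.out.printf("%-16s dynamic=%b static=%b%n",
                    op, shouldLeaveGroup(op, true), shouldLeaveGroup(op, false));
        }
    }
}
```

   If that assumption holds, the `DEFAULT` Javadoc line could say so directly, e.g. that dynamic members leave the group and static members remain in it.
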
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1151,24 +1156,30 @@ protected void handlePollTimeoutExpiry() {
                 "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
                 "returned in poll() with max.poll.records.");
-        maybeLeaveGroup("consumer poll timeout has expired.");
+        maybeLeaveGroup(GroupMembershipOperation.DEFAULT, "consumer poll timeout has expired.");
     }
     /**
-     * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already
-     * not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
+     * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership
+     * with the default consumer group membership operation, or is already not part of the group (i.e., does not have a
+     * valid member ID, is in the UNJOINED state, or the coordinator is unknown).
      *
+     * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
      * @param leaveReason the reason to leave the group for logging
      * @throws KafkaException if the rebalance callback throws exception
      */
-    public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
+    public synchronized RequestFuture<Void> maybeLeaveGroup(GroupMembershipOperation membershipOperation, String leaveReason) {
         RequestFuture<Void> future = null;
-        // Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
-        // consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
-        // and the membership expiration is only controlled by session timeout.
-        if (isDynamicMember() && !coordinatorUnknown() &&
-            state != MemberState.UNJOINED && generation.hasMemberId()) {
+        // According to KIP-1092, static members can leave the group if the consumer group membership operation is LEAVE_GROUP.
+        // If the operation is REMAIN_IN_GROUP, this method "maybeLeaveGroup" will not be invoked.
+        if (GroupMembershipOperation.LEAVE_GROUP.equals(membershipOperation) ||

Review Comment:
   This condition is getting pretty large; it might make the code easier to read if we extract it into a `#shouldLeaveGroup` method

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
      */
     @Override
     public final void close() {
-        close(time.timer(0));
+        close(time.timer(0), GroupMembershipOperation.DEFAULT);
     }
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    protected void close(Timer timer) {
+    protected void close(Timer timer, GroupMembershipOperation membershipOperation) {
         try {
             closeHeartbeatThread();
         } finally {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                if (rebalanceConfig.leaveGroupOnClose) {
+                // If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
+                // If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
+                // Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
+                if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
+                    (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {

Review Comment:
   The `leaveGroupOnClose` rebalance config is an internal backdoor that we put in for Streams to get around the lack of this API -- so once we update the corresponding `KafkaStreams#close` method we should remember to remove this config and clean up the code. The secret "internal" config was always a bit hacky so it's nice to finally get rid of it

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
      */
     @Override
     public final void close() {
-        close(time.timer(0));
+        close(time.timer(0), GroupMembershipOperation.DEFAULT);
     }
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    protected void close(Timer timer) {
+    protected void close(Timer timer, GroupMembershipOperation membershipOperation) {
         try {
             closeHeartbeatThread();
         } finally {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                if (rebalanceConfig.leaveGroupOnClose) {
+                // If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
+                // If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
+                // Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
+                if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
+                    (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {

Review Comment:
   Also, it's a bit weird to be applying this check in two places.
   I think we should remove this check entirely and only have the one in `#maybeLeaveGroup`. (Frankly it was kind of weird to put the `leaveGroupOnClose` check here in the first place; imo it should have been part of the `#maybeLeaveGroup` check to begin with.) So I'd just remove the entire `if` condition and move the `leaveGroupOnClose` config check to the `#maybeLeaveGroup` check until we can remove it entirely

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CloseOptionInternal.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.GroupMembershipOperation;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * This class represents an internal version of {@link Consumer.CloseOption}, used for internal processing of consumer shutdown options.
+ *
+ * <p>While {@link Consumer.CloseOption} is the user-facing class, {@code CloseOptionInternal} is intended for accessing internal fields
+ * and performing operations within the Kafka codebase. This class should not be used directly by users.
+ * It extends {@link Consumer.CloseOption} and provides getters for the {@link GroupMembershipOperation} and timeout values.</p>
+ */
+public class CloseOptionInternal extends Consumer.CloseOption {

Review Comment:
   I know this is what we ended up with in the KIP but honestly I don't see a problem with exposing these getter APIs in the public CloseOptions class. The CloseOptions are created by the user themselves so is it a problem if they can read them back out...? Just wondering if the extra complexity is worth it. (Sorry for not catching this during the KIP discussion)

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -265,7 +266,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
             // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
             // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
             if (this.log != null) {
-                close(Duration.ZERO, true);
+                close(Duration.ZERO, GroupMembershipOperation.DEFAULT, true);

Review Comment:
   It looks like we use `LEAVE_GROUP` in this case for the AsyncKafkaConsumer, why the difference? Personally I think either option is probably fine since presumably the consumer never got to join the group in the first place if something went wrong in the constructor. I guess using `DEFAULT` is consistent with the current behavior, but I'm wondering if `LEAVE_GROUP` is actually the right choice so we skip trying to send the leave group request to the coordinator? 🤷‍♀️ (Whichever way we go, we should just make sure to do the same thing for each consumer implementation)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
