kirktrue commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1884695246
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:
##########
@@ -221,7 +222,17 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
*/
@Override
public PollResult pollOnClose(long currentTimeMs) {
- if (membershipManager().isLeavingGroup()) {
+ AbstractMembershipManager<R> membershipManager = membershipManager();
+ GroupMembershipOperation leaveGroupOperation = membershipManager.leaveGroupOperation();
+
+ if (membershipManager.isLeavingGroup() &&
+ // Default operation: both static and dynamic consumers will send a leave heartbeat
+ (GroupMembershipOperation.DEFAULT == leaveGroupOperation ||
+ // Leave operation: both static and dynamic consumers will send a leave heartbeat
+ GroupMembershipOperation.LEAVE_GROUP == leaveGroupOperation ||
+ // Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not
+ membershipManager.groupInstanceId().isPresent())
+ ) {
Review Comment:
Could we let this logic live inside the `MembershipManager.isLeavingGroup()`
implementation rather than exposing it here? The `MembershipManager` already
manages the group instance ID and `GroupMembershipOperation` values internally.
Then we wouldn't have to add the other `AbstractMembershipManager` changes,
right?
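
A minimal sketch of what that might look like, assuming a simplified manager that already holds the group instance ID and the leave operation. `SketchMembershipManager`, `LeaveOperation`, and the `leavingState` flag are hypothetical stand-ins for illustration, not the real Kafka classes; the condition simply mirrors the one in the diff above.

```java
import java.util.Optional;

// Hypothetical stand-in for GroupMembershipOperation; value names are assumed for this sketch.
enum LeaveOperation { DEFAULT, LEAVE_GROUP, REMAIN_IN_GROUP }

// Hypothetical, simplified manager; NOT the real AbstractMembershipManager.
final class SketchMembershipManager {
    private final Optional<String> groupInstanceId;
    private final LeaveOperation leaveOperation;
    // Stand-in for the manager's internal "leaving" member state.
    private final boolean leavingState;

    SketchMembershipManager(Optional<String> groupInstanceId,
                            LeaveOperation leaveOperation,
                            boolean leavingState) {
        this.groupInstanceId = groupInstanceId;
        this.leaveOperation = leaveOperation;
        this.leavingState = leavingState;
    }

    // The whole decision lives here, so callers such as pollOnClose()
    // never need to see the instance ID or the leave operation.
    boolean isLeavingGroup() {
        if (!leavingState) {
            return false;
        }
        // Same condition as in the diff: DEFAULT and LEAVE_GROUP always
        // qualify; otherwise only static members (instance ID present) do.
        return leaveOperation == LeaveOperation.DEFAULT
            || leaveOperation == LeaveOperation.LEAVE_GROUP
            || groupInstanceId.isPresent();
    }
}
```

With something along these lines, `pollOnClose` could keep its original single `isLeavingGroup()` check, and the new accessors on the manager might not be needed.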
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -79,6 +80,11 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
protected final String groupId;
+ /**
+ * Group instance ID to be used by the member, provided when creating the current membership manager.
+ */
+ protected final Optional<String> groupInstanceId;
Review Comment:
cc @AndrewJSchofield
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CloseOptionInternal.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.GroupMembershipOperation;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * This class represents an internal version of {@link Consumer.CloseOption}, used for internal processing of consumer shutdown options.
+ *
+ * <p>While {@link Consumer.CloseOption} is the user-facing class, {@code CloseOptionInternal} is intended for accessing internal fields
+ * and performing operations within the Kafka codebase. This class should not be used directly by users.
+ * It extends {@link Consumer.CloseOption} and provides getters for the {@link GroupMembershipOperation} and timeout values.</p>
+ */
+public class CloseOptionInternal extends Consumer.CloseOption {
Review Comment:
I agree @ableegoldman. It's not clear to me how we benefit from this split
class approach. 🤔