kirktrue commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1338844884


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -593,6 +609,9 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+        } else if (request.memberEpoch() == -2) {
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");

Review Comment:
   This is for the case where the static member is leaving temporarily, right?
   
   Would it be possible to add more detail to these error messages to aid in 
troubleshooting/debugging on the client when this condition is hit?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -918,23 +937,32 @@ private 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
consumerGroupLeave(
         String groupId,
-        String memberId
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
         ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, 
false);
 
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the 
consumer group.");
-
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        // The departing member is a static one. We don't need to fence this 
member because it is
+        // expected to come back within session timeout
+        if (memberEpoch == -2) {
+            log.info("Member {} with instance id {} is a static member and 
will not be fenced from the group", memberId, member.instanceId());

Review Comment:
   Nice! Can we add the Group ID to the 'static member' log message?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to