lianetm commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1813043564


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1234,8 +1245,8 @@ protected void resetEpoch() {
     protected void updateMemberEpoch(int newEpoch) {
         boolean newEpochReceived = this.memberEpoch != newEpoch;
         this.memberEpoch = newEpoch;
-        // Simply notify based on epoch change only, given that the member 
will never receive a
-        // new member ID without an epoch (member ID is only assigned when it 
joins the group).
+        // Simply notify based on epoch changes only, since the member will 
generate a member ID
+        // at startup, and it will remain unchanged for its entire lifetime.
         if (newEpochReceived) {

Review Comment:
   I think we should update it to align, and it's actually a simplification 
here and in the `CommitRequestManager` now that the memberId is always known. 
   
   The `CommitRequestManager` keeps a reference to the `memberId` and 
`memberEpoch`, and both were optional given that we could land here without 
having received the memberId from the broker. Every time the epoch changed, we 
provided both (to include them in the requests). Now we can probably simply 
have a memberId in the commitMgr as a String, not optional, here:
   
https://github.com/apache/kafka/blob/2d896d9130f121e75ccba2d913bdffa358cf3867/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1296
   and then in this membershipMgr.notifyEpochChange we always provide the known 
memberId and the optional epoch. 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1317,6 +1319,9 @@ private void 
throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+            if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION) {
+                throwIfEmptyString(request.memberId(), "MemberId can't be 
empty.");

Review Comment:
   don't we need to `throwIfNull` also? if memberId is not included in the 
request, the `throwIfEmptyString` will not throw. 
   
   Also, with this addition, we end up validating the member ID in 3 places 
here (depending on the epoch), but couldn't we simplify and validate once at 
the beginning of this func? (checking the api version). In the end, with this 
KIP, if version>1 the member ID must be included in all requests, no matter the 
epoch.  



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1351,6 +1355,7 @@ private void throwIfShareGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+            throwIfEmptyString(request.memberId(), "MemberId can't be empty.");

Review Comment:
   makes sense to me, and it aligns with my comment above for the consumer. 
Seems sensible to check at the beginning since we require member ID for all 
requests, regardless the epoch. And for the share case we shoudn't need the 
version check given that the RPC is still at v0 unstable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to