lianetm commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1813043564
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1234,8 +1245,8 @@ protected void resetEpoch() {
protected void updateMemberEpoch(int newEpoch) {
boolean newEpochReceived = this.memberEpoch != newEpoch;
this.memberEpoch = newEpoch;
- // Simply notify based on epoch change only, given that the member
will never receive a
- // new member ID without an epoch (member ID is only assigned when it
joins the group).
+ // Simply notify based on epoch changes only, since the member will
generate a member ID
+ // at startup, and it will remain unchanged for its entire lifetime.
if (newEpochReceived) {
Review Comment:
I think we should update it to align, and it's actually a simplification
here and in the `CommitRequestManager` now that the memberId is always known.
The `CommitRequestManager` keeps a reference to the `memberId` and
`memberEpoch`, and both were optional given that we could land here without
having received the memberId from the broker. Every time the epoch changed, we
provided both (to include them in the requests). Now we can probably simply
have a memberId in the commitMgr as a String, not optional, here:
https://github.com/apache/kafka/blob/2d896d9130f121e75ccba2d913bdffa358cf3867/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L1296
and then in this membershipMgr.notifyEpochChange we always provide the known
memberId and the optional epoch.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1317,6 +1319,9 @@ private void
throwIfConsumerGroupHeartbeatRequestIsInvalid(
if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
}
+ if (apiVersion >= CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION) {
+ throwIfEmptyString(request.memberId(), "MemberId can't be
empty.");
Review Comment:
don't we need to `throwIfNull` also? if memberId is not included in the
request, the `throwIfEmptyString` will not throw.
Also, with this addition, we end up validating the member ID in 3 places
here (depending on the epoch), but couldn't we simplify and validate once at
the beginning of this func? (checking the api version). In the end, with this
KIP, if version>1 the member ID must be included in all requests, no matter the
epoch.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1351,6 +1355,7 @@ private void throwIfShareGroupHeartbeatRequestIsInvalid(
if (request.subscribedTopicNames() == null ||
request.subscribedTopicNames().isEmpty()) {
throw new InvalidRequestException("SubscribedTopicNames must
be set in first request.");
}
+ throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
Review Comment:
makes sense to me, and it aligns with my comment above for the consumer.
Seems sensible to check at the beginning since we require member ID for all
requests, regardless the epoch. And for the share case we shoudn't need the
version check given that the RPC is still at v0 unstable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]