frankvicky commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1820686572
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -944,23 +941,20 @@ public boolean sameRequest(final OffsetFetchRequestState request) {
public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetFetchRequest.Builder builder;
- if (memberInfo.memberId.isPresent() && memberInfo.memberEpoch.isPresent()) {
- builder = new OffsetFetchRequest.Builder(
- groupId,
- memberInfo.memberId.get(),
- memberInfo.memberEpoch.get(),
- true,
- new ArrayList<>(this.requestedPartitions),
- throwOnFetchStableOffsetUnsupported);
- } else {
- // Building request without passing member ID/epoch to leave the logic to choose
- // default values when not present on the request builder.
- builder = new OffsetFetchRequest.Builder(
- groupId,
- true,
- new ArrayList<>(this.requestedPartitions),
- throwOnFetchStableOffsetUnsupported);
- }
+ // Building request without passing member ID/epoch to leave the logic to choose
+ // default values when not present on the request builder.
+ builder = memberInfo.memberEpoch.map(epoch -> new OffsetFetchRequest.Builder(
+ groupId,
+ memberInfo.memberId,
+ epoch,
+ true,
+ new ArrayList<>(this.requestedPartitions),
+ throwOnFetchStableOffsetUnsupported))
+ .orElseGet(() -> new OffsetFetchRequest.Builder(
+ groupId,
+ true,
+ new ArrayList<>(this.requestedPartitions),
+ throwOnFetchStableOffsetUnsupported));
Review Comment:
I'm not sure whether this issue is related to the following code [0].
If it is, then yes, I think we could call the method when `memberEpoch` is present.
As I said in my previous comment [1], `groupMetadata` should represent what the broker
has acknowledged, since we're moving the coordinator logic to the broker side.
If I haven't misunderstood, I will update it in the next commit.
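
To make the branching in the diff concrete, here is a minimal, self-contained sketch of the same `Optional.map(...).orElseGet(...)` selection between two constructor overloads. `FetchRequestSketch` and `OffsetFetchSketch` are hypothetical stand-ins, not the real `OffsetFetchRequest.Builder`, and I'm assuming `memberId` is a plain `String`, as the diff suggests:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the request builder; NOT the real OffsetFetchRequest.Builder.
final class FetchRequestSketch {
    final String groupId;
    final String memberId;      // null when the request carries no member info
    final Integer memberEpoch;  // null when the request carries no member info
    final List<String> partitions;

    // Overload used when the member epoch is known.
    FetchRequestSketch(String groupId, String memberId, int memberEpoch, List<String> partitions) {
        this.groupId = groupId;
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
        this.partitions = partitions;
    }

    // Overload used when no epoch is known; member fields are left unset here,
    // standing in for "let the builder choose its defaults".
    FetchRequestSketch(String groupId, List<String> partitions) {
        this.groupId = groupId;
        this.memberId = null;
        this.memberEpoch = null;
        this.partitions = partitions;
    }
}

final class OffsetFetchSketch {
    // Picks the overload based only on whether the epoch is present.
    static FetchRequestSketch build(String groupId,
                                    String memberId,
                                    Optional<Integer> memberEpoch,
                                    List<String> partitions) {
        return memberEpoch
            .map(epoch -> new FetchRequestSketch(groupId, memberId, epoch, partitions))
            .orElseGet(() -> new FetchRequestSketch(groupId, partitions));
    }

    public static void main(String[] args) {
        FetchRequestSketch withEpoch =
            build("group-a", "member-1", Optional.of(5), List.of("topic-0"));
        FetchRequestSketch withoutEpoch =
            build("group-a", "member-1", Optional.empty(), List.of("topic-0"));
        System.out.println(withEpoch.memberEpoch);
        System.out.println(withoutEpoch.memberEpoch);
    }
}
```

Note that in this sketch the member ID is dropped whenever the epoch is absent, which mirrors what the `orElseGet` branch in the diff does.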
[0]
https://github.com/apache/kafka/blob/8c071b02e9908d9facf10c0a18e7e0f9d1b0825f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L636-L647
[1]
> Hi @lianetm,
>
> Here is my understanding. Please correct me if I’m wrong:
> Since the group management has now moved to the server-side, the
> ConsumerGroupMetadata basically represents the data acknowledged by the broker.
> In this case, I think we should keep the ConsumerGroupMetadata.memberId empty
> at startup. This is because, at that moment, the broker still has no knowledge
> of the memberId. Once the consumer polls, the broker will then recognize the
> memberId and update the assignment, allowing the consumer to update the
> ConsumerGroupMetadata via MemberStateListener. After that,
> ConsumerGroupMetadata will have the memberId.
>
> Does this make sense?
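
A rough sketch of the lifecycle described in [1], only to illustrate the idea. `MemberUpdateListenerSketch` and `GroupMetadataHolderSketch` are hypothetical names; the real `MemberStateListener` and `ConsumerGroupMetadata` may look different, and representing "empty" as an empty string is an assumption:

```java
// Hypothetical listener; the real MemberStateListener may have a different signature.
interface MemberUpdateListenerSketch {
    void onMemberUpdated(String memberId, int memberEpoch);
}

// Hypothetical holder for the group metadata exposed to the application.
final class GroupMetadataHolderSketch implements MemberUpdateListenerSketch {
    // Assumption: an empty string means "the broker has not acknowledged a member ID yet".
    private volatile String memberId = "";
    // Assumption: -1 means "no epoch acknowledged yet".
    private volatile int memberEpoch = -1;

    @Override
    public void onMemberUpdated(String newMemberId, int newMemberEpoch) {
        // Only values reported back after a broker response are stored.
        this.memberId = newMemberId;
        this.memberEpoch = newMemberEpoch;
    }

    String memberId() {
        return memberId;
    }

    int memberEpoch() {
        return memberEpoch;
    }
}
```

At startup `memberId()` is empty; it only becomes non-empty after the listener callback fires, which is the behavior the quoted comment argues for.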