guozhangwang commented on code in PR #13380:
URL: https://github.com/apache/kafka/pull/13380#discussion_r1134383599
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to
+     * track
+     * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {
+        private List<UnsentOffsetFetchRequestState> unsentRequests = new ArrayList<>();
+
+        public boolean canSendOffsetFetch() {
Review Comment:
I think the func name could be misleading in the long run. Let's just use a more specific name like `hasUnsentRequests`.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueue it to send later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
Review Comment:
nit: let's rename the other `add` func to `addCommitOffsetRequest` to make it more easily distinguishable from this one?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to
+     * track
+     * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we
+     * need that.
+     */
+    class UnsentOffsetFetchRequests {
Review Comment:
The original flag covers not only the period when there is a request that has not yet been sent, but also the period when the request has been sent and the response has not yet been received. It seems that we would not cover the second period, i.e. if a request has been sent and its response has not yet arrived, we may send the same request again.
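
For illustration, a rough sketch of one way to cover both periods; the `inflightRequests` field and the `markInflight`/`completeInflight` hooks are hypothetical, not part of this PR:

```java
// Hypothetical sketch only: dedupe against both queued and in-flight offset fetches.
// Relies on the existing types in CommitRequestManager (UnsentOffsetFetchRequestState, etc.)
// and on java.util.*, java.util.stream.Stream, java.util.concurrent.CompletableFuture.
class UnsentOffsetFetchRequests {
    private final List<UnsentOffsetFetchRequestState> unsentRequests = new ArrayList<>();
    // Requests already handed to the network layer whose responses have not arrived yet.
    private final List<UnsentOffsetFetchRequestState> inflightRequests = new ArrayList<>();

    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> enqueue(
            final UnsentOffsetFetchRequestState request) {
        // Chain onto an identical request, whether it is still queued or already in flight.
        Optional<UnsentOffsetFetchRequestState> dup =
            Stream.concat(unsentRequests.stream(), inflightRequests.stream())
                  .filter(r -> r.sameRequest(request))
                  .findAny();
        if (dup.isPresent()) {
            dup.get().future.whenComplete((offsets, error) -> {
                if (error != null)
                    request.future.completeExceptionally(error);
                else
                    request.future.complete(offsets);
            });
        } else {
            unsentRequests.add(request);
        }
        return request.future;
    }

    // Called when poll() drains a queued request into the outgoing batch.
    public void markInflight(final UnsentOffsetFetchRequestState request) {
        unsentRequests.remove(request);
        inflightRequests.add(request);
    }

    // Called from the response handler, on success or failure.
    public void completeInflight(final UnsentOffsetFetchRequestState request) {
        inflightRequests.remove(request);
    }
}
```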
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
Review Comment:
Again, the func name is a bit misleading. What about `constructOffsetFetchUnsentRequests`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueue it to send later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
+                                    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        if (partitions.isEmpty()) {
+            future.complete(new HashMap<>());
+            return;
+        }
+
+        unsentOffsetFetchRequests.enqueue(new UnsentOffsetFetchRequestState(
Review Comment:
This one reminds me: if we can directly construct the `RequestState` here, why can we not do the same for the commit request, instead of having to add it to a `stagedCommit` first?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
+        List<UnsentOffsetFetchRequestState> requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+        List<NetworkClientDelegate.UnsentRequest> pollResults = new ArrayList<>();
+        requests.forEach(req -> {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                groupState.groupId, true,
+                new ArrayList<>(req.requestedPartitions),
+                throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                builder,
+                coordinatorRequestManager.coordinator());
+            FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req);
+            unsentRequest.future().whenComplete((r, t) -> {
Review Comment:
Why do we use a `FetchCommittedOffsetResponseHandler` for the committed API, while using a general `FutureCompletionHandler` for the commit API? Is this inconsistency intentional?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -306,13 +310,28 @@ public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
     }
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions) {
+        return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
     @Override
-    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
-        throw new KafkaException("method not implemented");
+    public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions,
+                                                            final Duration timeout) {
+        maybeThrowInvalidGroupIdException();
+        final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
+        eventHandler.add(event);
+        try {
+            return event.complete(Duration.ofMillis(100));
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            throw new KafkaException(e);
Review Comment:
I think it's not... why not read through the current code and make the
throwing behavior more consistent?
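
For example, something along these lines might be closer to the existing consumer's behavior. This is a rough sketch only; the exact mapping should be taken from the current `KafkaConsumer.committed` implementation, and using the caller's `timeout` instead of the hard-coded 100ms is an assumption on my part:

```java
// Rough sketch: map each failure mode explicitly rather than wrapping everything in KafkaException.
try {
    return event.complete(timeout);
} catch (final InterruptedException e) {
    throw new InterruptException(e);
} catch (final TimeoutException e) {
    throw new org.apache.kafka.common.errors.TimeoutException(
        "Timed out waiting for committed offsets");
} catch (final ExecutionException e) {
    // Surface the underlying failure instead of wrapping the ExecutionException itself.
    if (e.getCause() instanceof KafkaException)
        throw (KafkaException) e.getCause();
    throw new KafkaException(e.getCause());
}
```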
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -80,16 +97,18 @@ public CommitRequestManager(
     */
    @Override
    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit(currentTimeMs);
+        maybeAutoCommit();
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
-        if (stagedCommits.isEmpty()) {
-            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>());
+        if (!stagedCommits.isEmpty()) {
Review Comment:
I think with the new `unsentOffsetFetchRequests` the `stagedCommits` can be merged into it, since it seems unnecessary to keep two sets of unsent requests just to pour the contents of one into the other periodically.
Also I feel that it's not necessary to keep a second placeholder for `UnsentRequest` in `UnsentOffsetFetchRequestState`.
More specifically, upon `manager.add` we can check whether there's a dup and, if not, construct an `UnsentRequest` with the handler hooked up right away and add it to `unsentOffsetFetchRequests`; a rough sketch follows below.
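
Roughly what I have in mind; this is a hypothetical sketch, where the method name `addOffsetFetchRequest` and treating `unsentOffsetFetchRequests` as a plain list of `UnsentRequest`s are just for illustration:

```java
// Hypothetical sketch: build the UnsentRequest, with its completion handling hooked up,
// at add() time, so there is no separate UnsentOffsetFetchRequestState placeholder.
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(
        final Set<TopicPartition> partitions) {
    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
    if (partitions.isEmpty()) {
        result.complete(new HashMap<>());
        return result;
    }
    // (The dedup check against an identical queued fetch would go here.)
    OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
        groupState.groupId, true,
        new ArrayList<>(partitions),
        throwOnFetchStableOffsetUnsupported);
    NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
        builder,
        coordinatorRequestManager.coordinator());
    unsentRequest.future().whenComplete((response, error) -> {
        // The existing FetchCommittedOffsetResponseHandler logic would run here and
        // complete `result` with the fetched offsets or the error.
    });
    unsentOffsetFetchRequests.add(unsentRequest);
    return result;
}
```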
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -140,6 +158,24 @@ CompletableFuture<ClientResponse> sendAutoCommit(final Map<TopicPartition, Offse
         return future;
     }
+    /**
+     * Handles {@link org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent}. It creates an
+     * {@link UnsentOffsetFetchRequestState} and enqueue it to send later.
+     */
+    public void addCommittedRequest(final Set<TopicPartition> partitions,
+                                    final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+        if (partitions.isEmpty()) {
+            future.complete(new HashMap<>());
+            return;
+        }
+
+        unsentOffsetFetchRequests.enqueue(new UnsentOffsetFetchRequestState(
Review Comment:
Okay, this train of thought leads me to suggest merging the two, as well as merging `UnsentOffsetFetchRequestState` with `UnsentRequest` (see other detailed comments). LMK what you think.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -80,16 +97,18 @@ public CommitRequestManager(
     */
    @Override
    public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
-        maybeAutoCommit(currentTimeMs);
+        maybeAutoCommit();
+        List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<>();
-        if (stagedCommits.isEmpty()) {
-            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>());
+        if (!stagedCommits.isEmpty()) {
Review Comment:
Of course, that means we need the `UnsentRequest` to also include a `RequestState` field so that we can check `canSendRequest`, mark the attempt timestamp, etc. (rough sketch below).
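
Something along these lines; this is a hypothetical sketch that elides the existing fields of `NetworkClientDelegate.UnsentRequest`, and I'm assuming `RequestState` exposes `canSendRequest` plus an `onSendAttempt`-style method, which may need adjusting to the actual API:

```java
// Hypothetical sketch: UnsentRequest carries a RequestState so poll() can check backoff
// and record attempts directly on the request, without a separate wrapper class.
// (Other existing fields of NetworkClientDelegate.UnsentRequest are omitted here.)
static class UnsentRequest {
    private final AbstractRequest.Builder<?> requestBuilder;
    private final Optional<Node> node;
    private final RequestState state;  // attempts + backoff bookkeeping for this request

    UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
                  final Optional<Node> node,
                  final RequestState state) {
        this.requestBuilder = requestBuilder;
        this.node = node;
        this.state = state;
    }

    boolean canSend(final long currentTimeMs) {
        return state.canSendRequest(currentTimeMs);
    }

    void onSendAttempt(final long currentTimeMs) {
        state.onSendAttempt(currentTimeMs);
    }
}
```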
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -207,6 +266,85 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
         }
     }
+    static class UnsentOffsetFetchRequestState extends RequestState {
+        public final Set<TopicPartition> requestedPartitions;
+        public final GroupState.Generation requestedGeneration;
+        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+        public UnsentOffsetFetchRequestState(final Set<TopicPartition> partitions,
+                                             final GroupState.Generation generation,
+                                             final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future,
+                                             final long retryBackoffMs) {
+            super(retryBackoffMs);
+            this.requestedPartitions = partitions;
+            this.requestedGeneration = generation;
+            this.future = future;
+        }
+
+        public boolean sameRequest(final UnsentOffsetFetchRequestState request) {
+            return Objects.equals(requestedGeneration, request.requestedGeneration) && requestedPartitions.equals(request.requestedPartitions);
+        }
+    }
+
+    /**
+     * <p>This is used to support the committed() API. Here we use a Java Collections, {@code unsentRequests}, to
+     * track
+     * the OffsetFetchRequests that haven't been sent, to prevent sending the same requests in the same batch.
+     *
+     * <p>If the request is new. It will be enqueued to the {@code unsentRequest}, and will be sent upon the next
+     * poll.
+     *
+     * <p>If the same request has been sent, the request's {@code CompletableFuture} will be completed upon the
+     * completion of the existing one.
+     *
+     * TODO: There is an optimization to present duplication to the sent but incompleted requests. I'm not sure if we
Review Comment:
Yeah I think we can defer that for future work.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -235,4 +373,105 @@ public void ack(final long currentTimeMs) {
            this.timer.update(currentTimeMs);
        }
    }
+
+    private class FetchCommittedOffsetResponseHandler {
Review Comment:
Assuming this is just copy-pasted here without any handler logic changes, I skipped this class. @philipnee let me know if otherwise.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -149,6 +185,29 @@ Queue<StagedCommit> stagedCommits() {
         return this.stagedCommits;
     }
+    /**
+     * Get all the sendable requests, and create a list of UnsentRequest.
+     */
+    private List<NetworkClientDelegate.UnsentRequest> sendOffsetFetchRequests(final long currentTimeMs) {
+        List<UnsentOffsetFetchRequestState> requests = unsentOffsetFetchRequests.sendableRequests(currentTimeMs);
+        List<NetworkClientDelegate.UnsentRequest> pollResults = new ArrayList<>();
+        requests.forEach(req -> {
+            OffsetFetchRequest.Builder builder = new OffsetFetchRequest.Builder(
+                groupState.groupId, true,
+                new ArrayList<>(req.requestedPartitions),
+                throwOnFetchStableOffsetUnsupported);
+            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(
+                builder,
+                coordinatorRequestManager.coordinator());
+            FetchCommittedOffsetResponseHandler cb = new FetchCommittedOffsetResponseHandler(req);
Review Comment:
I think it is actually fine :) But just to echo my other comments, I think we can merge `stagedCommits` with this class as well as eliminate `UnsentOffsetFetchRequestState`, in which case we would always construct the `UnsentRequest` right away and add it to the list, so that we do not need to do it here.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -437,29 +456,24 @@ public void wakeup() {
     */
    @Override
    public void commitSync(final Duration timeout) {
-        final CommitApplicationEvent commitEvent = new CommitApplicationEvent(subscriptions.allConsumed());
-        eventHandler.add(commitEvent);
-
-        final CompletableFuture<Void> commitFuture = commitEvent.future();
-        try {
-            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-        } catch (final TimeoutException e) {
-            throw new org.apache.kafka.common.errors.TimeoutException(
-                "timeout");
-        } catch (final Exception e) {
-            // handle exception here
-            throw new RuntimeException(e);
-        }
+        commitSync(subscriptions.allConsumed(), timeout);
    }
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        throw new KafkaException("method not implemented");
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
    }
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
-        throw new KafkaException("method not implemented");
+        CompletableFuture<Void> commitFuture = commit(offsets);
+        try {
+            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException e) {
Review Comment:
Ditto here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]