lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1949460839
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -846,9 +845,18 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final C
}
applicationEventHandler.add(commitEvent);
+ awaitOffsetsReady(commitEvent.offsetsReady());
return commitEvent.future();
}
+ private void awaitOffsetsReady(CompletableFuture<Void> offsetsReady) {
Review Comment:
I would add a Javadoc here to state that this will block until the
background thread has retrieved the allConsumed positions to commit when no
offsets have been specified. This ensures that the offsets to commit are
not affected by fetches that may start after this.
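A rough sketch of the Javadoc the reviewer is asking for (the wording is illustrative, not final, and paraphrases the review comment itself):

```java
/**
 * Blocks until the background thread has determined the offsets to commit.
 * If no offsets were specified on the commit event, this waits for the
 * background thread to retrieve the allConsumed positions, so that the
 * offsets to commit are not affected by fetches that start after this call.
 */
private void awaitOffsetsReady(CompletableFuture<Void> offsetsReady) {
```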
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -846,9 +845,18 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final C
}
applicationEventHandler.add(commitEvent);
+ awaitOffsetsReady(commitEvent.offsetsReady());
return commitEvent.future();
}
+ private void awaitOffsetsReady(CompletableFuture<Void> offsetsReady) {
+ try {
+ offsetsReady.get();
+ } catch (Throwable e) {
+ throw new KafkaException(e);
+ }
Review Comment:
should we reuse the existing `ConsumerUtils.getResult(future);` ? (and if we
do then maybe simpler to just call it directly and remove this func?)
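For context, the pattern behind `ConsumerUtils.getResult(future)` is to block on the future and rethrow any failure as a single client-side exception type, unwrapping the `ExecutionException`. A minimal self-contained sketch of that pattern (the class and exception names here are illustrative stand-ins, not Kafka's actual types):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class GetResultSketch {

    // Stand-in for KafkaException; illustrative only.
    static class ClientException extends RuntimeException {
        ClientException(Throwable cause) { super(cause); }
    }

    // Block on the future; unwrap ExecutionException so callers see the
    // real cause, and preserve the interrupt flag on InterruptedException.
    static <T> T getResult(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            throw new ClientException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");
        System.out.println(getResult(ok)); // prints "done"

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            getResult(failed);
        } catch (ClientException e) {
            System.out.println(e.getCause().getMessage()); // prints "boom"
        }
    }
}
```

Compared with the PR's `catch (Throwable e)`, this version does not double-wrap the cause and handles interruption explicitly, which is why reusing the shared helper is attractive.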
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1384,7 +1386,7 @@ public void registerStateListener(MemberStateListener listener) {
*/
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
if (state == MemberState.RECONCILING) {
Review Comment:
should we move this check inside the `maybeReconcile`, now that it's called
from other places too?
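The refactor being suggested could look roughly like this: move the `RECONCILING` guard inside `maybeReconcile` so every caller gets it for free. A minimal runnable model (all names are illustrative, not the actual Kafka types):

```java
public class ReconcileGuardSketch {

    enum MemberState { STABLE, RECONCILING }

    MemberState state = MemberState.STABLE;
    int reconcileCalls = 0;

    void maybeReconcile() {
        if (state != MemberState.RECONCILING) {
            return; // guard now lives with the logic it protects
        }
        reconcileCalls++; // stand-in for the real reconciliation work
    }

    void poll() {
        maybeReconcile(); // no duplicated state check at each call site
    }

    public static void main(String[] args) {
        ReconcileGuardSketch m = new ReconcileGuardSketch();
        m.poll();
        System.out.println(m.reconcileCalls); // prints "0" (not reconciling)
        m.state = MemberState.RECONCILING;
        m.poll();
        System.out.println(m.reconcileCalls); // prints "1"
    }
}
```

With the guard internal, new call sites (such as the `PollEvent` path this PR adds) cannot forget the state check.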
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,8 +207,12 @@ public void process(ApplicationEvent event) {
}
private void process(final PollEvent event) {
+        // In order to ensure certain positions before reconciliation, we only trigger full process of reconcile by PollEvent
+
+        requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));
Review Comment:
this means that we will only trigger the commit before revocation from here
(before any fetch starts). So do we still need to use a snapshot in
`maybeAutoCommitSyncBeforeRevocation`? Or could we simplify and trust
`subscriptions.allConsumed` as we used to?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -234,7 +239,10 @@ private void process(final AsyncCommitEvent event) {
try {
             CommitRequestManager manager = requestManagers.commitRequestManager.get();
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = manager.commitAsync(event.offsets());
+            Optional<Map<TopicPartition, OffsetAndMetadata>> offsets = event.offsets().isEmpty() ?
+                Optional.of(subscriptions.allConsumed()) : event.offsets();
+ event.markOffsetsReady();
Review Comment:
I think one improvement is that we need to make this consistent for both
commits, sync and async. They both need to retrieve the `allConsumed` if
`event.offsets` is empty, and call `markOffsetsReady` (this is key: the app
thread will block on this for all commits, and it seems we're not completing
it on the sync path).
Let's please add unit tests to ensure that processing SyncCommitEvent and
AsyncCommitEvent completes the offsetsReady future no matter what.
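The invariant those unit tests would pin down can be modeled with plain `CompletableFuture`s: processing a commit event must complete its `offsetsReady` future even when the commit path throws. A minimal sketch (names are illustrative, not Kafka's actual classes; the real fix would live in `ApplicationEventProcessor`):

```java
import java.util.concurrent.CompletableFuture;

public class OffsetsReadySketch {

    static class CommitEvent {
        final CompletableFuture<Void> offsetsReady = new CompletableFuture<>();
        void markOffsetsReady() { offsetsReady.complete(null); }
    }

    static void process(CommitEvent event, boolean fail) {
        try {
            if (fail) {
                // stand-in for e.g. the commit request manager being absent
                throw new IllegalStateException("commit manager unavailable");
            }
            // ... resolve offsets (allConsumed if the event carries none) ...
        } finally {
            // Key point: complete unconditionally so the app thread never
            // blocks forever in awaitOffsetsReady.
            event.markOffsetsReady();
        }
    }

    public static void main(String[] args) {
        CommitEvent ok = new CommitEvent();
        process(ok, false);
        System.out.println(ok.offsetsReady.isDone()); // prints "true"

        CommitEvent broken = new CommitEvent();
        try {
            process(broken, true);
        } catch (IllegalStateException ignored) { }
        System.out.println(broken.offsetsReady.isDone()); // prints "true"
    }
}
```

A unit test asserting `offsetsReady.isDone()` on both the success and failure paths, for both event types, would catch the missing sync-path completion the review points out.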
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -818,6 +818,8 @@ void maybeReconcile() {
return;
}
+ if (!canCommit) return;
Review Comment:
should we
```
if (autoCommitEnabled && !canCommit) return;
```
Seems much clearer why we cannot start to reconcile, and less error-prone
(avoids short-circuiting here by mistake).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -267,7 +268,7 @@ private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCom
public void maybeAutoCommitAsync() {
if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
OffsetCommitRequestState requestState = createOffsetCommitRequest(
- subscriptions.allConsumed(),
+ latestPartitionOffsets,
Review Comment:
I guess here we could have the same situation @junrao described for the
commit before revocation right below:
> However, when we get here, it's possible that a batch of records have just been returned to the application thread before the first step, but those records haven't been processed yet. So latestPartitionOffsets is not up to date yet. We need to wait for the next setLatestPartitionOffsets() call to happen. At that point, we know any record returned to the application will have been processed and no more records can be given to the application. So, it's safe to commit the offset at that point.
I wonder if we should fix this here in the same way we're tackling the
commit before revocation: by fixing **when** we auto-commit, not **what** we
auto-commit.
Before this PR, both auto-commits (on the interval and before revocation)
were triggered freely in the background thread when polling the managers
(opening the door to a race with fetch). If we move to triggering those
commits before fetching (when processing `PollEvent`), wouldn't we solve the
correctness/race issue without needing to keep any snapshot of
`subscriptions.allConsumed`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]