lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1937527654
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
*/
private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ final CompletableFuture<Boolean> b = event.future();
+ future.whenComplete((BiConsumer<? super Boolean, ? super Throwable>)
(value, exception) -> {
Review Comment:
do we really need to cast the `(value, exception)` here?
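
For reference, a minimal sketch of the uncast form (not the PR's code; `WhenCompleteSketch` and `forward` are hypothetical names used only for illustration). `CompletableFuture.whenComplete` takes a single `BiConsumer<? super T, ? super Throwable>`, so the lambda's parameter types should be inferred without a cast:

```java
import java.util.concurrent.CompletableFuture;

final class WhenCompleteSketch {
    // Hypothetical helper: forwards the outcome of `source` into `target`.
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        // No cast on the lambda: whenComplete has one signature, so
        // (value, exception) is inferred as (T, Throwable).
        source.whenComplete((value, exception) -> {
            if (exception != null) {
                target.completeExceptionally(exception);
            } else {
                target.complete(value);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Boolean> source = new CompletableFuture<>();
        CompletableFuture<Boolean> target = new CompletableFuture<>();
        forward(source, target);
        source.complete(true);
        System.out.println(target.join());
    }
}
```

The target future is passed straight into the lambda here, so no extra local variable is needed either.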
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -402,22 +399,20 @@ public void testCommitAsyncWithEmptyOffsets() {
(short) 1,
Errors.NONE)));
- verify(subscriptionState).allConsumed();
verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
assertTrue(future.isDone());
Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
assertEquals(offsets, commitOffsets);
}
@Test
- public void testCommitAsyncWithEmptyAllConsumedOffsets() {
+ public void testCommitAsyncWithEmptyLatestPartitionOffsetsOffsets() {
Review Comment:
I would say the original test name still applies (the only difference is that
allConsumed is taken once we know it has been returned). The new name seems a
bit confusing.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -389,7 +388,7 @@ private void
autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
* future will be completed with a {@link RetriableCommitFailedException}.
*/
public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
commitAsync(final Optional<Map<TopicPartition, OffsetAndMetadata>> offsets) {
- Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(subscriptions::allConsumed);
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
offsets.orElseGet(this::latestPartitionOffsets);
Review Comment:
Hmm, I don't think we should change this here, and I believe it's actually
dangerous. This is my reasoning (please correct me at any point): we have 2 kinds
of commit operations in this manager:
1. commits triggered automatically in the background (commit before
rebalance and auto-commit on the interval)
2. commits triggered by API calls (`commitSync` and `commitAsync`, which are
only triggered by a consumer.commitSync/Async call or consumer.close. Note that
these could be for specific offsets, or for allConsumed)
My take is that with this PR we only need to change group 1, since those are
the commits affected by the race condition with the fetch happening within a
consumer poll iteration. Commits that happen automatically cannot take the
`allConsumed` from the subscription state because we could be in the middle of
a consumer poll iteration in the app thread (with positions advanced but the
records not returned yet). So I agree with the changes to
`maybeAutoCommitSyncBeforeRevocation` and `maybeAutoCommitAsync` to not use
subscriptionState.allConsumed.
But the commits in group 2 (triggered by consumer API calls) can and should
use the `allConsumed` from the `subscriptionState`, I expect. They happen
outside of the poll loop, so first, they don't land in the race we're
targeting, and most importantly, we cannot even ensure that the commitMgr
latestPartitionOffsets has the positions returned when they are called (this is
the dangerous part).
Ex. a single call to poll that returns 5 records + commitSync()/commitAsync().
If that commit takes the `latestPartitionOffsets` from the commitReqMgr,
wouldn't that be 0? The `latestPartitionOffsets` is only incremented on the
next call to poll (if any), which makes sense, because that's the only time,
when running a continuous poll, that we can safely assume that the records
have been returned (on the previous iteration). Makes sense?
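
To make the example above concrete, here is a toy model (an assumption-laden sketch, not Kafka code: `CommitSnapshotSketch`, `poll`, and `latestSnapshot` are hypothetical stand-ins, and partitions are plain strings). It assumes the snapshot is refreshed only at the start of each poll, as described above:

```java
import java.util.HashMap;
import java.util.Map;

final class CommitSnapshotSketch {
    // Positions advanced by fetches (stand-in for what allConsumed reflects).
    private final Map<String, Long> positions = new HashMap<>();
    // Snapshot refreshed only when the next poll starts
    // (stand-in for the commit manager's latestPartitionOffsets).
    private Map<String, Long> snapshot = Map.of();

    // Simulates one poll that returns `records` records for `partition`.
    int poll(String partition, int records) {
        // At the start of a poll, everything from the previous
        // iteration is known to have been returned to the app.
        snapshot = Map.copyOf(positions);
        positions.merge(partition, (long) records, Long::sum);
        return records;
    }

    Map<String, Long> allConsumed() {
        return Map.copyOf(positions);
    }

    Map<String, Long> latestSnapshot() {
        return snapshot;
    }
}
```

In this model, after a single `poll("t-0", 5)`, `allConsumed()` holds position 5 for `t-0` while `latestSnapshot()` is still empty, so an API-triggered commit that read the snapshot would miss the 5 records just returned. Only a second poll brings the snapshot up to 5.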
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +636,14 @@ private void maybeUpdateLastSeenEpochIfNewer(final
Map<TopicPartition, OffsetAnd
});
}
+ public Map<TopicPartition, OffsetAndMetadata> latestPartitionOffsets() {
+ return latestPartitionOffsets;
+ }
+
+ public void setLatestPartitionOffsets(Map<TopicPartition,
OffsetAndMetadata> offsets) {
+ this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);
Review Comment:
should we add a debug log here to know that we're updating the all consumed
positions to be committed? (I expect it will be helpful to track the flow if
needed)
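
Something along these lines, as a rough sketch only: `LatestOffsetsHolder` is a hypothetical class, the map uses simplified `String`/`Long` types, and `java.util.logging` at `FINE` stands in for the debug-level call on whatever logger the manager already holds.

```java
import java.util.Collections;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LatestOffsetsHolder {
    private static final Logger LOG =
        Logger.getLogger(LatestOffsetsHolder.class.getName());

    private Map<String, Long> latestPartitionOffsets = Collections.emptyMap();

    Map<String, Long> latestPartitionOffsets() {
        return latestPartitionOffsets;
    }

    void setLatestPartitionOffsets(Map<String, Long> offsets) {
        // Debug-level trace of the old and new values, to follow the flow.
        LOG.log(Level.FINE,
            "Updating latest consumed offsets to commit from {0} to {1}",
            new Object[] {latestPartitionOffsets, offsets});
        this.latestPartitionOffsets = Collections.unmodifiableMap(offsets);
    }
}
```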
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +418,16 @@ private void process(final ResetOffsetEvent event) {
*/
private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ final CompletableFuture<Boolean> b = event.future();
Review Comment:
is there a reason why we need this var? (vs using event.future directly to
complete below)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -250,7 +251,10 @@ private void process(final SyncCommitEvent event) {
try {
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitSync(event.offsets(), event.deadlineMs());
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.commitSync(
+ event.offsets().orElseGet(subscriptions::allConsumed),
+ event.deadlineMs()
+ );
Review Comment:
uhm, not sure about this one; it relates to the `commitAsync` comment above