lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1956606014
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -386,38 +360,10 @@ public void testCommitAsync() {
@Test
public void testCommitAsyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
- TopicPartition tp = new TopicPartition("topic", 1);
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
- Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
- doReturn(offsets).when(subscriptionState).allConsumed();
-
- CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
- assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
- List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
- pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
- "topic",
- 1,
- (short) 1,
- Errors.NONE)));
-
- verify(subscriptionState).allConsumed();
- verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
- assertTrue(future.isDone());
- Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
- assertEquals(offsets, commitOffsets);
- }
-
- @Test
- public void testCommitAsyncWithEmptyAllConsumedOffsets() {
- subscriptionState = mock(SubscriptionState.class);
- doReturn(Map.of()).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Collections.emptyMap());
- verify(subscriptionState).allConsumed();
assertTrue(future.isDone());
Review Comment:
should we add a check to make sure that no request is added to the queue
when the offsets are empty? (that could happen even if the future isDone)
`assertPoll(0, commitRequestManager);`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +634,24 @@ private void maybeUpdateLastSeenEpochIfNewer(final
Map<TopicPartition, OffsetAnd
});
}
+ /**
+ * This is a non-blocking method to update timer and trigger async
auto-commit.
+ * <p>
+ * This method performs two main tasks:
+ * <ol>
+ * <li>Updates the internal timer with the current time.</li>
+ * <li>Initiate an asynchronous auto-commit operation for all consumed
messages if needed.</li>
Review Comment:
```suggestion
* <li>Initiate an asynchronous auto-commit operation for all consumed
positions if needed.</li>
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1515,6 +1522,8 @@ public void
testGroupRemoteAssignorUsedInConsumerProtocol() {
consumer = newConsumer(config);
assertFalse(config.unused().contains(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
+ // To unblock commitSyncAllConsumed on close
+ markOffsetsReadyForCommitEvent();
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1539,6 +1548,8 @@ public void testGroupIdNotNullAndValid() {
assertTrue(config.unused().contains(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
assertTrue(config.unused().contains(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED));
+ // To unblock commitSyncAllConsumed on close
+ markOffsetsReadyForCommitEvent();
Review Comment:
makes sense, but given that this test is unrelated to commits, maybe a
cleaner approach is to simply set ENABLE_AUTO_COMMIT_CONFIG=false?
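For illustration, a minimal sketch of that alternative, assuming the test
builds its consumer config from a plain `Properties` object. The class and
helper names are hypothetical; `enable.auto.commit` is the key behind
`ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`.

```java
import java.util.Properties;

// Hypothetical helper, not part of AsyncKafkaConsumerTest.
class AutoCommitDisabledProps {

    // Copies the base test properties and turns auto-commit off. The intent
    // is that close then has no auto-commit that needs unblocking.
    static Properties withAutoCommitDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Same key as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("group.id", "test-group");
        Properties props = withAutoCommitDisabled(base);
        System.out.println(props.getProperty("enable.auto.commit"));
    }
}
```

With something like this the test would not need the extra
`markOffsetsReadyForCommitEvent()` call, assuming the commit on close is only
triggered when auto-commit is enabled.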
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -595,18 +540,20 @@ public void testAutocommitEnsureOnlyOneInflightRequest() {
CommitRequestManager commitRequestManager = create(true, 100);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
List<NetworkClientDelegate.FutureCompletionHandler> futures =
assertPoll(1, commitRequestManager);
time.sleep(100);
- commitRequestManager.updateAutoCommitTimer(time.milliseconds());
+ commitRequestManager.updateTimerAndMaybeCommit(time.milliseconds());
// We want to make sure we don't resend autocommit if the previous
request has not been
// completed, even if the interval expired
assertPoll(0, commitRequestManager);
assertEmptyPendingRequests(commitRequestManager);
// complete the unsent request and re-poll
futures.get(0).onComplete(buildOffsetCommitClientResponse(new
OffsetCommitResponse(0, new HashMap<>())));
+ time.sleep(100);
Review Comment:
why do we need to wait for the interval here again? It already expired on
ln 546 above, so I expect the next request to be generated as soon as we
receive a response and have a pollEvent, with no more waiting.
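To make the expectation concrete, here is a toy model of the timer and
in-flight behaviour I have in mind. It is a sketch under my own assumptions
(the class, fields and `onResponse` are hypothetical), not the actual
CommitRequestManager logic.

```java
// Toy model: the timer is reset only when a request is actually generated,
// and an in-flight request blocks new ones without resetting the timer.
class AutoCommitTimerSketch {
    private final long intervalMs;
    private long nextDeadlineMs;
    private boolean inflight = false;

    AutoCommitTimerSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextDeadlineMs = nowMs + intervalMs;
    }

    // Returns true if an auto-commit request would be generated at nowMs.
    boolean updateTimerAndMaybeCommit(long nowMs) {
        if (nowMs < nextDeadlineMs || inflight) {
            return false; // interval not expired, or previous request pending
        }
        inflight = true;
        nextDeadlineMs = nowMs + intervalMs; // reset only on send
        return true;
    }

    // Called when the response for the in-flight request arrives.
    void onResponse() {
        inflight = false;
    }

    public static void main(String[] args) {
        AutoCommitTimerSketch timer = new AutoCommitTimerSketch(100, 0);
        System.out.println("t=100: " + timer.updateTimerAndMaybeCommit(100));
        System.out.println("t=200 (inflight): " + timer.updateTimerAndMaybeCommit(200));
        timer.onResponse();
        System.out.println("t=200 (after response): " + timer.updateTimerAndMaybeCommit(200));
    }
}
```

Under this model the poll right after the response generates a request at
the same timestamp, without another `time.sleep(100)`. If the real
implementation resets the timer at a different point, that would explain the
extra wait and is worth calling out in the test.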
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1383,9 +1398,7 @@ public void registerStateListener(MemberStateListener
listener) {
* time-sensitive operations should be performed
*/
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
- if (state == MemberState.RECONCILING) {
- maybeReconcile();
- }
+ maybeReconcile(false);
Review Comment:
I couldn't find coverage for this and it would be a nasty regression
(messing up committed offsets). Let's pls add a simple test to check that
manager.poll calls maybeReconcile with canCommit false.
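Roughly the shape of the check I mean, sketched with a hand-rolled recording
stand-in so it is self-contained. The class names are hypothetical; in the
real test this would probably be a Mockito spy on the membership manager with
a `verify` on `maybeReconcile(false)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the membership manager; only the call shape matters.
class PollReconcileSketch {

    static class RecordingManager {
        final List<Boolean> canCommitArgs = new ArrayList<>();

        // Records each canCommit flag instead of actually reconciling.
        void maybeReconcile(boolean canCommit) {
            canCommitArgs.add(canCommit);
        }

        // Mirrors the changed poll(): always reconcile with canCommit=false.
        void poll(long currentTimeMs) {
            maybeReconcile(false);
        }
    }

    // The assertion the test would make: one reconcile call, flag is false.
    static boolean pollReconcilesWithoutCommit(RecordingManager manager) {
        manager.poll(0L);
        return manager.canCommitArgs.size() == 1 && !manager.canCommitArgs.get(0);
    }

    public static void main(String[] args) {
        System.out.println(pollReconcilesWithoutCommit(new RecordingManager()));
    }
}
```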
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -206,13 +206,25 @@ public void process(ApplicationEvent event) {
}
private void process(final PollEvent event) {
+ // Trigger a reconciliation that can safely commit offsets if needed
to revoke partitions,
+ // as we're processing before any new fetching starts in the app thread
+
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+ consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
- requestManagers.commitRequestManager.ifPresent(m ->
m.updateAutoCommitTimer(event.pollTimeMs()));
+ CommitRequestManager commitRequestManager =
requestManagers.commitRequestManager.get();
+ commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+ // all commit request generation points have been passed,
+ // so it's safe to notify the app thread could start the next poll
cycle
Review Comment:
```suggestion
// so it's safe to notify the app thread that it can proceed and start
fetching
```