lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1955228852
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -750,7 +750,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
do {
// Make sure to let the background thread know that we are
still polling.
Review Comment:
Should we update this comment? Maybe worth calling out that this will
trigger async auto-commits of the consumed positions if needed (on the interval
and for revoking partitions)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -566,7 +563,7 @@ private boolean
isStaleEpochErrorAndValidEpochAvailable(Throwable error) {
return error instanceof StaleMemberEpochException &&
memberInfo.memberEpoch.isPresent();
}
- public void updateAutoCommitTimer(final long currentTimeMs) {
+ void updateAutoCommitTimer(final long currentTimeMs) {
Review Comment:
then this could be just private too
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -264,7 +263,7 @@ private CompletableFuture<Map<TopicPartition,
OffsetAndMetadata>> requestAutoCom
* In that case, the next auto-commit request will be sent on the next
call to poll, after a
* response for the in-flight is received.
*/
- public void maybeAutoCommitAsync() {
+ void maybeAutoCommitAsync() {
Review Comment:
since we added the new `updateTimerAndMaybeCommit` to encapsulate this,
should this be private now? I checked and it's only used in this same class and
the test (but I would say we should update the test to just use
updateTimerAndMaybeCommit instead of updating the timer and calling the
maybeAutoCommitAsync separately)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -637,6 +634,11 @@ private void maybeUpdateLastSeenEpochIfNewer(final
Map<TopicPartition, OffsetAnd
});
}
+ public void updateTimerAndMaybeCommit(final long currentTimeMs) {
Review Comment:
could we add the java doc? Mainly to point out that this is a non-blocking
func, updates the timer and triggers the async auto-commit of all consumed is
needed.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -320,27 +321,17 @@ public void testCommitSync() {
public void testCommitSyncWithEmptyOffsets() {
subscriptionState = mock(SubscriptionState.class);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
- TopicPartition tp = new TopicPartition("topic", 1);
- OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
- Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
- doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(false, 100);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
- Optional.empty(), time.milliseconds() + defaultApiTimeoutMs);
- assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
- List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
- pollResults.forEach(v -> v.onComplete(mockOffsetCommitResponse(
- "topic",
- 1,
- (short) 1,
- Errors.NONE)));
+ Collections.emptyMap(), time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(0,
commitRequestManager.unsentOffsetCommitRequests().size());
+ assertPoll(0, commitRequestManager);
- verify(subscriptionState).allConsumed();
- verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ verify(metadata, never()).updateLastSeenEpochIfNewer(any(), anyInt());
Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
assertTrue(future.isDone());
- assertEquals(offsets, commitOffsets);
Review Comment:
this was testing something different before (commit sync with empty offsets
should use the allConsumed). That responsibility is not in this CommitMgr
anymore, so ok to change this test, but we need to add coverage elsewhere (in
the AppProcessorTest I expect, to test that process
AsyncCommitEvent/SyncCommitEvent with empty offset retrieves the allConsumed
and calls the commitMgr with them).
Also, with this change, the `testCommitSyncWithEmptyAllConsumedOffsets`
right below seems to be a duplicate now?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -393,7 +383,7 @@ public void testCommitAsyncWithEmptyOffsets() {
doReturn(offsets).when(subscriptionState).allConsumed();
CommitRequestManager commitRequestManager = create(true, 100);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(Optional.empty());
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitAsync(offsets);
Review Comment:
with this change I guess this test ends up being the same as the above one?
we just need one, I would probably remove this one that deals with the
subcsription state and that doesn't apply here anymore (and add the coverage in
the AppEventProcessor for the case of allConsumed).
Also, the following `testCommitAsyncWithEmptyAllConsumedOffsets` should be
renamed to `testCommitAsyncWithEmptyOffsets`.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -415,9 +404,8 @@ public void testCommitAsyncWithEmptyAllConsumedOffsets() {
doReturn(Map.of()).when(subscriptionState).allConsumed();
Review Comment:
I would expect we don't need this anymore?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -475,20 +477,24 @@ public void testSyncCommitEventWithException() {
doReturn(future).when(commitRequestManager).commitSync(any(),
anyLong());
processor.process(event);
- verify(commitRequestManager).commitSync(Optional.empty(), 12345);
+ verify(commitRequestManager).commitSync(Collections.emptyMap(), 12345);
+ assertTrue(event.offsetsReady.isDone());
assertFutureThrows(event.future(), IllegalStateException.class);
}
@ParameterizedTest
@MethodSource("offsetsGenerator")
public void testAsyncCommitEventWithOffsets(Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets) {
Review Comment:
ditto
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -446,14 +445,17 @@ public void
testR2JPatternSubscriptionEventFailureWithMixedSubscriptionType() {
@MethodSource("offsetsGenerator")
public void testSyncCommitEvent(Optional<Map<TopicPartition,
OffsetAndMetadata>> offsets) {
Review Comment:
this needs to be extended (or split) to ensure that if offsets are empty, it
will use the subscriptions.allConsumed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]