lianetm commented on code in PR #19577:
URL: https://github.com/apache/kafka/pull/19577#discussion_r2491415450
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1667,15 +1726,23 @@ private ClientResponse
buildOffsetCommitClientResponse(final OffsetCommitRespons
);
}
+ private ClientResponse mockOffsetCommitResponseWithTopicId(String topic,
Review Comment:
Commit responses with topic IDs will have an empty topic name, so should we
remove this param and just pass an empty string on the call to
`mockOffsetCommitResponse`?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -760,13 +771,22 @@ public void onResponse(final ClientResponse response) {
boolean failedRequestRegistered = false;
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic :
commitResponse.data().topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition
partition : topic.partitions()) {
- TopicPartition tp = new TopicPartition(topic.name(),
partition.partitionIndex());
+ // Version 10 drop topic name and support to topic id.
Review Comment:
This doesn't read very well; we could maybe rephrase it to:
```suggestion
// Version 10 drops topic name, and supports topic id.
```
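For context, here is a minimal sketch of the kind of lookup this comment describes, assuming the client resolves an empty topic name from the topic ID through a locally known ID-to-name mapping. `TopicNameResolution`, `ResponseTopic`, `resolveTopicName`, and the map are hypothetical stand-ins (with `java.util.UUID` in place of Kafka's `Uuid`), not the actual `CommitRequestManager` code:

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical stand-ins for illustration only; these are not Kafka's real classes.
class TopicNameResolution {

    // Simplified view of one topic entry in an OffsetCommit response.
    static final class ResponseTopic {
        final String name;   // assumed empty when the response is keyed by topic ID
        final UUID topicId;  // assumed null when the response is keyed by topic name

        ResponseTopic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    // Returns the topic name for a response entry: the name itself when present,
    // otherwise the name looked up by topic ID, or empty if the ID is unknown.
    static Optional<String> resolveTopicName(ResponseTopic topic, Map<UUID, String> topicNamesById) {
        if (topic.name != null && !topic.name.isEmpty()) {
            return Optional.of(topic.name);
        }
        if (topic.topicId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(topicNamesById.get(topic.topicId));
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Map<UUID, String> known = Map.of(id, "topic");
        // ID-based entry: the name is empty, so it is resolved through the map.
        System.out.println(resolveTopicName(new ResponseTopic("", id), known));
        // Name-based entry: the name is used directly.
        System.out.println(resolveTopicName(new ResponseTopic("topic", null), known));
    }
}
```

How the real code should handle an ID it cannot resolve is a separate question; the sketch just returns an empty `Optional` in that case.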
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -469,6 +469,63 @@ public void
testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() {
assertFutureThrows(CommitFailedException.class, commitResult);
}
+ @Test
+ public void testCommitSyncShouldSucceedWithTopicId() {
+ subscriptionState = mock(SubscriptionState.class);
+ TopicPartition tp = new TopicPartition("topic", 1);
+ Uuid topicId = Uuid.randomUuid();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+ when(metadata.topicIds()).thenReturn(Map.of("topic", topicId));
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0,
Optional.of(1), "");
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(tp,
offsetAndMetadata);
+
+ CommitRequestManager commitRequestManager = create(false, 100);
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
commitRequestManager.commitSync(
+ offsets, time.milliseconds() + defaultApiTimeoutMs);
+ assertEquals(1,
commitRequestManager.unsentOffsetCommitRequests().size());
+ List<NetworkClientDelegate.FutureCompletionHandler> pollResults =
assertPoll(1, commitRequestManager);
+ pollResults.forEach(v ->
v.onComplete(mockOffsetCommitResponseWithTopicId(
+ "topic",
+ topicId,
+ 1,
+ (short) 10,
+ Errors.NONE)));
+
+ verify(subscriptionState, never()).allConsumed();
+ verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+ Map<TopicPartition, OffsetAndMetadata> commitOffsets =
assertDoesNotThrow(() -> future.get());
+ assertTrue(future.isDone());
+ assertEquals(offsets, commitOffsets);
+ }
+
+ @Test
+ public void testCommitSyncShouldSucceedWithUnknownOffsetAndMetadata() {
Review Comment:
We can verify that we never call `updateLastSeenEpochIfNewer` on this path
(as opposed to the test above, where we do expect a call to it).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.