mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r469964306
##########
File path:
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3546,12 +3594,54 @@ private void testGetOffsetsForTimesWithUnknownOffset() {
MetadataResponse initialMetadataUpdate =
TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
client.updateMetadata(initialMetadataUpdate);
- Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData =
new HashMap<>();
- partitionData.put(tp0, new
ListOffsetResponse.PartitionData(Errors.NONE,
- ListOffsetResponse.UNKNOWN_TIMESTAMP,
ListOffsetResponse.UNKNOWN_OFFSET,
- Optional.empty()));
+ ListOffsetResponseData data = new ListOffsetResponseData()
+ .setThrottleTimeMs(0)
+ .setTopics(Collections.singletonList(new
ListOffsetTopicResponse()
+ .setName(tp0.topic())
+ .setPartitions(Collections.singletonList(new
ListOffsetPartitionResponse()
+ .setPartitionIndex(tp0.partition())
+ .setErrorCode(Errors.NONE.code())
+
.setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+
.setOffset(ListOffsetResponse.UNKNOWN_OFFSET)))));
+
+ client.prepareResponseFrom(new ListOffsetResponse(data),
+ metadata.fetch().leaderFor(tp0));
+
+ Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+ timestampToSearch.put(tp0, 0L);
+ Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+ fetcher.offsetsForTimes(timestampToSearch,
time.timer(Long.MAX_VALUE));
- client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
+ assertTrue(offsetAndTimestampMap.containsKey(tp0));
+ assertNull(offsetAndTimestampMap.get(tp0));
+ }
+
+ @Test
+ public void testGetOffsetsForTimesWithUnknownOffsetV0() {
Review comment:
I think so. I used the following test on trunk and it passes:
```
@Test
public void testGetOffsetsForTimesWithUnknownOffsetV0() {
buildFetcher();
client.reset();
// Ensure metadata has both partition.
MetadataResponse initialMetadataUpdate =
TestUtils.metadataUpdateWith(1, singletonMap(topicName, 1));
client.updateMetadata(initialMetadataUpdate);
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData
= new HashMap<>();
partitionData.put(tp0, new
ListOffsetResponse.PartitionData(Errors.NONE,
Collections.emptyList()));
client.prepareResponseFrom(new ListOffsetResponse(0, partitionData),
metadata.fetch().leaderFor(tp0));
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
timestampToSearch.put(tp0, 0L);
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
fetcher.offsetsForTimes(timestampToSearch,
time.timer(Long.MAX_VALUE));
assertTrue(offsetAndTimestampMap.containsKey(tp0));
assertNull(offsetAndTimestampMap.get(tp0));
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]