This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4e0578fb814 KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion (#15525) 4e0578fb814 is described below commit 4e0578fb814e99da2a8f4d6adc123fe87c8f24f3 Author: Philip Nee <p...@confluent.io> AuthorDate: Mon Apr 8 04:04:58 2024 -0700 KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion (#15525) A subtle difference in the behavior of the two API causes the failures with Invalid negative timestamp. In this PR, the list offsets response will be processed differently based on the API. For beginingOffsets/endOffsets - the offset response should be directly returned. For offsetsForTimes - A OffsetAndTimestamp object is constructed for each requested TopicPartition before being returned. The reason beginningOffsets and endOffsets - We are expecting a -1 timestamp from the response which subsequently causes the invalid timestamp exception because the original code tries to construct an OffsetAndTimestamp object upon returning. In this PR, the following missing tasks are added: short-circuit both beginningOrEndOffsets Test both API (beginningOrEndOffsets, OffsetsForTime) Seems like we don't have tests for this API: Note it is presented in other IntegrationTests but they are added to test Async consumer Reviewers: Lucas Brutschy <lbruts...@confluent.io>, Lianet Magrans <liane...@gmail.com> --- .../consumer/internals/AsyncKafkaConsumer.java | 52 ++++--- .../internals/OffsetAndTimestampInternal.java | 81 +++++++++++ .../consumer/internals/OffsetFetcherUtils.java | 45 ++++-- .../consumer/internals/OffsetsRequestManager.java | 47 +++--- .../events/ApplicationEventProcessor.java | 10 +- .../internals/events/ListOffsetsEvent.java | 18 +-- .../consumer/internals/AsyncKafkaConsumerTest.java | 70 +++++---- .../internals/ConsumerNetworkThreadTest.java | 9 +- .../internals/OffsetsRequestManagerTest.java | 160 ++++++++++----------- .../kafka/api/PlaintextConsumerTest.scala | 56 +++++++- 10 files changed, 367 insertions(+), 181 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index fcd57469c2a..74e8866242e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1093,17 +1093,22 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { return Collections.emptyMap(); } final Timer timer = time.timer(timeout); - final ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( - timestampsToSearch, - true, - timer); + ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( + timestampsToSearch, + timer, + true); // If timeout is set to zero return empty immediately; otherwise try to get the results // and throw timeout exception if it cannot complete in time. if (timeout.toMillis() == 0L) - return listOffsetsEvent.emptyResult(); - - return applicationEventHandler.addAndGet(listOffsetsEvent, timer); + return listOffsetsEvent.emptyResults(); + + return applicationEventHandler.addAndGet(listOffsetsEvent, timer) + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().buildOffsetAndTimestamp())); } finally { release(); } @@ -1141,21 +1146,30 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { if (partitions.isEmpty()) { return Collections.emptyMap(); } + Map<TopicPartition, Long> timestampToSearch = partitions - .stream() - .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); + .stream() + .collect(Collectors.toMap(Function.identity(), tp -> timestamp)); Timer timer = time.timer(timeout); ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent( - timestampToSearch, - false, - timer); - Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet( - listOffsetsEvent, - timer); - return offsetAndTimestampMap - .entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + timestampToSearch, + timer, + false); + + Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap; + if (timeout.isZero()) { + // Return an empty results but also send a request to update the highwatermark. + applicationEventHandler.add(listOffsetsEvent); + return listOffsetsEvent.emptyResults(); + } + offsetAndTimestampMap = applicationEventHandler.addAndGet( + listOffsetsEvent, + timer); + return offsetAndTimestampMap.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().offset())); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java new file mode 100644 index 00000000000..08d451da477 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; + +import java.util.Optional; + +/** + * Internal representation of {@link OffsetAndTimestamp} to allow negative timestamps and offset. + */ +public class OffsetAndTimestampInternal { + private final long timestamp; + private final long offset; + private final Optional<Integer> leaderEpoch; + + public OffsetAndTimestampInternal(long offset, long timestamp, Optional<Integer> leaderEpoch) { + this.offset = offset; + this.timestamp = timestamp; + this.leaderEpoch = leaderEpoch; + } + + long offset() { + return offset; + } + + long timestamp() { + return timestamp; + } + + Optional<Integer> leaderEpoch() { + return leaderEpoch; + } + + public OffsetAndTimestamp buildOffsetAndTimestamp() { + return new OffsetAndTimestamp(offset, timestamp, leaderEpoch); + } + + @Override + public int hashCode() { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + (int) (offset ^ (offset >>> 32)); + result = 31 * result + leaderEpoch.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof OffsetAndTimestampInternal)) return false; + + OffsetAndTimestampInternal that = (OffsetAndTimestampInternal) o; + + if (timestamp != that.timestamp) return false; + if (offset != that.offset) return false; + return leaderEpoch.equals(that.leaderEpoch); + } + + @Override + public String toString() { + return "OffsetAndTimestampInternal{" + + "timestamp=" + timestamp + + ", offset=" + offset + + ", leaderEpoch=" + leaderEpoch + + '}'; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java index 9239811f7d6..ac5d8a4acff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java @@ -50,6 +50,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -240,20 +241,48 @@ class OffsetFetcherUtils { return offsetResetTimestamps; } - static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(final Map<TopicPartition, Long> timestampsToSearch, - final Map<TopicPartition, ListOffsetData> fetchedOffsets) { - HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size()); + static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult( + final Map<TopicPartition, Long> timestampsToSearch, + final Map<TopicPartition, ListOffsetData> fetchedOffsets, + BiFunction<TopicPartition, ListOffsetData, OffsetAndTimestamp> resultMapper) { + + HashMap<TopicPartition, OffsetAndTimestamp> offsetsResults = new HashMap<>(timestampsToSearch.size()); for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) - offsetsByTimes.put(entry.getKey(), null); + offsetsResults.put(entry.getKey(), null); for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) { - // 'entry.getValue().timestamp' will not be null since we are guaranteed - // to work with a v1 (or later) ListOffset request ListOffsetData offsetData = entry.getValue(); - offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(offsetData.offset, offsetData.timestamp, offsetData.leaderEpoch)); + offsetsResults.put(entry.getKey(), resultMapper.apply(entry.getKey(), offsetData)); } - return offsetsByTimes; + return offsetsResults; + } + + static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult( + final Map<TopicPartition, Long> timestampsToSearch, + final Map<TopicPartition, ListOffsetData> fetchedOffsets) { + return buildListOffsetsResult(timestampsToSearch, fetchedOffsets, + (topicPartition, offsetData) -> new OffsetAndTimestamp( + offsetData.offset, + offsetData.timestamp, + offsetData.leaderEpoch)); + } + + static Map<TopicPartition, OffsetAndTimestampInternal> buildOffsetsForTimeInternalResult( + final Map<TopicPartition, Long> timestampsToSearch, + final Map<TopicPartition, ListOffsetData> fetchedOffsets) { + HashMap<TopicPartition, OffsetAndTimestampInternal> offsetsResults = new HashMap<>(timestampsToSearch.size()); + for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { + offsetsResults.put(entry.getKey(), null); + } + for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) { + ListOffsetData offsetData = entry.getValue(); + offsetsResults.put(entry.getKey(), new OffsetAndTimestampInternal( + offsetData.offset, + offsetData.timestamp, + offsetData.leaderEpoch)); + } + return offsetsResults; } private Long offsetResetStrategyTimestamp(final TopicPartition partition) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index c5156e9e0b9..22e56111b47 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -151,14 +151,13 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis * found .The future will complete when the requests responses are received and * processed, following a call to {@link #poll(long)} */ - public CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsets( - final Map<TopicPartition, Long> timestampsToSearch, - final boolean requireTimestamps) { + public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsets( + Map<TopicPartition, Long> timestampsToSearch, + boolean requireTimestamps) { if (timestampsToSearch.isEmpty()) { return CompletableFuture.completedFuture(Collections.emptyMap()); } metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(timestampsToSearch.keySet())); - ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState( timestampsToSearch, requireTimestamps, @@ -175,10 +174,11 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis } }); - fetchOffsetsByTimes(timestampsToSearch, requireTimestamps, listOffsetsRequestState); - - return listOffsetsRequestState.globalResult.thenApply(result -> - OffsetFetcherUtils.buildOffsetsForTimesResult(timestampsToSearch, result.fetchedOffsets)); + prepareFetchOffsetsRequests(timestampsToSearch, requireTimestamps, listOffsetsRequestState); + return listOffsetsRequestState.globalResult.thenApply( + result -> OffsetFetcherUtils.buildOffsetsForTimeInternalResult( + timestampsToSearch, + result.fetchedOffsets)); } /** @@ -235,14 +235,9 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis * Generate requests for partitions with known leaders. Update the listOffsetsRequestState by adding * partitions with unknown leader to the listOffsetsRequestState.remainingToSearch */ - private void fetchOffsetsByTimes(final Map<TopicPartition, Long> timestampsToSearch, - final boolean requireTimestamps, - final ListOffsetsRequestState listOffsetsRequestState) { - if (timestampsToSearch.isEmpty()) { - // Early return if empty map to avoid wrongfully raising StaleMetadataException on - // empty grouping - return; - } + private void prepareFetchOffsetsRequests(final Map<TopicPartition, Long> timestampsToSearch, + final boolean requireTimestamps, + final ListOffsetsRequestState listOffsetsRequestState) { try { List<NetworkClientDelegate.UnsentRequest> unsentRequests = buildListOffsetsRequests( timestampsToSearch, requireTimestamps, listOffsetsRequestState); @@ -263,7 +258,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(requestState.remainingToSearch); requestState.remainingToSearch.clear(); - fetchOffsetsByTimes(timestampsToSearch, requestState.requireTimestamps, requestState); + prepareFetchOffsetsRequests(timestampsToSearch, requestState.requireTimestamps, requestState); }); } @@ -298,7 +293,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis offsetFetcherUtils.updateSubscriptionState(multiNodeResult.fetchedOffsets, isolationLevel); - if (listOffsetsRequestState.remainingToSearch.size() == 0) { + if (listOffsetsRequestState.remainingToSearch.isEmpty()) { ListOffsetResult listOffsetResult = new ListOffsetResult(listOffsetsRequestState.fetchedOffsets, listOffsetsRequestState.remainingToSearch.keySet()); @@ -314,7 +309,6 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) { Node node = entry.getKey(); - CompletableFuture<ListOffsetResult> partialResult = buildListOffsetRequestToNode( node, entry.getValue(), @@ -364,8 +358,7 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis ListOffsetsResponse lor = (ListOffsetsResponse) response.responseBody(); log.trace("Received ListOffsetResponse {} from broker {}", lor, node); try { - ListOffsetResult listOffsetResult = - offsetFetcherUtils.handleListOffsetResponse(lor); + ListOffsetResult listOffsetResult = offsetFetcherUtils.handleListOffsetResponse(lor); result.complete(listOffsetResult); } catch (RuntimeException e) { result.completeExceptionally(e); @@ -423,11 +416,11 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis }); }); - if (unsentRequests.size() > 0) { + if (unsentRequests.isEmpty()) { + globalResult.complete(null); + } else { expectedResponses.set(unsentRequests.size()); requestsToSend.addAll(unsentRequests); - } else { - globalResult.complete(null); } return globalResult; @@ -503,11 +496,11 @@ public class OffsetsRequestManager implements RequestManager, ClusterResourceLis }); }); - if (unsentRequests.size() > 0) { + if (unsentRequests.isEmpty()) { + globalResult.complete(null); + } else { expectedResponses.set(unsentRequests.size()); requestsToSend.addAll(unsentRequests); - } else { - globalResult.complete(null); } return globalResult; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 33825307465..d5cb1c04b38 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -17,12 +17,12 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.MembershipManager; +import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; @@ -197,10 +197,12 @@ public class ApplicationEventProcessor extends EventProcessor<ApplicationEvent> manager.maybeAutoCommitAsync(); } + /** + * Handles ListOffsetsEvent by fetching the offsets for the given partitions and timestamps. + */ private void process(final ListOffsetsEvent event) { - final CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> future = - requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(), - event.requireTimestamps()); + final CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> future = + requestManagers.offsetsRequestManager.fetchOffsets(event.timestampsToSearch(), event.requireTimestamps()); future.whenComplete(complete(event.future())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java index e218705846e..3df4719a7b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Timer; @@ -32,12 +33,13 @@ import java.util.Map; * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> { - +public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestampInternal>> { private final Map<TopicPartition, Long> timestampsToSearch; private final boolean requireTimestamps; - public ListOffsetsEvent(final Map<TopicPartition, Long> timestampToSearch, final boolean requireTimestamps, final Timer timer) { + public ListOffsetsEvent(Map<TopicPartition, Long> timestampToSearch, + Timer timer, + boolean requireTimestamps) { super(Type.LIST_OFFSETS, timer); this.timestampsToSearch = Collections.unmodifiableMap(timestampToSearch); this.requireTimestamps = requireTimestamps; @@ -49,11 +51,10 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti * @return Map containing all the partitions the event was trying to get offsets for, and * null {@link OffsetAndTimestamp} as value */ - public Map<TopicPartition, OffsetAndTimestamp> emptyResult() { - HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size()); - for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) - offsetsByTimes.put(entry.getKey(), null); - return offsetsByTimes; + public <T> Map<TopicPartition, T> emptyResults() { + Map<TopicPartition, T> result = new HashMap<>(); + timestampsToSearch.keySet().forEach(tp -> result.put(tp, null)); + return result; } public Map<TopicPartition, Long> timestampsToSearch() { @@ -70,5 +71,4 @@ public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicParti ", timestampsToSearch=" + timestampsToSearch + ", requireTimestamps=" + requireTimestamps; } - } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index fa413eed36a..51fcd2e44eb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -102,7 +102,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.Arrays.asList; @@ -309,7 +308,7 @@ public class AsyncKafkaConsumerTest { @Test public void testCommitAsyncWithFencedException() { consumer = newConsumer(); - final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset(); + final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset(); MockCommitCallback callback = new MockCommitCallback(); assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback)); @@ -841,18 +840,17 @@ public class AsyncKafkaConsumerTest { @Test public void testBeginningOffsets() { consumer = newConsumer(); - Map<TopicPartition, OffsetAndTimestamp> expectedOffsetsAndTimestamp = - mockOffsetAndTimestamp(); - Set<TopicPartition> partitions = expectedOffsetsAndTimestamp.keySet(); - doReturn(expectedOffsetsAndTimestamp).when(applicationEventHandler).addAndGet(any(), any()); - Map<TopicPartition, Long> result = - assertDoesNotThrow(() -> consumer.beginningOffsets(partitions, - Duration.ofMillis(1))); - Map<TopicPartition, Long> expectedOffsets = expectedOffsetsAndTimestamp.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); - assertEquals(expectedOffsets, result); - verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class), - ArgumentMatchers.isA(Timer.class)); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = mockOffsetAndTimestamp(); + + doReturn(expectedOffsets).when(applicationEventHandler).addAndGet(any(), any()); + + Map<TopicPartition, Long> result = assertDoesNotThrow(() -> consumer.beginningOffsets(expectedOffsets.keySet(), Duration.ofMillis(1))); + + expectedOffsets.forEach((key, value) -> { + assertTrue(result.containsKey(key)); + assertEquals(value.offset(), result.get(key)); + }); + verify(applicationEventHandler).addAndGet(any(ListOffsetsEvent.class), any(Timer.class)); } @Test @@ -913,13 +911,16 @@ public class AsyncKafkaConsumerTest { @Test public void testOffsetsForTimes() { consumer = newConsumer(); - Map<TopicPartition, OffsetAndTimestamp> expectedResult = mockOffsetAndTimestamp(); + Map<TopicPartition, OffsetAndTimestampInternal> expectedResult = mockOffsetAndTimestamp(); Map<TopicPartition, Long> timestampToSearch = mockTimestampToSearch(); doReturn(expectedResult).when(applicationEventHandler).addAndGet(any(), any()); Map<TopicPartition, OffsetAndTimestamp> result = assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1))); - assertEquals(expectedResult, result); + expectedResult.forEach((key, value) -> { + OffsetAndTimestamp expected = value.buildOffsetAndTimestamp(); + assertEquals(expected, result.get(key)); + }); verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class), ArgumentMatchers.isA(Timer.class)); } @@ -927,17 +928,26 @@ public class AsyncKafkaConsumerTest { // This test ensures same behaviour as the current consumer when offsetsForTimes is called // with 0 timeout. It should return map with all requested partitions as keys, with null // OffsetAndTimestamp as value. + @Test + public void testBeginningOffsetsWithZeroTimeout() { + consumer = newConsumer(); + TopicPartition tp = new TopicPartition("topic1", 0); + Map<TopicPartition, Long> result = + assertDoesNotThrow(() -> consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO)); + // The result should be {tp=null} + assertTrue(result.containsKey(tp)); + assertNull(result.get(tp)); + verify(applicationEventHandler).add(ArgumentMatchers.isA(ListOffsetsEvent.class)); + } + @Test public void testOffsetsForTimesWithZeroTimeout() { consumer = newConsumer(); TopicPartition tp = new TopicPartition("topic1", 0); - Map<TopicPartition, OffsetAndTimestamp> expectedResult = - Collections.singletonMap(tp, null); + Map<TopicPartition, OffsetAndTimestamp> expectedResult = Collections.singletonMap(tp, null); Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(tp, 5L); - Map<TopicPartition, OffsetAndTimestamp> result = - assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, - Duration.ZERO)); + assertDoesNotThrow(() -> consumer.offsetsForTimes(timestampToSearch, Duration.ZERO)); assertEquals(expectedResult, result); verify(applicationEventHandler, never()).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class), ArgumentMatchers.isA(Timer.class)); @@ -946,7 +956,7 @@ public class AsyncKafkaConsumerTest { @Test public void testWakeupCommitted() { consumer = newConsumer(); - final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset(); + final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset(); doAnswer(invocation -> { CompletableApplicationEvent<?> event = invocation.getArgument(0); Timer timer = invocation.getArgument(1); @@ -1647,28 +1657,28 @@ public class AsyncKafkaConsumerTest { } } - private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { + private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); - HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>(); + Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>(); topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L)); topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L)); return topicPartitionOffsets; } - private HashMap<TopicPartition, OffsetAndTimestamp> mockOffsetAndTimestamp() { + private Map<TopicPartition, OffsetAndTimestampInternal> mockOffsetAndTimestamp() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); - HashMap<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = new HashMap<>(); - offsetAndTimestamp.put(t0, new OffsetAndTimestamp(5L, 1L)); - offsetAndTimestamp.put(t1, new OffsetAndTimestamp(6L, 3L)); + Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestamp = new HashMap<>(); + offsetAndTimestamp.put(t0, new OffsetAndTimestampInternal(5L, 1L, Optional.empty())); + offsetAndTimestamp.put(t1, new OffsetAndTimestampInternal(6L, 3L, Optional.empty())); return offsetAndTimestamp; } - private HashMap<TopicPartition, Long> mockTimestampToSearch() { + private Map<TopicPartition, Long> mockTimestampToSearch() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); - HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>(); + Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); timestampToSearch.put(t0, 1L); timestampToSearch.put(t1, 2L); return timestampToSearch; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index e4d492fb581..091009064de 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -47,6 +47,8 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; @@ -169,11 +171,12 @@ public class ConsumerNetworkThreadTest { verify(applicationEventProcessor).process(any(SyncCommitEvent.class)); } - @Test - public void testListOffsetsEventIsProcessed() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testListOffsetsEventIsProcessed(boolean requireTimestamp) { Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L); Timer timer = time.timer(100); - ApplicationEvent e = new ListOffsetsEvent(timestamps, true, timer); + ApplicationEvent e = new ListOffsetsEvent(timestamps, timer, requireTimestamp); applicationEventsQueue.add(e); consumerNetworkThread.runOnce(); verify(applicationEventProcessor).process(any(ListOffsetsEvent.class)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java index 582efdfe76b..109a7d92b55 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; @@ -131,13 +130,15 @@ public class OffsetsRequestManagerTest { ListOffsetsRequest.EARLIEST_TIMESTAMP); mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets( + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> result = requestManager.fetchOffsets( timestampsToSearch, false); assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap( + TEST_PARTITION_1, + new OffsetAndTimestampInternal(5L, -1, Optional.empty())); verifySuccessfulPollAndResponseReceived(result, expectedOffsets); } @@ -148,8 +149,9 @@ public class OffsetsRequestManagerTest { // Building list offsets request fails with unknown leader mockFailedRequest_MissingLeader(); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = - requestManager.fetchOffsets(timestampsToSearch, false); + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = + requestManager.fetchOffsets(timestampsToSearch, false); + assertEquals(0, requestManager.requestsToSend()); assertEquals(1, requestManager.requestsToRetry()); verify(metadata).requestUpdate(true); @@ -172,22 +174,23 @@ public class OffsetsRequestManagerTest { partitionLeaders.put(TEST_PARTITION_1, LEADER_1); partitionLeaders.put(TEST_PARTITION_2, LEADER_1); mockSuccessfulRequest(partitionLeaders); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets( - timestampsToSearch, - false); + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> result = requestManager.fetchOffsets( + timestampsToSearch, + false); assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = timestampsToSearch.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> new OffsetAndTimestamp(5L, 1L))); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = timestampsToSearch.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + e -> new OffsetAndTimestampInternal(5L, -1, Optional.empty()))); verifySuccessfulPollAndResponseReceived(result, expectedOffsets); } @Test public void testListOffsetsRequestEmpty() throws ExecutionException, InterruptedException { - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets( - Collections.emptyMap(), - false); + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> result = requestManager.fetchOffsets( + Collections.emptyMap(), + false); assertEquals(0, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); @@ -209,7 +212,7 @@ public class OffsetsRequestManagerTest { ListOffsetsRequest.EARLIEST_TIMESTAMP); mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets( + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> result = requestManager.fetchOffsets( timestampsToSearch, false); assertEquals(1, requestManager.requestsToSend()); @@ -223,9 +226,7 @@ public class OffsetsRequestManagerTest { NetworkClientDelegate.UnsentRequest unsentRequest = retriedPoll.unsentRequests.get(0); ClientResponse clientResponse = buildClientResponse(unsentRequest, topicResponses); clientResponse.onComplete(); - - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = - Collections.singletonMap(TEST_PARTITION_1, null); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, null); verifyRequestSuccessfullyCompleted(result, expectedOffsets); } @@ -237,9 +238,8 @@ public class OffsetsRequestManagerTest { // Building list offsets request fails with unknown leader mockFailedRequest_MissingLeader(); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = - requestManager.fetchOffsets(timestampsToSearch, - false); + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = + requestManager.fetchOffsets(timestampsToSearch, false); assertEquals(0, requestManager.requestsToSend()); assertEquals(1, requestManager.requestsToRetry()); verify(metadata).requestUpdate(true); @@ -254,8 +254,8 @@ public class OffsetsRequestManagerTest { requestManager.onUpdate(new ClusterResource("")); assertEquals(1, requestManager.requestsToSend()); - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap( - TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = Collections.singletonMap( + TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty())); verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets); } @@ -267,7 +267,7 @@ public class OffsetsRequestManagerTest { // List offsets request successfully built mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = requestManager.fetchOffsets( + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = requestManager.fetchOffsets( timestampsToSearch, false); assertEquals(1, requestManager.requestsToSend()); @@ -293,7 +293,8 @@ public class OffsetsRequestManagerTest { requestManager.onUpdate(new ClusterResource("")); assertEquals(1, requestManager.requestsToSend()); - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L)); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = + Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty())); verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets); } @@ -315,7 +316,7 @@ public class OffsetsRequestManagerTest { // List offsets request successfully built mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = requestManager.fetchOffsets( + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = requestManager.fetchOffsets( timestampsToSearch, false); assertEquals(1, requestManager.requestsToSend()); @@ -334,7 +335,8 @@ public class OffsetsRequestManagerTest { clientResponse.onComplete(); // Null offsets should be returned for each partition - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, null); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = + Collections.singletonMap(TEST_PARTITION_1, null); verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets); } @@ -344,15 +346,17 @@ public class OffsetsRequestManagerTest { timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP); timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP); - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = timestampsToSearch.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey(), e -> new OffsetAndTimestamp(5L, 1L))); + Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = timestampsToSearch.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> new OffsetAndTimestampInternal(5L, -1, Optional.empty()))); // List offsets request to 2 brokers successfully built Map<TopicPartition, Node> partitionLeaders = new HashMap<>(); partitionLeaders.put(TEST_PARTITION_1, LEADER_1); partitionLeaders.put(TEST_PARTITION_2, LEADER_2); mockSuccessfulRequest(partitionLeaders); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = requestManager.fetchOffsets( + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = requestManager.fetchOffsets( timestampsToSearch, false); assertEquals(2, requestManager.requestsToSend()); @@ -366,14 +370,16 @@ public class OffsetsRequestManagerTest { // Mixed response with failures and successes. Offsets successfully fetched from one // broker but retriable UNKNOWN_LEADER_EPOCH received from second broker. NetworkClientDelegate.UnsentRequest unsentRequest1 = res.unsentRequests.get(0); + long offsets = expectedOffsets.get(TEST_PARTITION_1).offset(); ClientResponse clientResponse1 = buildClientResponse( unsentRequest1, - Collections.singletonMap(TEST_PARTITION_1, expectedOffsets.get(TEST_PARTITION_1))); + Collections.singletonMap(TEST_PARTITION_1, + new OffsetAndTimestampInternal(offsets, -1L, Optional.empty()))); clientResponse1.onComplete(); NetworkClientDelegate.UnsentRequest unsentRequest2 = res.unsentRequests.get(1); ClientResponse clientResponse2 = buildClientResponseWithErrors( - unsentRequest2, - Collections.singletonMap(TEST_PARTITION_2, Errors.UNKNOWN_LEADER_EPOCH)); + unsentRequest2, + Collections.singletonMap(TEST_PARTITION_2, Errors.UNKNOWN_LEADER_EPOCH)); clientResponse2.onComplete(); assertFalse(fetchOffsetsFuture.isDone()); @@ -389,8 +395,10 @@ public class OffsetsRequestManagerTest { NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds()); verifySuccessfulPollAwaitingResponse(retriedPoll); NetworkClientDelegate.UnsentRequest unsentRequest = retriedPoll.unsentRequests.get(0); + long offsets2 = expectedOffsets.get(TEST_PARTITION_2).offset(); ClientResponse clientResponse = buildClientResponse(unsentRequest, - Collections.singletonMap(TEST_PARTITION_2, expectedOffsets.get(TEST_PARTITION_2))); + Collections.singletonMap(TEST_PARTITION_2, + new OffsetAndTimestampInternal(offsets2, -1L, Optional.empty()))); clientResponse.onComplete(); // Verify global result with the offset initially retrieved, and the offset that @@ -405,7 +413,7 @@ public class OffsetsRequestManagerTest { // List offsets request successfully built mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = requestManager.fetchOffsets( timestampsToSearch, false); @@ -434,7 +442,7 @@ public class OffsetsRequestManagerTest { // List offsets request successfully built mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = requestManager.fetchOffsets( timestampsToSearch, false); @@ -466,10 +474,11 @@ public class OffsetsRequestManagerTest { // List offsets request successfully built mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1)); - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> fetchOffsetsFuture = - requestManager.fetchOffsets( - timestampsToSearch, - false); + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture = + requestManager.fetchOffsets( + timestampsToSearch, + false); + assertEquals(1, requestManager.requestsToSend()); assertEquals(0, requestManager.requestsToRetry()); @@ -480,11 +489,16 @@ public class OffsetsRequestManagerTest { // Response received with auth error NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0); ClientResponse clientResponse = - buildClientResponseWithAuthenticationException(unsentRequest); + buildClientResponse(unsentRequest, + Collections.emptyList(), + false, + new AuthenticationException("Authentication failed")); clientResponse.onComplete(); - // Request completed with error. Nothing pending to be sent or retried - verifyRequestCompletedWithErrorResponse(fetchOffsetsFuture, AuthenticationException.class); + assertTrue(fetchOffsetsFuture.isCompletedExceptionally()); + Throwable failure = assertThrows(ExecutionException.class, fetchOffsetsFuture::get); + assertEquals(AuthenticationException.class, failure.getCause().getClass()); + assertEquals(0, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); } @@ -669,8 +683,6 @@ public class OffsetsRequestManagerTest { Node leader = LEADER_1; OffsetResetStrategy strategy = OffsetResetStrategy.EARLIEST; long offset = 5L; - Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(tp, - new OffsetAndTimestamp(offset, 1L, leaderAndEpoch.epoch)); when(subscriptionState.partitionsNeedingReset(time.milliseconds())).thenReturn(Collections.singleton(tp)); when(subscriptionState.resetStrategy(any())).thenReturn(strategy); mockSuccessfulRequest(Collections.singletonMap(tp, leader)); @@ -682,7 +694,8 @@ public class OffsetsRequestManagerTest { when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(leader, leaderAndEpoch.epoch)); NetworkClientDelegate.PollResult pollResult = requestManager.poll(time.milliseconds()); NetworkClientDelegate.UnsentRequest unsentRequest = pollResult.unsentRequests.get(0); - ClientResponse clientResponse = buildClientResponse(unsentRequest, expectedOffsets); + ClientResponse clientResponse = buildClientResponse(unsentRequest, Collections.singletonMap(tp, + new OffsetAndTimestampInternal(offset, 1L, leaderAndEpoch.epoch))); clientResponse.onComplete(); assertTrue(unsentRequest.future().isDone()); assertFalse(unsentRequest.future().isCompletedExceptionally()); @@ -714,8 +727,8 @@ public class OffsetsRequestManagerTest { } private void verifySuccessfulPollAndResponseReceived( - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> actualResult, - Map<TopicPartition, OffsetAndTimestamp> expectedResult) throws ExecutionException, + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> actualResult, + Map<TopicPartition, OffsetAndTimestampInternal> expectedResult) throws ExecutionException, InterruptedException { // Following poll should send the request and get a response NetworkClientDelegate.PollResult retriedPoll = requestManager.poll(time.milliseconds()); @@ -726,6 +739,15 @@ public class OffsetsRequestManagerTest { verifyRequestSuccessfullyCompleted(actualResult, expectedResult); } + + private void verifyRequestCompletedWithErrorResponse(CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> actualResult, + Class<? extends Throwable> expectedFailure) { + assertTrue(actualResult.isDone()); + assertTrue(actualResult.isCompletedExceptionally()); + Throwable failure = assertThrows(ExecutionException.class, actualResult::get); + assertEquals(expectedFailure, failure.getCause().getClass()); + } + private void mockSuccessfulRequest(Map<TopicPartition, Node> partitionLeaders) { partitionLeaders.forEach((tp, broker) -> { when(metadata.currentLeader(tp)).thenReturn(testLeaderEpoch(broker, @@ -753,24 +775,24 @@ public class OffsetsRequestManagerTest { } private void verifyRequestSuccessfullyCompleted( - CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> actualResult, - Map<TopicPartition, OffsetAndTimestamp> expectedResult) throws ExecutionException, InterruptedException { + CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> actualResult, + Map<TopicPartition, OffsetAndTimestampInternal> expectedResult) throws ExecutionException, InterruptedException { assertEquals(0, requestManager.requestsToRetry()); assertEquals(0, requestManager.requestsToSend()); assertTrue(actualResult.isDone()); assertFalse(actualResult.isCompletedExceptionally()); - Map<TopicPartition, OffsetAndTimestamp> partitionOffsets = actualResult.get(); + Map<TopicPartition, OffsetAndTimestampInternal> partitionOffsets = actualResult.get(); assertEquals(expectedResult, partitionOffsets); // Validate that the subscription state has been updated for all non-null offsets retrieved - Map<TopicPartition, OffsetAndTimestamp> validExpectedOffsets = expectedResult.entrySet().stream() + Map<TopicPartition, Long> validExpectedOffsets = expectedResult.entrySet().stream() .filter(entry -> entry.getValue() != null) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect(Collectors.toMap(Map.Entry::getKey, v -> v.getValue().offset())); verifySubscriptionStateUpdated(validExpectedOffsets); } - private void verifySubscriptionStateUpdated(Map<TopicPartition, OffsetAndTimestamp> expectedResult) { + private void verifySubscriptionStateUpdated(Map<TopicPartition, Long> expectedResult) { ArgumentCaptor<TopicPartition> tpCaptor = ArgumentCaptor.forClass(TopicPartition.class); ArgumentCaptor<Long> offsetCaptor = ArgumentCaptor.forClass(Long.class); @@ -784,18 +806,10 @@ public class OffsetsRequestManagerTest { assertEquals(expectedResult.values().size(), updatedOffsets.size()); expectedResult.values().stream() - .map(offsetAndTimestamp -> updatedOffsets.contains(offsetAndTimestamp.offset())) + .map(updatedOffsets::contains) .forEach(Assertions::assertTrue); } - private void verifyRequestCompletedWithErrorResponse(CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> actualResult, - Class<? extends Throwable> expectedFailure) { - assertTrue(actualResult.isDone()); - assertTrue(actualResult.isCompletedExceptionally()); - Throwable failure = assertThrows(ExecutionException.class, actualResult::get); - assertEquals(expectedFailure, failure.getCause().getClass()); - } - private Metadata.LeaderAndEpoch testLeaderEpoch(Node leader, Optional<Integer> epoch) { return new Metadata.LeaderAndEpoch(Optional.of(leader), epoch); } @@ -814,7 +828,7 @@ public class OffsetsRequestManagerTest { private ClientResponse buildClientResponse( final NetworkClientDelegate.UnsentRequest request, - final Map<TopicPartition, OffsetAndTimestamp> partitionsOffsets) { + final Map<TopicPartition, OffsetAndTimestampInternal> partitionsOffsets) { List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses = new ArrayList<>(); partitionsOffsets.forEach((tp, offsetAndTimestamp) -> { @@ -901,7 +915,6 @@ public class OffsetsRequestManagerTest { private ClientResponse buildClientResponse( final NetworkClientDelegate.UnsentRequest request, final List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses) { - return buildClientResponse(request, topicResponses, false, null); } @@ -919,12 +932,6 @@ public class OffsetsRequestManagerTest { return buildClientResponse(request, topicResponses, false, null); } - private ClientResponse buildClientResponseWithAuthenticationException( - final NetworkClientDelegate.UnsentRequest request) { - return buildClientResponse(request, Collections.emptyList(), true, - new AuthenticationException("Authentication failed")); - } - private ClientResponse buildClientResponse( final NetworkClientDelegate.UnsentRequest request, final List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicResponses, @@ -933,7 +940,9 @@ public class OffsetsRequestManagerTest { AbstractRequest abstractRequest = request.requestBuilder().build(); assertInstanceOf(ListOffsetsRequest.class, abstractRequest); ListOffsetsRequest offsetFetchRequest = (ListOffsetsRequest) abstractRequest; - ListOffsetsResponse response = buildListOffsetsResponse(topicResponses); + ListOffsetsResponse response = new ListOffsetsResponse(new ListOffsetsResponseData() + .setThrottleTimeMs(0) + .setTopics(topicResponses)); return new ClientResponse( new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1), request.handler(), @@ -946,13 +955,4 @@ public class OffsetsRequestManagerTest { response ); } - - private ListOffsetsResponse buildListOffsetsResponse( - List<ListOffsetsResponseData.ListOffsetsTopicResponse> offsetsTopicResponses) { - ListOffsetsResponseData responseData = new ListOffsetsResponseData() - .setThrottleTimeMs(0) - .setTopics(offsetsTopicResponses); - - return new ListOffsetsResponse(responseData); - } } \ No newline at end of file diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 77ba47fa08f..933bea9ac0e 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -15,7 +15,7 @@ package kafka.api import java.time.Duration import java.util import java.util.Arrays.asList -import java.util.{Locale, Properties} +import java.util.{Collections, Locale, Optional, Properties} import kafka.server.{KafkaBroker, QuotaType} import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.{NewPartitions, NewTopic} @@ -805,4 +805,58 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer2.close() } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testEndOffsets(quorum: String, groupProtocol: String): Unit = { + val producer = createProducer() + val startingTimestamp = System.currentTimeMillis() + val numRecords = 10000 + (0 until numRecords).map { i => + val timestamp = startingTimestamp + i.toLong + val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes) + producer.send(record) + record + } + producer.flush() + + val consumer = createConsumer() + consumer.subscribe(List(topic).asJava) + awaitAssignment(consumer, Set(tp, tp2)) + + val endOffsets = consumer.endOffsets(Set(tp).asJava) + assertEquals(numRecords, endOffsets.get(tp)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testFetchOffsetsForTime(quorum: String, groupProtocol: String): Unit = { + val numPartitions = 2 + val producer = createProducer() + val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]() + var i = 0 + for (part <- 0 until numPartitions) { + val tp = new TopicPartition(topic, part) + // key, val, and timestamp equal to the sequence number. + sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0) + timestampsToSearch.put(tp, (i * 20).toLong) + i += 1 + } + + val consumer = createConsumer() + // Test negative target time + assertThrows(classOf[IllegalArgumentException], + () => consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic, 0), -1))) + val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch) + + val timestampTp0 = timestampOffsets.get(new TopicPartition(topic, 0)) + assertEquals(0, timestampTp0.offset) + assertEquals(0, timestampTp0.timestamp) + assertEquals(Optional.of(0), timestampTp0.leaderEpoch) + + val timestampTp1 = timestampOffsets.get(new TopicPartition(topic, 1)) + assertEquals(20, timestampTp1.offset) + assertEquals(20, timestampTp1.timestamp) + assertEquals(Optional.of(0), timestampTp1.leaderEpoch) + } }