This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 9979503 KAFKA-13637: Use default.api.timeout.ms as default timeout
value for KafkaConsumer.endOffsets (#11726)
9979503 is described below
commit 997950343843c254cfdcb429866250e9f3f7b677
Author: dengziming <[email protected]>
AuthorDate: Thu Feb 3 17:32:25 2022 +0800
KAFKA-13637: Use default.api.timeout.ms as default timeout value for
KafkaConsumer.endOffsets (#11726)
We introduced `default.api.timeout.ms` in
https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb
but we missed updating `KafkaConsumer.endOffsets`, which still uses
`request.timeout.ms`. This patch fixes that.
Reviewers: David Jacot <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 58 +++++++++++++++++++++-
2 files changed, 58 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 29a9e37..c3a4b87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2181,11 +2181,11 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
- * the amount of time allocated by {@code request.timeout.ms}
expires
+ * the amount of time allocated by {@code default.api.timeout.ms}
expires
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions) {
- return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+ return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 326eaa4..a035d2b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -157,6 +157,8 @@ public class KafkaConsumerTest {
private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
private final int sessionTimeoutMs = 10000;
+ private final int defaultApiTimeoutMs = 60000;
+ private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
private final int heartbeatIntervalMs = 1000;
// Set auto commit interval lower than heartbeat so we don't need to deal
with
@@ -2496,8 +2498,6 @@ public class KafkaConsumerTest {
String clientId = "mock-consumer";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
- int requestTimeoutMs = 30000;
- int defaultApiTimeoutMs = 30000;
int minBytes = 1;
int maxBytes = Integer.MAX_VALUE;
int maxWaitMs = 500;
@@ -2815,6 +2815,60 @@ public class KafkaConsumerTest {
}
}
+ @Test
+ public void testOffsetsForTimesTimeout() {
+ final KafkaConsumer<String, String> consumer =
consumerForCheckingTimeoutException();
+ assertEquals(
+ "Failed to get offsets by times in 60000ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () ->
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+ );
+ }
+
+ @Test
+ public void testBeginningOffsetsTimeout() {
+ final KafkaConsumer<String, String> consumer =
consumerForCheckingTimeoutException();
+ assertEquals(
+ "Failed to get offsets by times in 60000ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () ->
consumer.beginningOffsets(singletonList(tp0))).getMessage()
+ );
+ }
+
+ @Test
+ public void testEndOffsetsTimeout() {
+ final KafkaConsumer<String, String> consumer =
consumerForCheckingTimeoutException();
+ assertEquals(
+ "Failed to get offsets by times in 60000ms",
+
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () ->
consumer.endOffsets(singletonList(tp0))).getMessage()
+ );
+ }
+
+ private KafkaConsumer<String, String>
consumerForCheckingTimeoutException() {
+ final Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new
LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, singletonMap(topic, 1));
+
+ ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time,
client, subscription, metadata, assignor, false, groupInstanceId);
+
+ for (int i = 0; i < 10; i++) {
+ client.prepareResponse(
+ request -> {
+ time.sleep(defaultApiTimeoutMs / 10);
+ return request instanceof ListOffsetsRequest;
+ },
+ listOffsetsResponse(
+ Collections.emptyMap(),
+ Collections.singletonMap(tp0,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ ));
+ }
+
+ return consumer;
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements
Deserializer<byte[]> {
@Override