This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af6a9a1  KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
af6a9a1 is described below

commit af6a9a17bffe654096d21190b3742e8835b2fccf
Author: dengziming <[email protected]>
AuthorDate: Thu Feb 3 17:32:25 2022 +0800

    KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
    
    We introduced `default.api.timeout.ms` in 
https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb 
but we missed updating `KafkaConsumer.endOffsets`, which still uses 
`request.timeout.ms`. This patch fixes that.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 58 +++++++++++++++++++++-
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f369e14..e537ff0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2197,11 +2197,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) {
-        return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+        return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2872983..9b79473 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -164,6 +164,8 @@ public class KafkaConsumerTest {
     private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
 
     private final int sessionTimeoutMs = 10000;
+    private final int defaultApiTimeoutMs = 60000;
+    private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
 
     // Set auto commit interval lower than heartbeat so we don't need to deal 
with
@@ -2618,8 +2620,6 @@ public class KafkaConsumerTest {
         String clientId = "mock-consumer";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
-        int requestTimeoutMs = 30000;
-        int defaultApiTimeoutMs = 30000;
         int minBytes = 1;
         int maxBytes = Integer.MAX_VALUE;
         int maxWaitMs = 500;
@@ -2948,6 +2948,60 @@ public class KafkaConsumerTest {
             () -> new KafkaConsumer<>(configs, new StringDeserializer(), new 
StringDeserializer()));
     }
 
+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer<String, String> consumer = 
consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+        );
+    }
+
+    @Test
+    public void testBeginningOffsetsTimeout() {
+        final KafkaConsumer<String, String> consumer = 
consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.beginningOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    @Test
+    public void testEndOffsetsTimeout() {
+        final KafkaConsumer<String, String> consumer = 
consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.endOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    private KafkaConsumer<String, String> 
consumerForCheckingTimeoutException() {
+        final Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, false, groupInstanceId);
+
+        for (int i = 0; i < 10; i++) {
+            client.prepareResponse(
+                request -> {
+                    time.sleep(defaultApiTimeoutMs / 10);
+                    return request instanceof ListOffsetsRequest;
+                },
+                listOffsetsResponse(
+                    Collections.emptyMap(),
+                    Collections.singletonMap(tp0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                ));
+        }
+
+        return consumer;
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements 
Deserializer<byte[]> {
         @Override

Reply via email to