This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 9979503  KAFKA-13637: Use default.api.timeout.ms as default timeout 
value for KafkaConsumer.endOffsets (#11726)
9979503 is described below

commit 997950343843c254cfdcb429866250e9f3f7b677
Author: dengziming <[email protected]>
AuthorDate: Thu Feb 3 17:32:25 2022 +0800

    KAFKA-13637: Use default.api.timeout.ms as default timeout value for 
KafkaConsumer.endOffsets (#11726)
    
    We introduced `default.api.timeout.ms` in 
https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb 
but we missed updating `KafkaConsumer.endOffsets`, which still uses 
`request.timeout.ms`. This patch makes it use `default.api.timeout.ms` instead.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 58 +++++++++++++++++++++-
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 29a9e37..c3a4b87 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2181,11 +2181,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) {
-        return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+        return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 326eaa4..a035d2b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -157,6 +157,8 @@ public class KafkaConsumerTest {
     private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
 
     private final int sessionTimeoutMs = 10000;
+    private final int defaultApiTimeoutMs = 60000;
+    private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
 
     // Set auto commit interval lower than heartbeat so we don't need to deal 
with
@@ -2496,8 +2498,6 @@ public class KafkaConsumerTest {
         String clientId = "mock-consumer";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
-        int requestTimeoutMs = 30000;
-        int defaultApiTimeoutMs = 30000;
         int minBytes = 1;
         int maxBytes = Integer.MAX_VALUE;
         int maxWaitMs = 500;
@@ -2815,6 +2815,60 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer<String, String> consumer = 
consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+        );
+    }
+
+    @Test
+    public void testBeginningOffsetsTimeout() {
+        final KafkaConsumer<String, String> consumer = 
consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.beginningOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    @Test
+    public void testEndOffsetsTimeout() {
+        final KafkaConsumer<String, String> consumer = 
consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> 
consumer.endOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    private KafkaConsumer<String, String> 
consumerForCheckingTimeoutException() {
+        final Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, 
client, subscription, metadata, assignor, false, groupInstanceId);
+
+        for (int i = 0; i < 10; i++) {
+            client.prepareResponse(
+                request -> {
+                    time.sleep(defaultApiTimeoutMs / 10);
+                    return request instanceof ListOffsetsRequest;
+                },
+                listOffsetsResponse(
+                    Collections.emptyMap(),
+                    Collections.singletonMap(tp0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                ));
+        }
+
+        return consumer;
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements 
Deserializer<byte[]> {
         @Override

Reply via email to