[ 
https://issues.apache.org/jira/browse/KAFKA-6979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510380#comment-16510380
 ] 

ASF GitHub Bot commented on KAFKA-6979:
---------------------------------------

hachikuji closed pull request #5122: KAFKA-6979: Add default.api.timeout.ms to 
KafkaConsumer (KIP-266)
URL: https://github.com/apache/kafka/pull/5122
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 72e496cbd46..bc9a716158e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -218,6 +218,10 @@
     public static final String REQUEST_TIMEOUT_MS_CONFIG = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = 
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>default.api.timeout.ms</code> */
+    public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = 
"default.api.timeout.ms";
+    public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the 
timeout (in milliseconds) for consumer APIs that could block. This 
configuration is used as the default timeout for all consumer operations that 
do not explicitly accept a <code>timeout</code> parameter.";
+
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = 
"interceptor.classes";
     public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to 
use as interceptors. "
@@ -403,6 +407,12 @@
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        60 * 1000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        DEFAULT_API_TIMEOUT_MS_DOC)
                                 /* default is set to be a bit lower than the 
server default (10 min), to avoid both client and server closing connection at 
same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5bd6b935b39..d6973c0a818 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -567,6 +567,7 @@
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
+    private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
     private List<PartitionAssignor> assignors;
 
@@ -666,6 +667,7 @@ private KafkaConsumer(ConsumerConfig config,
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
             int sessionTimeOutMs = 
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             int fetchMaxWaitMs = 
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             if (this.requestTimeoutMs <= sessionTimeOutMs || 
this.requestTimeoutMs <= fetchMaxWaitMs)
@@ -814,6 +816,7 @@ private KafkaConsumer(ConsumerConfig config,
                   Metadata metadata,
                   long retryBackoffMs,
                   long requestTimeoutMs,
+                  int defaultApiTimeoutMs,
                   List<PartitionAssignor> assignors) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
@@ -829,6 +832,7 @@ private KafkaConsumer(ConsumerConfig config,
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
     }
 
@@ -1268,8 +1272,9 @@ private long remainingTimeAtLeastZero(final long 
timeoutMs, final long elapsedTi
      * every rebalance and also on startup. As such, if you need to store 
offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * This is a synchronous commits and will block until either the commit 
succeeds or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * This is a synchronous commit and will block until either the commit 
succeeds, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the timeout 
specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link 
org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      * <p>
      * Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
@@ -1286,10 +1291,12 @@ private long remainingTimeAtLeastZero(final long 
timeoutMs, final long elapsedTi
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
specified by {@code default.api.timeout.ms} expires
+     *            before successful completion of the offset commit
      */
     @Override
     public void commitSync() {
-        commitSync(Duration.ofMillis(Long.MAX_VALUE));
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1343,7 +1350,8 @@ public void commitSync(Duration timeout) {
      * i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is a synchronous commits and will block until either the commit 
succeeds or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * encountered (in which case it is thrown to the caller), or the timeout 
specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link 
org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      * <p>
      * Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
@@ -1362,10 +1370,12 @@ public void commitSync(Duration timeout) {
      * @throws java.lang.IllegalArgumentException if the committed offset is 
negative
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout 
expires before successful completion
+     *            of the offset commit
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
-        commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1560,7 +1570,8 @@ public void seekToEnd(Collection<TopicPartition> 
partitions) {
      * This method may issue a remote call to the server if there is no 
current position for the given partition.
      * <p>
      * This call will block until either the position could be determined or 
an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * encountered (in which case it is thrown to the caller), or the timeout 
specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link 
org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      *
      * @param partition The partition to get the position for
      * @return The current position of the consumer (that is, the offset of 
the next record to be fetched)
@@ -1575,10 +1586,12 @@ public void seekToEnd(Collection<TopicPartition> 
partitions) {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position 
cannot be determined before the
+     *             timeout specified by {@code default.api.timeout.ms} expires
      */
     @Override
     public long position(TopicPartition partition) {
-        return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1641,7 +1654,10 @@ public long position(TopicPartition partition, final 
Duration timeout) {
      * Get the last committed offset for the given partition (whether the 
commit happened by this process or
      * another). This offset will be used as the position for the consumer in 
the event of a failure.
      * <p>
-     * This call will block to do a remote call to get the latest committed 
offsets from the server.
+     * This call will do a remote call to get the latest committed offset from 
the server, and will block until the
+     * committed offset is gotten successfully, an unrecoverable error is 
encountered (in which case it is thrown to
+     * the caller), or the timeout specified by {@code default.api.timeout.ms} 
expires (in which case a
+     * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to 
the caller).
      *
      * @param partition The partition to check
      * @return The last committed offset and metadata or null if there was no 
prior commit
@@ -1653,10 +1669,12 @@ public long position(TopicPartition partition, final 
Duration timeout) {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+     *             the timeout specified by {@code default.api.timeout.ms} 
expires.
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
-        return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+        return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1718,11 +1736,11 @@ public OffsetAndMetadata committed(TopicPartition 
partition, final Duration time
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the specified topic. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires.
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1774,11 +1792,11 @@ public OffsetAndMetadata committed(TopicPartition 
partition, final Duration time
      *             this function is called
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires.
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
-        return listTopics(Duration.ofMillis(requestTimeoutMs));
+        return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1879,13 +1897,13 @@ public void resume(Collection<TopicPartition> 
partitions) {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} 
expires.
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not support looking up
      *         the offsets by timestamp
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        return offsetsForTimes(timestampsToSearch, 
Duration.ofMillis(requestTimeoutMs));
+        return offsetsForTimes(timestampsToSearch, 
Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1939,11 +1957,11 @@ public void resume(Collection<TopicPartition> 
partitions) {
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     *         expiration of the configured {@code default.api.timeout.ms}
      */
     @Override
     public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions) {
-        return beginningOffsets(partitions, 
Duration.ofMillis(requestTimeoutMs));
+        return beginningOffsets(partitions, 
Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4be688422ce..b8681e8f1bc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1748,6 +1748,7 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         long requestTimeoutMs = 30000;
+        int defaultApiTimeoutMs = 30000;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
         int maxBytes = Integer.MAX_VALUE;
@@ -1825,6 +1826,7 @@ private FetchResponse fetchResponse(TopicPartition 
partition, long fetchOffset,
                 metadata,
                 retryBackoffMs,
                 requestTimeoutMs,
+                defaultApiTimeoutMs,
                 assignors);
     }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 056fb8366e4..7b5449d1b75 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -91,10 +91,13 @@ <h5><a id="upgrade_200_notable" 
href="#upgrade_200_notable">Notable changes in 2
         
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
         <code>internal.value.converter.schemas.enable=false</code>
     </li>
-    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds 
overloads to the consumer to support
-        timeout behavior for blocking APIs. In particular, a new 
<code>poll(Duration)</code> API has been added which
-        does not block for dynamic partition assignment. The old 
<code>poll(long)</code> API has been deprecated and
-        will be removed in a future version.</li>
+    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds 
a new consumer configuration <code>default.api.timeout.ms</code>
+        to specify the default timeout to use for <code>KafkaConsumer</code> 
APIs that could block. The KIP also adds overloads for such blocking
+        APIs to support specifying a specific timeout to use for each of them 
instead of using the default timeout set by <code>default.api.timeout.ms</code>.
+        In particular, a new <code>poll(Duration)</code> API has been added 
which does not block for dynamic partition assignment.
+        The old <code>poll(long)</code> API has been deprecated and will be 
removed in a future version. Overloads have also been added
+        for other <code>KafkaConsumer</code> methods like 
<code>partitionsFor</code>, <code>listTopics</code>, 
<code>offsetsForTimes</code>,
+        <code>beginningOffsets</code>, <code>endOffsets</code> and 
<code>close</code> that take in a <code>Duration</code>.</li>
     <li>The internal method 
<code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. 
Users are encouraged to migrate to 
<code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
 </ul>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add max.block.ms to consumer for default timeout behavior
> ---------------------------------------------------------
>
>                 Key: KAFKA-6979
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6979
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Jason Gustafson
>            Assignee: Dhruvil Shah
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> Implement max.block.ms as described in KIP-266: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to