[ 
https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495301#comment-16495301
 ] 

ASF GitHub Bot commented on KAFKA-6608:
---------------------------------------

hachikuji closed pull request #5014: [KAFKA-6608] Add timeout parameter to 
methods which retrieve offsets
URL: https://github.com/apache/kafka/pull/5014
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index acb53e11088..2e8ad2cf717 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -97,6 +97,10 @@
      */
     void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
 
+    /**
+     * @see KafkaConsumer#commitSync(Map, Duration)
+     */
+    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout);
     /**
      * @see KafkaConsumer#commitAsync()
      */
@@ -131,12 +135,22 @@
      * @see KafkaConsumer#position(TopicPartition)
      */
     long position(TopicPartition partition);
+
+    /**
+     * @see KafkaConsumer#position(TopicPartition, Duration)
+     */
+    long position(TopicPartition partition, Duration timeout);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
      */
     OffsetAndMetadata committed(TopicPartition partition);
 
+    /**
+     * @see KafkaConsumer#committed(TopicPartition, Duration)
+     */
+    OffsetAndMetadata committed(TopicPartition partition, Duration timeout);
+
     /**
      * @see KafkaConsumer#metrics()
      */
@@ -147,11 +161,21 @@
      */
     List<PartitionInfo> partitionsFor(String topic);
 
+    /**
+     * @see KafkaConsumer#partitionsFor(String, Duration)
+     */
+    List<PartitionInfo> partitionsFor(String topic, Duration timeout);
+
     /**
      * @see KafkaConsumer#listTopics()
      */
     Map<String, List<PartitionInfo>> listTopics();
 
+    /**
+     * @see KafkaConsumer#listTopics(Duration)
+     */
+    Map<String, List<PartitionInfo>> listTopics(Duration timeout);
+
     /**
      * @see KafkaConsumer#paused()
      */
@@ -168,20 +192,35 @@
     void resume(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#offsetsForTimes(java.util.Map)
+     * @see KafkaConsumer#offsetsForTimes(Map)
      */
     Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
 
     /**
-     * @see KafkaConsumer#beginningOffsets(java.util.Collection)
+     * @see KafkaConsumer#offsetsForTimes(Map, Duration)
+     */
+    Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
+
+    /**
+     * @see KafkaConsumer#beginningOffsets(Collection)
      */
     Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> 
partitions);
 
     /**
-     * @see KafkaConsumer#endOffsets(java.util.Collection)
+     * @see KafkaConsumer#beginningOffsets(Collection, Duration)
+     */
+    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> 
partitions, Duration timeout);
+
+    /**
+     * @see KafkaConsumer#endOffsets(Collection)
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions);
 
+    /**
+     * @see KafkaConsumer#endOffsets(Collection, Duration)
+     */
+    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
+
     /**
      * @see KafkaConsumer#close()
      */
@@ -190,8 +229,14 @@
     /**
      * @see KafkaConsumer#close(long, TimeUnit)
      */
+    @Deprecated
     void close(long timeout, TimeUnit unit);
 
+    /**
+     * @see KafkaConsumer#close(Duration)
+     */
+    void close(Duration timeout);
+
     /**
      * @see KafkaConsumer#wakeup()
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 602c9d76d5e..feaadd164ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -1321,16 +1322,55 @@ public void commitSync() {
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets) {
+        commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Commit the specified offsets for the specified list of topics and partitions.
+     * <p>
+     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used. The committed offset should be the next message your application will consume,
+     * i.e. lastProcessedMessageOffset + 1.
+     * <p>
+     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the timeout expires.
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+     *
+     * @param offsets A map of offsets by partition with associated metadata
+     * @param timeout The maximum amount of time to await completion of the offset commit
+     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+     *             or if there is an active group with the same groupId which is using group management.
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws java.lang.IllegalArgumentException if the committed offset is negative
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
+     *             of the offset commit
+     */
+    @Override
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> 
offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            coordinator.commitOffsetsSync(new HashMap<>(offsets), 
Long.MAX_VALUE);
+            if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), 
timeout.toMillis())) {
+                throw new TimeoutException("Committing offsets synchronously 
took too long.");
+            }
         } finally {
             release();
         }
     }
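
For illustration, a caller of the new bounded commit might look like the following
sketch (assuming a configured KafkaConsumer named "consumer", a TopicPartition "tp",
and a made-up lastProcessedOffset; imports of java.time.Duration and the usual
org.apache.kafka.clients.consumer / org.apache.kafka.common classes are implied):

    Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1));
    try {
        consumer.commitSync(offsets, Duration.ofSeconds(10));
    } catch (TimeoutException e) {
        // the commit did not complete within 10 seconds; retry or fail the batch
    }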
 
     /**
-     * Commit offsets returned on the last {@link #poll(Duration) poll()} for 
all the subscribed list of topics and partition.
+     * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partitions.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
      */
     @Override
@@ -1426,6 +1466,7 @@ public void seek(TopicPartition partition, long offset) {
      * @throws IllegalArgumentException if {@code partitions} is {@code null}
      * @throws IllegalStateException if any of the provided partitions are not 
currently assigned to this consumer
      */
+    @Override
     public void seekToBeginning(Collection<TopicPartition> partitions) {
         if (partitions == null)
             throw new IllegalArgumentException("Partitions collection cannot 
be null");
@@ -1453,6 +1494,7 @@ public void seekToBeginning(Collection<TopicPartition> 
partitions) {
      * @throws IllegalArgumentException if {@code partitions} is {@code null}
      * @throws IllegalStateException if any of the provided partitions are not 
currently assigned to this consumer
      */
+    @Override
     public void seekToEnd(Collection<TopicPartition> partitions) {
         if (partitions == null)
             throw new IllegalArgumentException("Partitions collection cannot 
be null");
@@ -1490,20 +1532,59 @@ public void seekToEnd(Collection<TopicPartition> 
partitions) {
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
      */
+    @Override
     public long position(TopicPartition partition) {
+        return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Get the offset of the <i>next record</i> that will be fetched (if a 
record with that offset exists).
+     * This method may issue a remote call to the server if there is no 
current position 
+     * for the given partition.
+     * <p>
+     * This call will block until the position can be determined, an 
unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the timeout 
expires.
+     *
+     * @param partition The partition to get the position for
+     * @param timeout The maximum amount of time to await determination of the 
current position
+     * @return The current position of the consumer (that is, the offset of 
the next record to be fetched)
+     * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no 
offset is currently defined for
+     *             the partition
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position 
cannot be determined before the
+     *             passed timeout expires
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     */
+    @Override
+    public long position(TopicPartition partition, final Duration timeout) {
+        final long timeoutMs = timeout.toMillis();
         acquireAndEnsureOpen();
         try {
             if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalStateException("You can only check the 
position for partitions assigned to this consumer.");
             Long offset = this.subscriptions.position(partition);
-            while (offset == null) {
+            final long startMs = time.milliseconds();
+            long finishMs = startMs;
+
+            while (offset == null && finishMs - startMs < timeoutMs) {
                 // batch update fetch positions for any partitions without a 
valid position
-                while (!updateFetchPositions(Long.MAX_VALUE)) {
-                    log.warn("Still updating fetch positions");
+                if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, 
time.milliseconds() - startMs))) {
+                    break;
                 }
-                client.poll(retryBackoffMs);
+                finishMs = time.milliseconds();
+
+                client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs - 
startMs));
                 offset = this.subscriptions.position(partition);
+                finishMs = time.milliseconds();
             }
+            if (offset == null) throw new TimeoutException("Timeout expired before the position for partition " + partition + " could be determined.");
             return offset;
         } finally {
             release();
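
A bounded position lookup can then be written as, for example (same assumed
"consumer" and "tp" as in the earlier sketch):

    try {
        long nextOffset = consumer.position(tp, Duration.ofSeconds(5));
        System.out.println("Next record to fetch from " + tp + ": " + nextOffset);
    } catch (TimeoutException e) {
        // the position could not be determined within 5 seconds
    }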
@@ -1529,14 +1610,37 @@ public long position(TopicPartition partition) {
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
+        return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Get the last committed offset for the given partition (whether the 
commit happened by this process or
+     * another). This offset will be used as the position for the consumer in 
the event of a failure.
+     * <p>
+     * This call will block to do a remote call to get the latest committed 
offsets from the server.
+     *
+     * @param partition The partition to check
+     * @param timeout  The maximum amount of time to await the current 
committed offset
+     * @return The last committed offset and metadata or null if there was no 
prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+     *             expiration of the timeout
+     */
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final 
Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            Map<TopicPartition, OffsetAndMetadata> offsets = null;
-            while (offsets == null) {
-                offsets = coordinator.fetchCommittedOffsets(
-                    Collections.singleton(partition),
-                    Long.MAX_VALUE
-                );
+            Map<TopicPartition, OffsetAndMetadata> offsets = 
coordinator.fetchCommittedOffsets(
+                    Collections.singleton(partition), timeout.toMillis());
+            if (offsets == null) {
+                throw new TimeoutException("Unable to find committed offsets 
for partition within set duration.");
             }
             return offsets.get(partition);
         } finally {
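
For example, a consumer resuming from its last commit can bound the lookup
(same assumed "consumer" and "tp"):

    OffsetAndMetadata committed = consumer.committed(tp, Duration.ofSeconds(5));
    if (committed != null)
        consumer.seek(tp, committed.offset());  // resume after the last committed record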
@@ -1565,13 +1669,38 @@ public OffsetAndMetadata committed(TopicPartition 
partition) {
      *             this function is called
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the specified topic. See the exception for more details
-     * @throws org.apache.kafka.common.errors.TimeoutException if the topic 
metadata could not be fetched before
-     *             expiration of the configured request timeout
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+     *         the amount of time allocated by {@code request.timeout.ms} expires
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
+        return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get metadata about the partitions for a given topic. This method will 
issue a remote call to the server if it
+     * does not already have any metadata about the given topic.
+     *
+     * @param topic The topic to get partition metadata for
+     * @param timeout The maximum amount of time to await topic metadata
+     *
+     * @return The list of partitions
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the specified topic. See
+     *             the exception for more details
+     * @throws org.apache.kafka.common.errors.TimeoutException if topic 
metadata cannot be fetched before expiration
+     *             of the passed timeout
+     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     */
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
         acquireAndEnsureOpen();
+        long timeoutMs = timeout.toMillis();
         try {
             Cluster cluster = this.metadata.fetch();
             List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
@@ -1579,7 +1708,7 @@ public OffsetAndMetadata committed(TopicPartition 
partition) {
                 return parts;
 
             Map<String, List<PartitionInfo>> topicMetadata = 
fetcher.getTopicMetadata(
-                    new 
MetadataRequest.Builder(Collections.singletonList(topic), true), 
requestTimeoutMs);
+                    new 
MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
             return topicMetadata.get(topic);
         } finally {
             release();
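
A bounded metadata lookup might look like this (the topic name is made up):

    List<PartitionInfo> parts = consumer.partitionsFor("my-topic", Duration.ofSeconds(5));
    if (parts != null)
        System.out.println("my-topic has " + parts.size() + " partitions");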
@@ -1596,15 +1725,35 @@ public OffsetAndMetadata committed(TopicPartition 
partition) {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
      *             this function is called
-     * @throws org.apache.kafka.common.errors.TimeoutException if the topic 
metadata could not be fetched before
-     *             expiration of the configured request timeout
      * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+     *         the amount of time allocated by {@code request.timeout.ms} expires
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
+        return listTopics(Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get metadata about partitions for all topics that the user is 
authorized to view. This method will issue a
+     * remote call to the server.
+     *
+     * @param timeout The maximum time this operation will block to fetch 
topic metadata
+     *
+     * @return The map of topics and their partitions
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the topic 
metadata could not be fetched before
+     *             expiration of the passed timeout
+     * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+     */
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            return fetcher.getAllTopicMetadata(requestTimeoutMs);
+            return fetcher.getAllTopicMetadata(timeout.toMillis());
         } finally {
             release();
         }
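
For example, to enumerate all visible topics without blocking for more than five seconds:

    Map<String, List<PartitionInfo>> topics = consumer.listTopics(Duration.ofSeconds(5));
    topics.forEach((name, infos) ->
            System.out.println(name + ": " + infos.size() + " partitions"));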
@@ -1683,12 +1832,39 @@ public void resume(Collection<TopicPartition> 
partitions) {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     *         the amount of time allocated by {@code request.timeout.ms} 
expires.
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not support looking up
      *         the offsets by timestamp
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+        return offsetsForTimes(timestampsToSearch, 
Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Look up the offsets for the given partitions by timestamp. The returned 
offset for each partition is the
+     * earliest offset whose timestamp is greater than or equal to the given 
timestamp in the corresponding partition.
+     *
+     * This is a blocking call. The consumer does not have to be assigned the 
partitions.
+     * If the message format version in a partition is before 0.10.0, i.e. the 
messages do not have timestamps, null
+     * will be returned for that partition.
+     *
+     * @param timestampsToSearch the mapping from partition to the timestamp 
to look up.
+     * @param timeout The maximum amount of time to await retrieval of the 
offsets
+     *
+     * @return a mapping from partition to the timestamp and offset of the 
first message with timestamp greater
+     *         than or equal to the target timestamp. {@code null} will be 
returned for the partition if there is no
+     *         such message.
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
+     * @throws IllegalArgumentException if the target timestamp is negative
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
+     *         expiration of the passed timeout
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not support looking up
+     *         the offsets by timestamp
+     */
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) 
{
         acquireAndEnsureOpen();
         try {
             for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet()) {
@@ -1698,7 +1874,7 @@ public void resume(Collection<TopicPartition> partitions) 
{
                     throw new IllegalArgumentException("The target time for 
partition " + entry.getKey() + " is " +
                             entry.getValue() + ". The target time cannot be 
negative.");
             }
-            return fetcher.offsetsByTimes(timestampsToSearch, 
requestTimeoutMs);
+            return fetcher.offsetsByTimes(timestampsToSearch, 
timeout.toMillis());
         } finally {
             release();
         }
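
A typical use is rewinding to a point in time, sketched here with a made-up
one-hour lookback (same assumed "consumer" and "tp"):

    long oneHourAgo = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);
    Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(
            Collections.singletonMap(tp, oneHourAgo), Duration.ofSeconds(10));
    OffsetAndTimestamp entry = found.get(tp);
    if (entry != null)
        consumer.seek(tp, entry.offset());  // rewind roughly one hour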
@@ -1715,14 +1891,35 @@ public void resume(Collection<TopicPartition> 
partitions) {
      * @return The earliest available offsets for the given partitions
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
-     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets 
could not be fetched before
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
      *         expiration of the configured {@code request.timeout.ms}
      */
     @Override
     public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions) {
+        return beginningOffsets(partitions, 
Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get the first offset for the given partitions.
+     * <p>
+     * This method does not change the current consumer position of the 
partitions.
+     *
+     * @see #seekToBeginning(Collection)
+     *
+     * @param partitions the partitions to get the earliest offsets
+     * @param timeout The maximum amount of time to await retrieval of the 
beginning offsets
+     *
+     * @return The earliest available offsets for the given partitions
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
+     *         expiration of the passed timeout
+     */
+    @Override
+    public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+            return fetcher.beginningOffsets(partitions, timeout.toMillis());
         } finally {
             release();
         }
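
For example, seeking back to the oldest retained record with a bounded lookup:

    Map<TopicPartition, Long> earliest =
            consumer.beginningOffsets(Collections.singleton(tp), Duration.ofSeconds(5));
    consumer.seek(tp, earliest.get(tp));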
@@ -1744,14 +1941,40 @@ public void resume(Collection<TopicPartition> 
partitions) {
      * @return The end offsets for the given partitions.
      * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
-     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets 
could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset 
metadata could not be fetched before
+     *         the amount of time allocated by {@code request.timeout.ms} 
expires
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) {
+        return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get the end offsets for the given partitions. In the default {@code 
read_uncommitted} isolation level, the end
+     * offset is the high watermark (that is, the offset of the last 
successfully replicated message plus one). For
+     * {@code read_committed} consumers, the end offset is the last stable 
offset (LSO), which is the minimum of
+     * the high watermark and the smallest offset of any open transaction. 
Finally, if the partition has never been
+     * written to, the end offset is 0.
+     *
+     * <p>
+     * This method does not change the current consumer position of the 
partitions.
+     *
+     * @see #seekToEnd(Collection)
+     *
+     * @param partitions the partitions to get the end offsets.
+     * @param timeout The maximum amount of time to await retrieval of the end 
offsets
+     *
+     * @return The end offsets for the given partitions.
+     * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic(s). See the exception for more details
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets 
could not be fetched before
+     *         expiration of the passed timeout
+     */
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions, Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            return fetcher.endOffsets(partitions, requestTimeoutMs);
+            return fetcher.endOffsets(partitions, timeout.toMillis());
         } finally {
             release();
         }
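
One common application is a bounded lag estimate (same assumed "consumer" and "tp"):

    long end = consumer.endOffsets(Collections.singleton(tp), Duration.ofSeconds(5)).get(tp);
    long lag = end - consumer.position(tp, Duration.ofSeconds(5));
    System.out.println("Approximate lag for " + tp + ": " + lag);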
@@ -1760,7 +1983,7 @@ public void resume(Collection<TopicPartition> partitions) 
{
     /**
      * Close the consumer, waiting for up to the default timeout of 30 seconds 
for any needed cleanup.
      * If auto-commit is enabled, this will commit the current offsets if 
possible within the default
-     * timeout. See {@link #close(long, TimeUnit)} for details. Note that 
{@link #wakeup()}
+     * timeout. See {@link #close(Duration)} for details. Note that {@link 
#wakeup()}
      * cannot be used to interrupt close.
      *
      * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted
@@ -1769,7 +1992,7 @@ public void resume(Collection<TopicPartition> partitions) 
{
      */
     @Override
     public void close() {
-        close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
     /**
@@ -1786,15 +2009,39 @@ public void close() {
      * @throws IllegalArgumentException If the {@code timeout} is negative.
      * @throws InterruptException If the thread is interrupted before or while 
this function is called
      * @throws org.apache.kafka.common.KafkaException for any other error 
during close
+     *
+     * @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}.
      */
+    @Deprecated
+    @Override
     public void close(long timeout, TimeUnit timeUnit) {
-        if (timeout < 0)
+        close(Duration.ofMillis(timeUnit.toMillis(timeout)));
+    }
+
+    /**
+     * Tries to close the consumer cleanly within the specified timeout. This 
method waits up to
+     * {@code timeout} for the consumer to complete pending commits and leave 
the group.
+     * If auto-commit is enabled, this will commit the current offsets if 
possible within the
+     * timeout. If the consumer is unable to complete offset commits and 
gracefully leave the group
+     * before the timeout expires, the consumer is force closed. Note that 
{@link #wakeup()} cannot be
+     * used to interrupt close.
+     *
+     * @param timeout The maximum time to wait for consumer to close 
gracefully. The value must be
+     *                non-negative. Specifying a timeout of zero means do not 
wait for pending requests to complete.
+     *
+     * @throws IllegalArgumentException If the {@code timeout} is negative.
+     * @throws InterruptException If the thread is interrupted before or while 
this function is called
+     * @throws org.apache.kafka.common.KafkaException for any other error 
during close
+     */
+    @Override
+    public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
             throw new IllegalArgumentException("The timeout cannot be 
negative.");
         acquire();
         try {
             if (!closed) {
                 closed = true;
-                close(timeUnit.toMillis(timeout), false);
+                close(timeout.toMillis(), false);
             }
         } finally {
             release();
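
Migrating from the deprecated overload is mechanical, e.g.:

    consumer.close(Duration.ofSeconds(30));  // instead of the deprecated close(30, TimeUnit.SECONDS)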
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 479a9ffaaf0..3502156d4d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -251,6 +251,11 @@ public synchronized void commitSync() {
         commitSync(this.subscriptions.allConsumed());
     }
 
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, 
final Duration timeout) {
+        commitSync(offsets);
+    }
+
     @Override
     public synchronized void seek(TopicPartition partition, long offset) {
         ensureNotClosed();
@@ -266,6 +271,11 @@ public synchronized OffsetAndMetadata 
committed(TopicPartition partition) {
         return new OffsetAndMetadata(0);
     }
 
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final 
Duration timeout) {
+        return committed(partition);
+    }
+
     @Override
     public synchronized long position(TopicPartition partition) {
         ensureNotClosed();
@@ -279,6 +289,11 @@ public synchronized long position(TopicPartition 
partition) {
         return offset;
     }
 
+    @Override
+    public synchronized long position(TopicPartition partition, final Duration 
timeout) {
+        return position(partition);
+    }
+
     @Override
     public synchronized void seekToBeginning(Collection<TopicPartition> 
partitions) {
         ensureNotClosed();
@@ -470,4 +485,35 @@ private Long getEndOffset(List<Long> offsets) {
         }
         return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
     }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
+        return partitionsFor(topic);
+    }
+
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
+        return listTopics();
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
+            Duration timeout) {
+        return offsetsForTimes(timestampsToSearch);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> 
beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+        return beginningOffsets(partitions);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+        return endOffsets(partitions);
+    }
+
+    @Override
+    public void close(Duration timeout) {
+        close();
+    }
 }


 



> Add TimeoutException to KafkaConsumer#position()
> ------------------------------------------------
>
>                 Key: KAFKA-6608
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6608
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>            Reporter: Richard Yu
>            Assignee: Richard Yu
>            Priority: Blocker
>              Labels: kip
>
> In KAFKA-4879, the Kafka Consumer could hang indefinitely because the Fetcher's 
> {{timeout}} was set to {{Long.MAX_VALUE}}. While fixing that issue, it was pointed 
> out that adding a timeout to the methods which commit offsets synchronously would 
> give callers stricter control over how long these calls may block.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886


