[ https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495301#comment-16495301 ]
ASF GitHub Bot commented on KAFKA-6608: --------------------------------------- hachikuji closed pull request #5014: [KAFKA-6608] Add timeout parameter to methods which retrieves offsets URL: https://github.com/apache/kafka/pull/5014 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index acb53e11088..2e8ad2cf717 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -97,6 +97,10 @@ */ void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); + /** + * @see KafkaConsumer#commitSync(Map, Duration) + */ + void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout); /** * @see KafkaConsumer#commitAsync() */ @@ -131,12 +135,22 @@ * @see KafkaConsumer#position(TopicPartition) */ long position(TopicPartition partition); + + /** + * @see KafkaConsumer#position(TopicPartition, Duration) + */ + long position(TopicPartition partition, final Duration timeout); /** * @see KafkaConsumer#committed(TopicPartition) */ OffsetAndMetadata committed(TopicPartition partition); + /** + * @see KafkaConsumer#committed(TopicPartition, Duration) + */ + OffsetAndMetadata committed(TopicPartition partition, final Duration timeout); + /** * @see KafkaConsumer#metrics() */ @@ -147,11 +161,21 @@ */ List<PartitionInfo> partitionsFor(String topic); + /** + * @see KafkaConsumer#partitionsFor(String, Duration) + */ + List<PartitionInfo> partitionsFor(String topic, Duration timeout); + /** * @see KafkaConsumer#listTopics() */ Map<String, List<PartitionInfo>> listTopics(); + /** + * @see 
KafkaConsumer#listTopics(Duration) + */ + Map<String, List<PartitionInfo>> listTopics(Duration timeout); + /** * @see KafkaConsumer#paused() */ @@ -168,20 +192,35 @@ void resume(Collection<TopicPartition> partitions); /** - * @see KafkaConsumer#offsetsForTimes(java.util.Map) + * @see KafkaConsumer#offsetsForTimes(Map) */ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch); /** - * @see KafkaConsumer#beginningOffsets(java.util.Collection) + * @see KafkaConsumer#offsetsForTimes(Map, Duration) + */ + Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout); + + /** + * @see KafkaConsumer#beginningOffsets(Collection) */ Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions); /** - * @see KafkaConsumer#endOffsets(java.util.Collection) + * @see KafkaConsumer#beginningOffsets(Collection, Duration) + */ + Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout); + + /** + * @see KafkaConsumer#endOffsets(Collection) */ Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions); + /** + * @see KafkaConsumer#endOffsets(Collection, Duration) + */ + Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeoutMs); + /** * @see KafkaConsumer#close() */ @@ -190,8 +229,14 @@ /** * @see KafkaConsumer#close(long, TimeUnit) */ + @Deprecated void close(long timeout, TimeUnit unit); + /** + * @see KafkaConsumer#close(Duration) + */ + void close(Duration timeout); + /** * @see KafkaConsumer#wakeup() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 602c9d76d5e..feaadd164ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
@@ -36,6 +36,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; @@ -1321,16 +1322,55 @@ public void commitSync() { */ @Override public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { + commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE)); + } + + /** + * Commit the specified offsets for the specified list of topics and partitions. + * <p> + * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every + * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API + * should not be used. The committed offset should be the next message your application will consume, + * i.e. lastProcessedMessageOffset + 1. + * <p> + * This is a synchronous commits and will block until either the commit succeeds, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. + * <p> + * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} + * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. + * + * @param offsets A map of offsets by partition with associated metadata + * @param timeout The maximum amount of time to await completion of the offset commit + * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried. + * This can only occur if you are using automatic group management with {@link #subscribe(Collection)}, + * or if there is an active group with the same groupId which is using group management. 
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws java.lang.IllegalArgumentException if the committed offset is negative + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata + * is too large or if the topic does not exist). + * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion + * of the offset commit + */ + @Override + public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) { acquireAndEnsureOpen(); try { - coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE); + if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) { + throw new TimeoutException("Committing offsets synchronously took too long."); + } } finally { release(); } } /** - * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partition. + * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition. 
* Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)} */ @Override @@ -1426,6 +1466,7 @@ public void seek(TopicPartition partition, long offset) { * @throws IllegalArgumentException if {@code partitions} is {@code null} * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer */ + @Override public void seekToBeginning(Collection<TopicPartition> partitions) { if (partitions == null) throw new IllegalArgumentException("Partitions collection cannot be null"); @@ -1453,6 +1494,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) { * @throws IllegalArgumentException if {@code partitions} is {@code null} * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer */ + @Override public void seekToEnd(Collection<TopicPartition> partitions) { if (partitions == null) throw new IllegalArgumentException("Partitions collection cannot be null"); @@ -1490,20 +1532,59 @@ public void seekToEnd(Collection<TopicPartition> partitions) { * configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors */ + @Override public long position(TopicPartition partition) { + return position(partition, Duration.ofMillis(Long.MAX_VALUE)); + } + + /** + * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists). + * This method may issue a remote call to the server if there is no current position + * for the given partition. + * <p> + * This call will block until the position can be determined, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. 
+ * + * @param partition The partition to get the position for + * @param timeout The maximum amount of time to await determination of the current position + * @return The current position of the consumer (that is, the offset of the next record to be fetched) + * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer + * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for + * the partition + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the + * passed timeout expires + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. 
See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + */ + @Override + public long position(TopicPartition partition, final Duration timeout) { + final long timeoutMs = timeout.toMillis(); acquireAndEnsureOpen(); try { if (!this.subscriptions.isAssigned(partition)) throw new IllegalStateException("You can only check the position for partitions assigned to this consumer."); Long offset = this.subscriptions.position(partition); - while (offset == null) { + final long startMs = time.milliseconds(); + long finishMs = startMs; + + while (offset == null && finishMs - startMs < timeoutMs) { // batch update fetch positions for any partitions without a valid position - while (!updateFetchPositions(Long.MAX_VALUE)) { - log.warn("Still updating fetch positions"); + if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs))) { + break; } - client.poll(retryBackoffMs); + finishMs = time.milliseconds(); + + client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs - startMs)); offset = this.subscriptions.position(partition); + finishMs = time.milliseconds(); } + if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired."); return offset; } finally { release(); @@ -1529,14 +1610,37 @@ public long position(TopicPartition partition) { */ @Override public OffsetAndMetadata committed(TopicPartition partition) { + return committed(partition, Duration.ofMillis(Long.MAX_VALUE)); + } + + /** + * Get the last committed offset for the given partition (whether the commit happened by this process or + * another). This offset will be used as the position for the consumer in the event of a failure. + * <p> + * This call will block to do a remote call to get the latest committed offsets from the server. 
+ * + * @param partition The partition to check + * @param timeout The maximum amount of time to await the current committed offset + * @return The last committed offset and metadata or null if there was no prior commit + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * expiration of the timeout + */ + @Override + public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { acquireAndEnsureOpen(); try { - Map<TopicPartition, OffsetAndMetadata> offsets = null; - while (offsets == null) { - offsets = coordinator.fetchCommittedOffsets( - Collections.singleton(partition), - Long.MAX_VALUE - ); + Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets( + Collections.singleton(partition), timeout.toMillis()); + if (offsets == null) { + throw new TimeoutException("Unable to find committed offsets for partition within set duration."); } return offsets.get(partition); } finally { @@ -1565,13 +1669,38 @@ public OffsetAndMetadata committed(TopicPartition partition) { * this function is called * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. 
See the exception for more details - * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before - * expiration of the configured request timeout * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * the amount of time allocated by {@code request.timeout.ms} expires. */ @Override public List<PartitionInfo> partitionsFor(String topic) { + return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it + * does not already have any metadata about the given topic. + * + * @param topic The topic to get partition metadata for + * @param timeout The maximum of time to await topic metadata + * + * @return The list of partitions + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. 
See + * the exception for more details + * @throws org.apache.kafka.common.errors.TimeoutException if topic metadata cannot be fetched before expiration + * of the passed timeout + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + */ + @Override + public List<PartitionInfo> partitionsFor(String topic, Duration timeout) { acquireAndEnsureOpen(); + long timeoutMs = timeout.toMillis(); try { Cluster cluster = this.metadata.fetch(); List<PartitionInfo> parts = cluster.partitionsForTopic(topic); @@ -1579,7 +1708,7 @@ public OffsetAndMetadata committed(TopicPartition partition) { return parts; Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata( - new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs); + new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs); return topicMetadata.get(topic); } finally { release(); @@ -1596,15 +1725,35 @@ public OffsetAndMetadata committed(TopicPartition partition) { * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while * this function is called - * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before - * expiration of the configured request timeout * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * the amount of time allocated by {@code request.timeout.ms} expires. */ @Override public Map<String, List<PartitionInfo>> listTopics() { + return listTopics(Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a + * remote call to the server. 
+ * + * @param timeout The maximum time this operation will block to fetch topic metadata + * + * @return The map of topics and its partitions + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before + * expiration of the passed timeout + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + */ + @Override + public Map<String, List<PartitionInfo>> listTopics(Duration timeout) { acquireAndEnsureOpen(); try { - return fetcher.getAllTopicMetadata(requestTimeoutMs); + return fetcher.getAllTopicMetadata(timeout.toMillis()); } finally { release(); } @@ -1683,12 +1832,39 @@ public void resume(Collection<TopicPartition> partitions) { * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws IllegalArgumentException if the target timestamp is negative * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before - * expiration of the configured {@code request.timeout.ms} + * the amount of time allocated by {@code request.timeout.ms} expires. * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up * the offsets by timestamp */ @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { + return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Look up the offsets for the given partitions by timestamp. 
The returned offset for each partition is the + * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. + * + * This is a blocking call. The consumer does not have to be assigned the partitions. + * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null + * will be returned for that partition. + * + * @param timestampsToSearch the mapping from partition to the timestamp to look up. + * @param timeout The maximum amount of time to await retrieval of the offsets + * + * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater + * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no + * such message. + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details + * @throws IllegalArgumentException if the target timestamp is negative + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the passed timeout + * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up + * the offsets by timestamp + */ + @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) { acquireAndEnsureOpen(); try { for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { @@ -1698,7 +1874,7 @@ public void resume(Collection<TopicPartition> partitions) { throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". 
The target time cannot be negative."); } - return fetcher.offsetsByTimes(timestampsToSearch, requestTimeoutMs); + return fetcher.offsetsByTimes(timestampsToSearch, timeout.toMillis()); } finally { release(); } @@ -1715,14 +1891,35 @@ public void resume(Collection<TopicPartition> partitions) { * @return The earliest available offsets for the given partitions * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details - * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before * expiration of the configured {@code request.timeout.ms} */ @Override public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get the first offset for the given partitions. + * <p> + * This method does not change the current consumer position of the partitions. + * + * @see #seekToBeginning(Collection) + * + * @param partitions the partitions to get the earliest offsets + * @param timeout The maximum amount of time to await retrieval of the beginning offsets + * + * @return The earliest available offsets for the given partitions + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). 
See the exception for more details + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * expiration of the passed timeout + */ + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) { acquireAndEnsureOpen(); try { - return fetcher.beginningOffsets(partitions, requestTimeoutMs); + return fetcher.beginningOffsets(partitions, timeout.toMillis()); } finally { release(); } @@ -1744,14 +1941,40 @@ public void resume(Collection<TopicPartition> partitions) { * @return The end offsets for the given partitions. * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details - * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before - * expiration of the configured {@code request.timeout.ms} + * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before + * the amount of time allocated by {@code request.timeout.ms} expires */ @Override public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) { + return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs)); + } + + /** + * Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end + * offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For + * {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of + * the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been + * written to, the end offset is 0. + * + * <p> + * This method does not change the current consumer position of the partitions. 
+ * + * @see #seekToEnd(Collection) + * + * @param partitions the partitions to get the end offsets. + * @param timeout The maximum amount of time to await retrieval of the end offsets + * + * @return The end offsets for the given partitions. + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details + * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before + * expiration of the passed timeout + */ + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) { acquireAndEnsureOpen(); try { - return fetcher.endOffsets(partitions, requestTimeoutMs); + return fetcher.endOffsets(partitions, timeout.toMillis()); } finally { release(); } @@ -1760,7 +1983,7 @@ public void resume(Collection<TopicPartition> partitions) { /** * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup. * If auto-commit is enabled, this will commit the current offsets if possible within the default - * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()} + * timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()} * cannot be used to interrupt close. * * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted @@ -1769,7 +1992,7 @@ public void resume(Collection<TopicPartition> partitions) { */ @Override public void close() { - close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)); } /** @@ -1786,15 +2009,39 @@ public void close() { * @throws IllegalArgumentException If the {@code timeout} is negative. 
* @throws InterruptException If the thread is interrupted before or while this function is called * @throws org.apache.kafka.common.KafkaException for any other error during close + * + * @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}. */ + @Deprecated + @Override public void close(long timeout, TimeUnit timeUnit) { - if (timeout < 0) + close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout))); + } + + /** + * Tries to close the consumer cleanly within the specified timeout. This method waits up to + * {@code timeout} for the consumer to complete pending commits and leave the group. + * If auto-commit is enabled, this will commit the current offsets if possible within the + * timeout. If the consumer is unable to complete offset commits and gracefully leave the group + * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be + * used to interrupt close. + * + * @param timeout The maximum time to wait for consumer to close gracefully. The value must be + * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete. + * + * @throws IllegalArgumentException If the {@code timeout} is negative. 
+ * @throws InterruptException If the thread is interrupted before or while this function is called + * @throws org.apache.kafka.common.KafkaException for any other error during close + */ + @Override + public void close(Duration timeout) { + if (timeout.toMillis() < 0) throw new IllegalArgumentException("The timeout cannot be negative."); acquire(); try { if (!closed) { closed = true; - close(timeUnit.toMillis(timeout), false); + close(timeout.toMillis(), false); } } finally { release(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 479a9ffaaf0..3502156d4d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -251,6 +251,11 @@ public synchronized void commitSync() { commitSync(this.subscriptions.allConsumed()); } + @Override + public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) { + commitSync(offsets); + } + @Override public synchronized void seek(TopicPartition partition, long offset) { ensureNotClosed(); @@ -266,6 +271,11 @@ public synchronized OffsetAndMetadata committed(TopicPartition partition) { return new OffsetAndMetadata(0); } + @Override + public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { + return committed(partition); + } + @Override public synchronized long position(TopicPartition partition) { ensureNotClosed(); @@ -279,6 +289,11 @@ public synchronized long position(TopicPartition partition) { return offset; } + @Override + public synchronized long position(TopicPartition partition, final Duration timeout) { + return position(partition); + } + @Override public synchronized void seekToBeginning(Collection<TopicPartition> partitions) { ensureNotClosed(); @@ -470,4 +485,35 @@ private Long getEndOffset(List<Long> offsets) { } return 
offsets.size() > 1 ? offsets.remove(0) : offsets.get(0); } + + @Override + public List<PartitionInfo> partitionsFor(String topic, Duration timeout) { + return partitionsFor(topic); + } + + @Override + public Map<String, List<PartitionInfo>> listTopics(Duration timeout) { + return listTopics(); + } + + @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, + Duration timeout) { + return offsetsForTimes(timestampsToSearch); + } + + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) { + return beginningOffsets(partitions); + } + + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration duration) { + return endOffsets(partitions); + } + + @Override + public void close(Duration timeout) { + close(); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add TimeoutException to KafkaConsumer#position() > ------------------------------------------------ > > Key: KAFKA-6608 > URL: https://issues.apache.org/jira/browse/KAFKA-6608 > Project: Kafka > Issue Type: Improvement > Components: consumer > Reporter: Richard Yu > Assignee: Richard Yu > Priority: Blocker > Labels: kip > > In KAFKA-4879, Kafka Consumer hangs indefinitely due to Fetcher's {{timeout}} > being set to {{Long.MAX_VALUE}}. While fixing this issue, it was pointed out > that if a timeout was added to methods which commit offsets synchronously, > stricter control over time could be achieved. > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886 -- This message was sent by Atlassian JIRA (v7.6.3#76005)