[
https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495301#comment-16495301
]
ASF GitHub Bot commented on KAFKA-6608:
---------------------------------------
hachikuji closed pull request #5014: [KAFKA-6608] Add timeout parameter to
methods which retrieves offsets
URL: https://github.com/apache/kafka/pull/5014
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index acb53e11088..2e8ad2cf717 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -97,6 +97,10 @@
*/
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
+ /**
+ * @see KafkaConsumer#commitSync(Map, Duration)
+ */
+ void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
final Duration timeout);
/**
* @see KafkaConsumer#commitAsync()
*/
@@ -131,12 +135,22 @@
* @see KafkaConsumer#position(TopicPartition)
*/
long position(TopicPartition partition);
+
+ /**
+ * @see KafkaConsumer#position(TopicPartition, Duration)
+ */
+ long position(TopicPartition partition, final Duration timeout);
/**
* @see KafkaConsumer#committed(TopicPartition)
*/
OffsetAndMetadata committed(TopicPartition partition);
+ /**
+ * @see KafkaConsumer#committed(TopicPartition, Duration)
+ */
+ OffsetAndMetadata committed(TopicPartition partition, final Duration
timeout);
+
/**
* @see KafkaConsumer#metrics()
*/
@@ -147,11 +161,21 @@
*/
List<PartitionInfo> partitionsFor(String topic);
+ /**
+ * @see KafkaConsumer#partitionsFor(String, Duration)
+ */
+ List<PartitionInfo> partitionsFor(String topic, Duration timeout);
+
/**
* @see KafkaConsumer#listTopics()
*/
Map<String, List<PartitionInfo>> listTopics();
+ /**
+ * @see KafkaConsumer#listTopics(Duration)
+ */
+ Map<String, List<PartitionInfo>> listTopics(Duration timeout);
+
/**
* @see KafkaConsumer#paused()
*/
@@ -168,20 +192,35 @@
void resume(Collection<TopicPartition> partitions);
/**
- * @see KafkaConsumer#offsetsForTimes(java.util.Map)
+ * @see KafkaConsumer#offsetsForTimes(Map)
*/
Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
/**
- * @see KafkaConsumer#beginningOffsets(java.util.Collection)
+ * @see KafkaConsumer#offsetsForTimes(Map, Duration)
+ */
+ Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
+
+ /**
+ * @see KafkaConsumer#beginningOffsets(Collection)
*/
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition>
partitions);
/**
- * @see KafkaConsumer#endOffsets(java.util.Collection)
+ * @see KafkaConsumer#beginningOffsets(Collection, Duration)
+ */
+ Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition>
partitions, Duration timeout);
+
+ /**
+ * @see KafkaConsumer#endOffsets(Collection)
*/
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions);
+ /**
+ * @see KafkaConsumer#endOffsets(Collection, Duration)
+ */
+ Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions, Duration timeoutMs);
+
/**
* @see KafkaConsumer#close()
*/
@@ -190,8 +229,14 @@
/**
* @see KafkaConsumer#close(long, TimeUnit)
*/
+ @Deprecated
void close(long timeout, TimeUnit unit);
+ /**
+ * @see KafkaConsumer#close(Duration)
+ */
+ void close(Duration timeout);
+
/**
* @see KafkaConsumer#wakeup()
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 602c9d76d5e..feaadd164ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -36,6 +36,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -1321,16 +1322,55 @@ public void commitSync() {
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata>
offsets) {
+ commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+ }
+
+ /**
+ * Commit the specified offsets for the specified list of topics and
partitions.
+ * <p>
+ * This commits offsets to Kafka. The offsets committed using this API will
be used on the first fetch after every
+ * rebalance and also on startup. As such, if you need to store offsets in
anything other than Kafka, this API
+ * should not be used. The committed offset should be the next message your
application will consume,
+ * i.e. lastProcessedMessageOffset + 1.
+ * <p>
+ * This is a synchronous commits and will block until either the commit
succeeds, an unrecoverable error is
+ * encountered (in which case it is thrown to the caller), or the timeout
expires.
+ * <p>
+ * Note that asynchronous offset commits sent previously with the {@link
#commitAsync(OffsetCommitCallback)}
+ * (or similar) are guaranteed to have their callbacks invoked prior to
completion of this method.
+ *
+ * @param offsets A map of offsets by partition with associated metadata
+ * @param timeout The maximum amount of time to await completion of the
offset commit
+ * @throws org.apache.kafka.clients.consumer.CommitFailedException if the
commit failed and cannot be retried.
+ * This can only occur if you are using automatic group
management with {@link #subscribe(Collection)},
+ * or if there is an active group with the same groupId which
is using group management.
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling
thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws java.lang.IllegalArgumentException if the committed offset is
negative
+ * @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors (e.g. if offset metadata
+ * is too large or if the topic does not exist).
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout
expires before successful completion
+ * of the offset commit
+ */
+ @Override
+ public void commitSync(final Map<TopicPartition, OffsetAndMetadata>
offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
- coordinator.commitOffsetsSync(new HashMap<>(offsets),
Long.MAX_VALUE);
+ if (!coordinator.commitOffsetsSync(new HashMap<>(offsets),
timeout.toMillis())) {
+ throw new TimeoutException("Committing offsets synchronously
took too long.");
+ }
} finally {
release();
}
}
/**
- * Commit offsets returned on the last {@link #poll(Duration) poll()} for
all the subscribed list of topics and partition.
+ * Commit offsets returned on the last {@link #poll(Duration)} for all the
subscribed list of topics and partition.
* Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
*/
@Override
@@ -1426,6 +1466,7 @@ public void seek(TopicPartition partition, long offset) {
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not
currently assigned to this consumer
*/
+ @Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot
be null");
@@ -1453,6 +1494,7 @@ public void seekToBeginning(Collection<TopicPartition>
partitions) {
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not
currently assigned to this consumer
*/
+ @Override
public void seekToEnd(Collection<TopicPartition> partitions) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot
be null");
@@ -1490,20 +1532,59 @@ public void seekToEnd(Collection<TopicPartition>
partitions) {
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
*/
+ @Override
public long position(TopicPartition partition) {
+ return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+ }
+
+ /**
+ * Get the offset of the <i>next record</i> that will be fetched (if a
record with that offset exists).
+ * This method may issue a remote call to the server if there is no
current position
+ * for the given partition.
+ * <p>
+ * This call will block until the position can be determined, an
unrecoverable error is
+ * encountered (in which case it is thrown to the caller), or the timeout
expires.
+ *
+ * @param partition The partition to get the position for
+ * @param timeout The maximum amount of time to await determination of the
current position
+ * @return The current position of the consumer (that is, the offset of
the next record to be fetched)
+ * @throws IllegalArgumentException if the provided TopicPartition is not
assigned to this consumer
+ * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no
offset is currently defined for
+ * the partition
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.TimeoutException if the position
cannot be determined before the
+ * passed timeout expires
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ */
+ @Override
+ public long position(TopicPartition partition, final Duration timeout) {
+ final long timeoutMs = timeout.toMillis();
acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalStateException("You can only check the
position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
- while (offset == null) {
+ final long startMs = time.milliseconds();
+ long finishMs = startMs;
+
+ while (offset == null && finishMs - startMs < timeoutMs) {
// batch update fetch positions for any partitions without a
valid position
- while (!updateFetchPositions(Long.MAX_VALUE)) {
- log.warn("Still updating fetch positions");
+ if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs,
time.milliseconds() - startMs))) {
+ break;
}
- client.poll(retryBackoffMs);
+ finishMs = time.milliseconds();
+
+ client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs -
startMs));
offset = this.subscriptions.position(partition);
+ finishMs = time.milliseconds();
}
+ if (offset == null) throw new TimeoutException("request timed out,
position is unable to be acquired.");
return offset;
} finally {
release();
@@ -1529,14 +1610,37 @@ public long position(TopicPartition partition) {
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
+ return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+ }
+
+ /**
+ * Get the last committed offset for the given partition (whether the
commit happened by this process or
+ * another). This offset will be used as the position for the consumer in
the event of a failure.
+ * <p>
+ * This call will block to do a remote call to get the latest committed
offsets from the server.
+ *
+ * @param partition The partition to check
+ * @param timeout The maximum amount of time to await the current
committed offset
+ * @return The last committed offset and metadata or null if there was no
prior commit
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the
committed offset cannot be found before
+ * expiration of the timeout
+ */
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition, final
Duration timeout) {
acquireAndEnsureOpen();
try {
- Map<TopicPartition, OffsetAndMetadata> offsets = null;
- while (offsets == null) {
- offsets = coordinator.fetchCommittedOffsets(
- Collections.singleton(partition),
- Long.MAX_VALUE
- );
+ Map<TopicPartition, OffsetAndMetadata> offsets =
coordinator.fetchCommittedOffsets(
+ Collections.singleton(partition), timeout.toMillis());
+ if (offsets == null) {
+ throw new TimeoutException("Unable to find committed offsets
for partition within set duration.");
}
return offsets.get(partition);
} finally {
@@ -1565,13 +1669,38 @@ public OffsetAndMetadata committed(TopicPartition
partition) {
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the specified topic. See the exception for more details
- * @throws org.apache.kafka.common.errors.TimeoutException if the topic
metadata could not be fetched before
- * expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
+ * the amount of time allocated by {@code request.timeout.ms}
expires.
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
+ return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get metadata about the partitions for a given topic. This method will
issue a remote call to the server if it
+ * does not already have any metadata about the given topic.
+ *
+ * @param topic The topic to get partition metadata for
+ * @param timeout The maximum of time to await topic metadata
+ *
+ * @return The list of partitions
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the specified topic. See
+ * the exception for more details
+ * @throws org.apache.kafka.common.errors.TimeoutException if topic
metadata cannot be fetched before expiration
+ * of the passed timeout
+ * @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ */
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
acquireAndEnsureOpen();
+ long timeoutMs = timeout.toMillis();
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
@@ -1579,7 +1708,7 @@ public OffsetAndMetadata committed(TopicPartition
partition) {
return parts;
Map<String, List<PartitionInfo>> topicMetadata =
fetcher.getTopicMetadata(
- new
MetadataRequest.Builder(Collections.singletonList(topic), true),
requestTimeoutMs);
+ new
MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
return topicMetadata.get(topic);
} finally {
release();
@@ -1596,15 +1725,35 @@ public OffsetAndMetadata committed(TopicPartition
partition) {
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted before or while
* this function is called
- * @throws org.apache.kafka.common.errors.TimeoutException if the topic
metadata could not be fetched before
- * expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
+ * the amount of time allocated by {@code request.timeout.ms}
expires.
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
+ return listTopics(Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get metadata about partitions for all topics that the user is
authorized to view. This method will issue a
+ * remote call to the server.
+ *
+ * @param timeout The maximum time this operation will block to fetch
topic metadata
+ *
+ * @return The map of topics and its partitions
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.TimeoutException if the topic
metadata could not be fetched before
+ * expiration of the passed timeout
+ * @throws org.apache.kafka.common.KafkaException for any other
unrecoverable errors
+ */
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
acquireAndEnsureOpen();
try {
- return fetcher.getAllTopicMetadata(requestTimeoutMs);
+ return fetcher.getAllTopicMetadata(timeout.toMillis());
} finally {
release();
}
@@ -1683,12 +1832,39 @@ public void resume(Collection<TopicPartition>
partitions) {
* @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
- * expiration of the configured {@code request.timeout.ms}
+ * the amount of time allocated by {@code request.timeout.ms}
expires.
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if
the broker does not support looking up
* the offsets by timestamp
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+ return offsetsForTimes(timestampsToSearch,
Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Look up the offsets for the given partitions by timestamp. The returned
offset for each partition is the
+ * earliest offset whose timestamp is greater than or equal to the given
timestamp in the corresponding partition.
+ *
+ * This is a blocking call. The consumer does not have to be assigned the
partitions.
+ * If the message format version in a partition is before 0.10.0, i.e. the
messages do not have timestamps, null
+ * will be returned for that partition.
+ *
+ * @param timestampsToSearch the mapping from partition to the timestamp
to look up.
+ * @param timeout The maximum amount of time to await retrieval of the
offsets
+ *
+ * @return a mapping from partition to the timestamp and offset of the
first message with timestamp greater
+ * than or equal to the target timestamp. {@code null} will be
returned for the partition if there is no
+ * such message.
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
+ * @throws IllegalArgumentException if the target timestamp is negative
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
+ * expiration of the passed timeout
+ * @throws org.apache.kafka.common.errors.UnsupportedVersionException if
the broker does not support looking up
+ * the offsets by timestamp
+ */
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)
{
acquireAndEnsureOpen();
try {
for (Map.Entry<TopicPartition, Long> entry :
timestampsToSearch.entrySet()) {
@@ -1698,7 +1874,7 @@ public void resume(Collection<TopicPartition> partitions)
{
throw new IllegalArgumentException("The target time for
partition " + entry.getKey() + " is " +
entry.getValue() + ". The target time cannot be
negative.");
}
- return fetcher.offsetsByTimes(timestampsToSearch,
requestTimeoutMs);
+ return fetcher.offsetsByTimes(timestampsToSearch,
timeout.toMillis());
} finally {
release();
}
@@ -1715,14 +1891,35 @@ public void resume(Collection<TopicPartition>
partitions) {
* @return The earliest available offsets for the given partitions
* @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
- * @throws org.apache.kafka.common.errors.TimeoutException if the offsets
could not be fetched before
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
@Override
public Map<TopicPartition, Long>
beginningOffsets(Collection<TopicPartition> partitions) {
+ return beginningOffsets(partitions,
Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get the first offset for the given partitions.
+ * <p>
+ * This method does not change the current consumer position of the
partitions.
+ *
+ * @see #seekToBeginning(Collection)
+ *
+ * @param partitions the partitions to get the earliest offsets
+ * @param timeout The maximum amount of time to await retrieval of the
beginning offsets
+ *
+ * @return The earliest available offsets for the given partitions
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
+ * expiration of the passed timeout
+ */
+ @Override
+ public Map<TopicPartition, Long>
beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
acquireAndEnsureOpen();
try {
- return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+ return fetcher.beginningOffsets(partitions, timeout.toMillis());
} finally {
release();
}
@@ -1744,14 +1941,40 @@ public void resume(Collection<TopicPartition>
partitions) {
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
- * @throws org.apache.kafka.common.errors.TimeoutException if the offsets
could not be fetched before
- * expiration of the configured {@code request.timeout.ms}
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset
metadata could not be fetched before
+ * the amount of time allocated by {@code request.timeout.ms}
expires
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions) {
+ return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get the end offsets for the given partitions. In the default {@code
read_uncommitted} isolation level, the end
+ * offset is the high watermark (that is, the offset of the last
successfully replicated message plus one). For
+ * {@code read_committed} consumers, the end offset is the last stable
offset (LSO), which is the minimum of
+ * the high watermark and the smallest offset of any open transaction.
Finally, if the partition has never been
+ * written to, the end offset is 0.
+ *
+ * <p>
+ * This method does not change the current consumer position of the
partitions.
+ *
+ * @see #seekToEnd(Collection)
+ *
+ * @param partitions the partitions to get the end offsets.
+ * @param timeout The maximum amount of time to await retrieval of the end
offsets
+ *
+ * @return The end offsets for the given partitions.
+ * @throws org.apache.kafka.common.errors.AuthenticationException if
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not
authorized to the topic(s). See the exception for more details
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offsets
could not be fetched before
+ * expiration of the passed timeout
+ */
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions, Duration timeout) {
acquireAndEnsureOpen();
try {
- return fetcher.endOffsets(partitions, requestTimeoutMs);
+ return fetcher.endOffsets(partitions, timeout.toMillis());
} finally {
release();
}
@@ -1760,7 +1983,7 @@ public void resume(Collection<TopicPartition> partitions)
{
/**
* Close the consumer, waiting for up to the default timeout of 30 seconds
for any needed cleanup.
* If auto-commit is enabled, this will commit the current offsets if
possible within the default
- * timeout. See {@link #close(long, TimeUnit)} for details. Note that
{@link #wakeup()}
+ * timeout. See {@link #close(Duration)} for details. Note that {@link
#wakeup()}
* cannot be used to interrupt close.
*
* @throws org.apache.kafka.common.errors.InterruptException if the
calling thread is interrupted
@@ -1769,7 +1992,7 @@ public void resume(Collection<TopicPartition> partitions)
{
*/
@Override
public void close() {
- close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
}
/**
@@ -1786,15 +2009,39 @@ public void close() {
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws InterruptException If the thread is interrupted before or while
this function is called
* @throws org.apache.kafka.common.KafkaException for any other error
during close
+ *
+ * @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}.
*/
+ @Deprecated
+ @Override
public void close(long timeout, TimeUnit timeUnit) {
- if (timeout < 0)
+ close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout)));
+ }
+
+ /**
+ * Tries to close the consumer cleanly within the specified timeout. This
method waits up to
+ * {@code timeout} for the consumer to complete pending commits and leave
the group.
+ * If auto-commit is enabled, this will commit the current offsets if
possible within the
+ * timeout. If the consumer is unable to complete offset commits and
gracefully leave the group
+ * before the timeout expires, the consumer is force closed. Note that
{@link #wakeup()} cannot be
+ * used to interrupt close.
+ *
+ * @param timeout The maximum time to wait for consumer to close
gracefully. The value must be
+ * non-negative. Specifying a timeout of zero means do not
wait for pending requests to complete.
+ *
+ * @throws IllegalArgumentException If the {@code timeout} is negative.
+ * @throws InterruptException If the thread is interrupted before or while
this function is called
+ * @throws org.apache.kafka.common.KafkaException for any other error
during close
+ */
+ @Override
+ public void close(Duration timeout) {
+ if (timeout.toMillis() < 0)
throw new IllegalArgumentException("The timeout cannot be
negative.");
acquire();
try {
if (!closed) {
closed = true;
- close(timeUnit.toMillis(timeout), false);
+ close(timeout.toMillis(), false);
}
} finally {
release();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 479a9ffaaf0..3502156d4d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -251,6 +251,11 @@ public synchronized void commitSync() {
commitSync(this.subscriptions.allConsumed());
}
+ @Override
+ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets,
final Duration timeout) {
+ commitSync(offsets);
+ }
+
@Override
public synchronized void seek(TopicPartition partition, long offset) {
ensureNotClosed();
@@ -266,6 +271,11 @@ public synchronized OffsetAndMetadata
committed(TopicPartition partition) {
return new OffsetAndMetadata(0);
}
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition, final
Duration timeout) {
+ return committed(partition);
+ }
+
@Override
public synchronized long position(TopicPartition partition) {
ensureNotClosed();
@@ -279,6 +289,11 @@ public synchronized long position(TopicPartition
partition) {
return offset;
}
+ @Override
+ public synchronized long position(TopicPartition partition, final Duration
timeout) {
+ return position(partition);
+ }
+
@Override
public synchronized void seekToBeginning(Collection<TopicPartition>
partitions) {
ensureNotClosed();
@@ -470,4 +485,35 @@ private Long getEndOffset(List<Long> offsets) {
}
return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
}
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
+ return partitionsFor(topic);
+ }
+
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
+ return listTopics();
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
+ Duration timeout) {
+ return offsetsForTimes(timestampsToSearch);
+ }
+
+ @Override
+ public Map<TopicPartition, Long>
beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+ return beginningOffsets(partitions);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition>
partitions, Duration duration) {
+ return endOffsets(partitions);
+ }
+
+ @Override
+ public void close(Duration timeout) {
+ close();
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add TimeoutException to KafkaConsumer#position()
> ------------------------------------------------
>
> Key: KAFKA-6608
> URL: https://issues.apache.org/jira/browse/KAFKA-6608
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Richard Yu
> Assignee: Richard Yu
> Priority: Blocker
> Labels: kip
>
> In KAFKA-4879, Kafka Consumer hangs indefinitely due to Fetcher's {{timeout}}
> being set to {{Long.MAX_VALUE}}. While fixing this issue, it was pointed out
> that if a timeout was added to methods which commits offsets synchronously, a
> stricter control on time could be achieved.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)