[ https://issues.apache.org/jira/browse/KAFKA-6979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510380#comment-16510380 ]
ASF GitHub Bot commented on KAFKA-6979: --------------------------------------- hachikuji closed pull request #5122: KAFKA-6979: Add default.api.timeout.ms to KafkaConsumer (KIP-266) URL: https://github.com/apache/kafka/pull/5122 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 72e496cbd46..bc9a716158e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -218,6 +218,10 @@ public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + /** <code>default.api.timeout.ms</code> */ + public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms"; + public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a <code>timeout</code> parameter."; + /** <code>interceptor.classes</code> */ public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes"; public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. " @@ -403,6 +407,12 @@ atLeast(0), Importance.MEDIUM, REQUEST_TIMEOUT_MS_DOC) + .define(DEFAULT_API_TIMEOUT_MS_CONFIG, + Type.INT, + 60 * 1000, + atLeast(0), + Importance.MEDIUM, + DEFAULT_API_TIMEOUT_MS_DOC) /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */ .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5bd6b935b39..d6973c0a818 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -567,6 +567,7 @@ private final Metadata metadata; private final long retryBackoffMs; private final long requestTimeoutMs; + private final int defaultApiTimeoutMs; private volatile boolean closed = false; private List<PartitionAssignor> assignors; @@ -666,6 +667,7 @@ private KafkaConsumer(ConsumerConfig config, log.debug("Initializing the Kafka consumer"); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); + this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG); int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG); int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs) @@ -814,6 +816,7 @@ private KafkaConsumer(ConsumerConfig config, Metadata metadata, long retryBackoffMs, long requestTimeoutMs, + int defaultApiTimeoutMs, List<PartitionAssignor> assignors) { this.log = logContext.logger(getClass()); this.clientId = clientId; @@ -829,6 +832,7 @@ private KafkaConsumer(ConsumerConfig config, this.metadata = metadata; this.retryBackoffMs = retryBackoffMs; this.requestTimeoutMs = requestTimeoutMs; + this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.assignors = assignors; } @@ -1268,8 +1272,9 @@ private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedTi * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API * should not be used. * <p> - * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is - * encountered (in which case it is thrown to the caller). + * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires + * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). * <p> * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. @@ -1286,10 +1291,12 @@ private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsedTi * configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata * is too large or if the topic does not exist). + * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires + * before successful completion of the offset commit */ @Override public void commitSync() { - commitSync(Duration.ofMillis(Long.MAX_VALUE)); + commitSync(Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1343,7 +1350,8 @@ public void commitSync(Duration timeout) { * i.e. lastProcessedMessageOffset + 1. * <p> * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is - * encountered (in which case it is thrown to the caller). + * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires + * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). * <p> * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. @@ -1362,10 +1370,12 @@ public void commitSync(Duration timeout) { * @throws java.lang.IllegalArgumentException if the committed offset is negative * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata * is too large or if the topic does not exist). + * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion + * of the offset commit */ @Override public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) { - commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE)); + commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1560,7 +1570,8 @@ public void seekToEnd(Collection<TopicPartition> partitions) { * This method may issue a remote call to the server if there is no current position for the given partition. * <p> * This call will block until either the position could be determined or an unrecoverable error is - * encountered (in which case it is thrown to the caller). + * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires + * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). * * @param partition The partition to get the position for * @return The current position of the consumer (that is, the offset of the next record to be fetched) @@ -1575,10 +1586,12 @@ public void seekToEnd(Collection<TopicPartition> partitions) { * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the * configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the + * timeout specified by {@code default.api.timeout.ms} expires */ @Override public long position(TopicPartition partition) { - return position(partition, Duration.ofMillis(Long.MAX_VALUE)); + return position(partition, Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1641,7 +1654,10 @@ public long position(TopicPartition partition, final Duration timeout) { * Get the last committed offset for the given partition (whether the commit happened by this process or * another). This offset will be used as the position for the consumer in the event of a failure. * <p> - * This call will block to do a remote call to get the latest committed offsets from the server. + * This call will do a remote call to get the latest committed offset from the server, and will block until the + * committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to + * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a + * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). * * @param partition The partition to check * @return The last committed offset and metadata or null if there was no prior commit @@ -1653,10 +1669,12 @@ public long position(TopicPartition partition, final Duration timeout) { * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the * configured groupId. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * the timeout specified by {@code default.api.timeout.ms} expires. */ @Override public OffsetAndMetadata committed(TopicPartition partition) { - return committed(partition, Duration.ofMillis(Long.MAX_VALUE)); + return committed(partition, Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1718,11 +1736,11 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before - * the amount of time allocated by {@code request.timeout.ms} expires. + * the amount of time allocated by {@code default.api.timeout.ms} expires. */ @Override public List<PartitionInfo> partitionsFor(String topic) { - return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs)); + return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1774,11 +1792,11 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * this function is called * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before - * the amount of time allocated by {@code request.timeout.ms} expires. + * the amount of time allocated by {@code default.api.timeout.ms} expires. */ @Override public Map<String, List<PartitionInfo>> listTopics() { - return listTopics(Duration.ofMillis(requestTimeoutMs)); + return listTopics(Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1879,13 +1897,13 @@ public void resume(Collection<TopicPartition> partitions) { * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws IllegalArgumentException if the target timestamp is negative * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before - * the amount of time allocated by {@code request.timeout.ms} expires. + * the amount of time allocated by {@code default.api.timeout.ms} expires. * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up * the offsets by timestamp */ @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { - return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs)); + return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs)); } /** @@ -1939,11 +1957,11 @@ public void resume(Collection<TopicPartition> partitions) { * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before - * expiration of the configured {@code request.timeout.ms} + * expiration of the configured {@code default.api.timeout.ms} */ @Override public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { - return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs)); + return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs)); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 4be688422ce..b8681e8f1bc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1748,6 +1748,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, String metricGroupPrefix = "consumer"; long retryBackoffMs = 100; long requestTimeoutMs = 30000; + int defaultApiTimeoutMs = 30000; boolean excludeInternalTopics = true; int minBytes = 1; int maxBytes = Integer.MAX_VALUE; @@ -1825,6 +1826,7 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset, metadata, retryBackoffMs, requestTimeoutMs, + defaultApiTimeoutMs, assignors); } diff --git a/docs/upgrade.html b/docs/upgrade.html index 056fb8366e4..7b5449d1b75 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -91,10 +91,13 @@ <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2 <code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code> <code>internal.value.converter.schemas.enable=false</code> </li> - <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds overloads to the consumer to support - timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which - does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and - will be removed in a future version.</li> + <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds a new consumer configuration <code>default.api.timeout.ms</code> + to specify the default timeout to use for <code>KafkaConsumer</code> APIs that could block. The KIP also adds overloads for such blocking + APIs to support specifying a specific timeout to use for each of them instead of using the default timeout set by <code>default.api.timeout.ms</code>. + In particular, a new <code>poll(Duration)</code> API has been added which does not block for dynamic partition assignment. + The old <code>poll(long)</code> API has been deprecated and will be removed in a future version. Overloads have also been added + for other <code>KafkaConsumer</code> methods like <code>partitionsFor</code>, <code>listTopics</code>, <code>offsetsForTimes</code>, + <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> <li>The tool kafka.tools.ReplayLogProducer has been removed.</li> </ul> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add max.block.ms to consumer for default timeout behavior > --------------------------------------------------------- > > Key: KAFKA-6979 > URL: https://issues.apache.org/jira/browse/KAFKA-6979 > Project: Kafka > Issue Type: Improvement > Components: consumer > Reporter: Jason Gustafson > Assignee: Dhruvil Shah > Priority: Blocker > Fix For: 2.0.0 > > > Implement max.block.ms as described in KIP-266: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior. -- This message was sent by Atlassian JIRA (v7.6.3#76005)