This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 444e10d29e0 Kafka adaptive timeout implementation to handle empty
topic cases (#29400)
444e10d29e0 is described below
commit 444e10d29e04e3e144ef08c0a505bd76aa08984f
Author: Talat UYARER <[email protected]>
AuthorDate: Tue Nov 21 11:02:57 2023 -0800
Kafka adaptive timeout implementation to handle empty topic cases (#29400)
* Kafka adaptive timeout implementation to handle empty topic cases
* Format fix with spotless
---------
Co-authored-by: tuyarer <[email protected]>
---
.../apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 18 +++++++++++++++---
1 file changed, 15 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 55c71384162..054eb502cd8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -357,7 +357,9 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
*/
private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
+  private Duration recordsDequeuePollTimeout;
+  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MIN = Duration.millis(1);
+  private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT_MAX = Duration.millis(20);
   private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
   // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
@@ -543,6 +545,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
bytesReadBySplit = SourceMetrics.bytesReadBySplit(splitId);
backlogBytesOfSplit = SourceMetrics.backlogBytesOfSplit(splitId);
backlogElementsOfSplit = SourceMetrics.backlogElementsOfSplit(splitId);
+ recordsDequeuePollTimeout = Duration.millis(10);
}
private void consumerPollLoop() {
@@ -614,8 +617,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
try {
         // poll available records, wait (if necessary) up to the specified timeout.
records =
-            availableRecordsQueue.poll(
-                RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
+            availableRecordsQueue.poll(recordsDequeuePollTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("{}: Unexpected", this, e);
@@ -627,9 +629,19 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
if (consumerPollException.get() != null) {
           throw new IOException("Exception while reading from Kafka", consumerPollException.get());
}
+        if (recordsDequeuePollTimeout.isLongerThan(RECORDS_DEQUEUE_POLL_TIMEOUT_MIN)) {
+          recordsDequeuePollTimeout = recordsDequeuePollTimeout.minus(Duration.millis(1));
+          LOG.debug("Reducing poll timeout for reader to " + recordsDequeuePollTimeout.getMillis());
+        }
return;
}
+      if (recordsDequeuePollTimeout.isShorterThan(RECORDS_DEQUEUE_POLL_TIMEOUT_MAX)) {
+        recordsDequeuePollTimeout = recordsDequeuePollTimeout.plus(Duration.millis(1));
+        LOG.debug("Increasing poll timeout for reader to " + recordsDequeuePollTimeout.getMillis());
+        LOG.debug("Record count: " + records.count());
+      }
+
       partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

       // cycle through the partitions in order to interleave records from each.