[
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410563#comment-15410563
]
ASF GitHub Bot commented on KAFKA-4024:
---------------------------------------
GitHub user kawamuray opened a pull request:
https://github.com/apache/kafka/pull/1707
KAFKA-4024 KafkaProducer should not initialize metadata with current
timestamp
Issue: https://issues.apache.org/jira/browse/KAFKA-4024
Solves the problem that the first metadata update of KafkaProducer takes
`retry.backoff.ms` to complete.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kawamuray/kafka KAFKA-4024-metadata-backoff
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/1707.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1707
----
commit fc02b2381c9eb90a75fd0da338596679bd764a31
Author: Yuto Kawamura <[email protected]>
Date: 2016-08-06T09:25:29Z
KAFKA-4024 KafkaProducer should not initialize metadata with current
timestamp
----
> First metadata update always take retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4024
> URL: https://issues.apache.org/jira/browse/KAFKA-4024
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1, 0.10.0.0
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we adjusted
> {{retry.backoff.ms}} from the default (100ms) to 1000ms.
> After that we observed that the first {{send()}} started taking longer than
> before. I investigated and found the following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1, but the issue also reproduces with the latest
> build from the trunk branch.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}}
> milliseconds, due to a backoff that is unintentionally applied to the first
> metadata update.
> h2. Proof
> I wrote the following test code and placed it under clients/main/java/
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's the experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, the duration of {{partitionsFor()}} increases linearly in
> proportion to the value of {{retry.backoff.ms}}.
> Here is the scenario that leads to this behavior:
> 1. KafkaProducer initializes its metadata with the given {{bootstrap.servers}}
> and the current timestamp:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update because
> the partition info is missing:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. However, DefaultMetadataUpdater doesn't actually send a MetadataRequest,
> because {{metadata.timeToNextUpdate}} returns a value larger than zero:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata
> expiration and the time until the backoff expires. In practice, needUpdate is
> always true at this point, so the backoff term (timeToAllowUpdate) is always
> the one adopted, and it does not reach zero until {{retry.backoff.ms}} has
> elapsed since the first {{metadata.update()}} (see the sketch after this list):
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
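> To make steps 3 and 4 concrete, here is a minimal, self-contained sketch of the
> backoff calculation. The field and method names (lastRefreshMs, refreshBackoffMs,
> timeToNextUpdate) mirror those used in org.apache.kafka.clients.Metadata, but this
> is a simplified approximation of the logic, not the actual implementation:
> {code}
> // Simplified, self-contained sketch of the backoff check that delays the first update.
> // Field names mirror org.apache.kafka.clients.Metadata; this is an approximation only.
> public final class MetadataBackoffSketch {
>     private final long metadataExpireMs;   // metadata.max.age.ms
>     private final long refreshBackoffMs;   // retry.backoff.ms
>     private long lastRefreshMs;            // set to "now" when the producer is constructed
>     private long lastSuccessfulRefreshMs;  // also set to "now" at construction
>     private boolean needUpdate;            // becomes true on the first send()
>
>     MetadataBackoffSketch(long refreshBackoffMs, long metadataExpireMs, long nowMs) {
>         this.refreshBackoffMs = refreshBackoffMs;
>         this.metadataExpireMs = metadataExpireMs;
>         this.lastRefreshMs = nowMs;            // the problematic initialization
>         this.lastSuccessfulRefreshMs = nowMs;
>         this.needUpdate = false;
>     }
>
>     void requestUpdate() {
>         this.needUpdate = true;
>     }
>
>     long timeToNextUpdate(long nowMs) {
>         // With needUpdate == true the metadata counts as expired immediately ...
>         long timeToExpire = needUpdate ? 0 : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
>         // ... but the backoff is measured from lastRefreshMs, which equals the construction
>         // time, so the result stays positive for retry.backoff.ms after the producer starts.
>         long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
>         return Math.max(timeToExpire, timeToAllowUpdate);
>     }
>
>     public static void main(String[] args) {
>         long now = System.currentTimeMillis();
>         MetadataBackoffSketch metadata = new MetadataBackoffSketch(1000, 300_000, now);
>         metadata.requestUpdate();                            // first send() requests an update
>         System.out.println(metadata.timeToNextUpdate(now));  // prints 1000, not 0
>     }
> }
> {code}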
> This happens because the Kafka client tries to keep an interval of
> {{retry.backoff.ms}} between metadata updates. That basically works fine from
> the second update onward, but for the first update, since the producer has
> never obtained actual metadata (which comes from a MetadataRequest), this
> backoff makes no sense and in fact harms our application by blocking the
> first {{send()}} for an unreasonably long time.
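> Based on the pull request title, the fix direction is to stop seeding the metadata
> with the current timestamp, so that no backoff applies before the first real update.
> Using the hypothetical {{MetadataBackoffSketch}} above to illustrate the idea (the
> actual change in PR #1707 may differ):
> {code}
> // Hypothetical illustration of the fix idea: seed the sketch with timestamp 0 instead
> // of the construction time. lastRefreshMs + refreshBackoffMs - nowMs is then already
> // negative, so timeToNextUpdate() returns 0 and the first metadata request is sent
> // without waiting retry.backoff.ms.
> long now = System.currentTimeMillis();
> MetadataBackoffSketch metadata = new MetadataBackoffSketch(1000, 300_000, 0L);
> metadata.requestUpdate();                            // first send() requests an update
> System.out.println(metadata.timeToNextUpdate(now));  // prints 0 instead of 1000
> {code}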
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)