[ 
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4024:
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.10.2.0
           Status: Resolved  (was: Patch Available)

Issue resolved by pull request 1707
[https://github.com/apache/kafka/pull/1707]

> First metadata update always take retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1, 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.2.0
>
>
> Recently I updated our KafkaProducer configuration, specifically we adjusted 
> {{retry.backoff.ms}} from default(100ms) to 1000ms.
> After that we observed that the first {{send()}} start taking longer than 
> before, investigated then found following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1 but it reproduced with latest build from trunk 
> branch as well.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocked {{retry.backoff.ms}} 
> milliseconds, due to unintentionally applied backoff on first metadata update.
> h2. Proof
> I wrote following test code and placed under the clients/main/java/
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + 
> retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
> retryBackoffMs);
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new 
> ByteArraySerializer());
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + 
> TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test 
> --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, duration of {{partitionsFor()}} increases linearly in 
> proportion to the value of {{retry.backoff.ms}}.
> Here I describe the scenario that leads this behavior:
> 1. KafkaProducer initializes metadata with giving {{bootstrap.servers}} and 
> the current timestamp: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests metadata update due to 
> missing partition info: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. But, DefaultMetadataUpdater doesn't actually send MetadataRequest, because 
> {{metadata.timeToNextUpdate}} returns a value lager than zero: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns lager one of time till metadata 
> expiration or time till backing off expiration but practially needUpdate is 
> always true at the first time so here the timeToAllowUpdate is always 
> adopted, which never be zero until {{retry.backoff.ms}} elapsed since the 
> first {{metadata.update()}}: 
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
> This is because of kafka client tries to keep interval configured by 
> {{retry.backoff.ms}} between each metadata update so it's basically works 
> fine from the second update but for the first time, since it could never have 
> the actual metadata(which is obtained by MetadaUpdate request), this backing 
> off isn't making sense and in fact it's harming our application by blocking 
> the first {{send()}} insanely long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to