[
https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458639#comment-15458639
]
Yuto Kawamura commented on KAFKA-4024:
--------------------------------------
I reconsidered this issue and found that it is much worse than I explained
before.
IIUC, in short, setting {{retry.backoff.ms}} to a larger value can delay
KafkaProducer's refresh of outdated metadata.
That is, when we set {{retry.backoff.ms}} to 1 second for example, and a
partition leadership failover happens, the producer can take up to 1 second to
fire a metadata request in the worst case, even though it has already detected
the broker disconnection or the outdated partition leadership information.
Here's the result of my experiment. I modified
{{KafkaProducerMetadataUpdateDurationTest}} and observed the DEBUG logs of
NetworkClient and Metadata.
KafkaProducerMetadataUpdateDurationTest.java:
{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaProducerMetadataUpdateDurationTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                          "HOST-1:9092,HOST-2:9092,HOST-3:9092");
        props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, String.valueOf(Integer.MAX_VALUE));
        String retryBackoffMs = System.getProperty("retry.backoff.ms");
        System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
        props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);

        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        try {
            int i = 0;
            while (true) {
                final int produceSeq = i++;
                final long t0 = System.nanoTime();
                producer.send(new ProducerRecord<>("test", produceSeq % 3, "key", "value"),
                        new Callback() {
                            @Override
                            public void onCompletion(RecordMetadata metadata, Exception exception) {
                                long produceDuration =
                                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                                System.err.printf("Produce[%d]: duration=%d, exception=%s\n",
                                        produceSeq, produceDuration, exception);
                            }
                        });
                long sendDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
                System.err.printf("Send[%d]: duration=%d\n", produceSeq, sendDuration);
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}
{code}
log4j.properties:
{code}
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.kafka.clients.Metadata=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.Metadata=false
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.NetworkClient=false
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=DEBUG, stdout
log4j.additivity.org.apache.kafka.clients.producer.internals.Sender=false
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
{code}
Topic "test" has 3 partitions, each with 3 replicas.
Then I started KafkaProducerMetadataUpdateDurationTest and stopped broker 1
manually at (*2). Here's the log:
{code}
./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties
-Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
Experimenting with retry.backoff.ms = 10000
...
[2016-09-02 22:36:29,839] INFO Kafka version : 0.10.1.0-SNAPSHOT
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:29,839] INFO Kafka commitId : 8f3462552fa4d6a6
(org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:36:39,826] DEBUG Initialize connection to node -2 for sending
metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,826] DEBUG Initiating connection to node -2 at
HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,883] DEBUG Completed connection to node -2
(org.apache.kafka.clients.NetworkClient)
# *1 The first metadata request
[2016-09-02 22:36:39,902] DEBUG Sending metadata request {topics=[test]} to
node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,929] DEBUG Updated cluster metadata version 2 to
Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-1:9092 (id: 1 rack:
null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test,
partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]),
Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr =
[3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas =
[1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=10104
[2016-09-02 22:36:39,944] DEBUG Initiating connection to node 3 at HOST-3:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:39,947] DEBUG Completed connection to node 3
(org.apache.kafka.clients.NetworkClient)
Produce[0]: duration=10117, exception=null
Send[1]: duration=0
[2016-09-02 22:36:40,950] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:40,952] DEBUG Completed connection to node 1
(org.apache.kafka.clients.NetworkClient)
Produce[1]: duration=12, exception=null
Send[2]: duration=0
[2016-09-02 22:36:41,955] DEBUG Initiating connection to node 2 at HOST-2:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:41,958] DEBUG Completed connection to node 2
(org.apache.kafka.clients.NetworkClient)
Produce[2]: duration=5, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null
# *2 I stopped broker 1 at this moment
[2016-09-02 22:36:43,134] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
Send[4]: duration=0
[2016-09-02 22:36:44,137] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:44,139] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
Send[5]: duration=0
Produce[5]: duration=4, exception=null
[2016-09-02 22:36:45,141] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:45,143] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
Send[6]: duration=0
Produce[6]: duration=3, exception=null
[2016-09-02 22:36:46,148] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:46,150] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
Send[7]: duration=0
[2016-09-02 22:36:47,154] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:47,156] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
Send[8]: duration=0
Produce[8]: duration=5, exception=null
[2016-09-02 22:36:48,159] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:48,161] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
Send[9]: duration=0
Produce[9]: duration=3, exception=null
[2016-09-02 22:36:49,165] DEBUG Initiating connection to node 1 at HOST-1:9092.
(org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,168] DEBUG Node 1 disconnected.
(org.apache.kafka.clients.NetworkClient)
# *3 The second metadata update exactly after 10 seconds since the first update.
[2016-09-02 22:36:49,914] DEBUG Sending metadata request {topics=[test]} to
node 3 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:36:49,918] DEBUG Updated cluster metadata version 3 to
Cluster(nodes = [HOST-2:9092 (id: 2 rack: null), HOST-3:9092 (id: 3 rack:
null)], partitions = [Partition(topic = test, partition = 1, leader = 2,
replicas = [1,2,3,], isr = [2,3,]), Partition(topic = test, partition = 0,
leader = 3, replicas = [1,2,3,], isr = [3,2,]), Partition(topic = test,
partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,])])
(org.apache.kafka.clients.Metadata)
Produce[4]: duration=5957, exception=null
Produce[7]: duration=2946, exception=null
Send[10]: duration=0
Produce[10]: duration=4, exception=null
{code}
First, as I explained already, the first send() blocked insanely long because
of the unintentionally applied refreshBackoffMs (*1).
Then I stopped broker 1 at (*2). What we expect here is that KafkaProducer
immediately tries to update metadata in order to fail over the producing
target to the new leader, but it doesn't do so until 10 seconds
(= retry.backoff.ms) have elapsed since the first update at (*3).
This leads to the following bad effects:
- Increased produce latency
- Buffer exhaustion due to accumulated records
- Batch expiration once {{request.timeout.ms}} elapses:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java#L153-L156
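The gate that defers the refresh can be modeled in a few lines. This is a simplified sketch of the {{Metadata.timeToNextUpdate}} behavior described above; the class name, signature, and constants here are illustrative, not the real client code:

```java
public final class BackoffGateSketch {
    // Simplified model of Metadata.timeToNextUpdate: a metadata request is
    // allowed only once BOTH the expiry timer and the retry.backoff.ms timer
    // have elapsed since the last refresh.
    public static long timeToNextUpdate(long nowMs, long lastRefreshMs, boolean needUpdate,
                                        long metadataExpireMs, long retryBackoffMs) {
        // With an update already requested (needUpdate), the expiry side is zero...
        long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
        // ...but the backoff side still gates the request.
        long timeToAllowUpdate = Math.max(lastRefreshMs + retryBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        // Metadata refreshed at t=0; broker 1 dies 3 seconds later, which sets
        // needUpdate=true, yet with retry.backoff.ms=10000 the request is
        // still deferred for another 7 seconds.
        long wait = timeToNextUpdate(3_000, 0, true, 300_000, 10_000);
        System.out.println("Metadata request deferred for " + wait + " ms");
        // prints "Metadata request deferred for 7000 ms"
    }
}
```

With the default retry.backoff.ms of 100 the same disconnect would be followed by a refresh almost immediately, which is why the problem only becomes visible with large backoff values.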
> First metadata update always take retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4024
> URL: https://issues.apache.org/jira/browse/KAFKA-4024
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1, 0.10.0.0
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we adjusted
> {{retry.backoff.ms}} from the default (100ms) to 1000ms.
> After that we observed that the first {{send()}} started taking longer than
> before; I investigated and found the following facts.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1, but it reproduced with the latest build from
> the trunk branch as well.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}}
> milliseconds, due to an unintentionally applied backoff on the first metadata
> update.
> h2. Proof
> I wrote the following test code and placed it under clients/src/main/java/:
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
>
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test
> --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000
> KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, the duration of {{partitionsFor()}} increases linearly in
> proportion to the value of {{retry.backoff.ms}}.
> Here I describe the scenario that leads to this behavior:
> 1. KafkaProducer initializes metadata with the given {{bootstrap.servers}} and
> the current timestamp:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update due to
> the missing partition info:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. But DefaultMetadataUpdater doesn't actually send a MetadataRequest, because
> {{metadata.timeToNextUpdate}} returns a value larger than zero:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until
> metadata expiration and the time until the backoff expires. In practice,
> needUpdate is always true at the first {{send()}}, so the expiration side is
> zero and the timeToAllowUpdate side is always adopted, which never reaches
> zero until {{retry.backoff.ms}} has elapsed since the first
> {{metadata.update()}}:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
> This happens because the Kafka client tries to keep an interval of
> {{retry.backoff.ms}} between metadata updates. That basically works fine from
> the second update onward, but the first time, when the client has never
> obtained actual metadata (which is only obtained via a MetadataRequest), this
> backoff makes no sense; in fact it harms our application by blocking the
> first {{send()}} insanely long.
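Plugging the first-send scenario above into a simplified model of {{Metadata.timeToNextUpdate}} shows the effect numerically. This is an illustrative sketch under the assumptions stated in steps 1-4, not the actual client code:

```java
public final class FirstUpdateSketch {
    // Simplified model of Metadata.timeToNextUpdate (step 4 above).
    static long timeToNextUpdate(long nowMs, long lastRefreshMs, boolean needUpdate,
                                 long metadataExpireMs, long retryBackoffMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = Math.max(lastRefreshMs + retryBackoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        long retryBackoffMs = 10_000;
        // Step 1: the constructor seeds metadata with bootstrap.servers and the
        // current timestamp, so lastRefreshMs == now even though no real
        // metadata has been fetched yet.
        long constructedAt = 0;
        // Step 2: the first send() calls requestUpdate(), so needUpdate is true.
        // Steps 3-4: the backoff term alone gates the very first metadata request.
        long wait = timeToNextUpdate(constructedAt, constructedAt, true, 300_000, retryBackoffMs);
        System.out.println("First metadata request deferred for " + wait + " ms");
        // prints "First metadata request deferred for 10000 ms"
    }
}
```

This matches the measured durations: {{partitionsFor()}} cannot return until the first real metadata arrives, so it blocks for roughly {{retry.backoff.ms}} plus the request round trip.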
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)