[ https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Satish Duggana reassigned KAFKA-16259:
--------------------------------------

    Assignee: Zhifeng Chen

> Immutable MetadataCache to improve client performance
> -----------------------------------------------------
>
>                 Key: KAFKA-16259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16259
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 2.8.0
>            Reporter: Zhifeng Chen
>            Assignee: Zhifeng Chen
>            Priority: Major
>         Attachments: image-2024-02-14-12-11-07-366.png
>
>
> TL;DR: A produce latency issue has been identified in the Kafka client,
> caused by synchronized lock contention between metadata cache reads and
> writes in the native Kafka producer.
> Trigger condition: a producer needs to produce to a large number of topics,
> such as in Kafka rest-proxy.
>
> What is the producer metadata cache
> The Kafka producer maintains an in-memory copy of the cluster metadata, so
> it does not have to fetch metadata every time it produces a message, which
> reduces latency.
>
> What is the synchronized lock contention problem
> The Kafka producer metadata cache is a *mutable* object whose reads and
> writes are isolated by a synchronized lock. This means that while the
> metadata cache is being updated, all read requests are blocked.
> The frequency of topic metadata expiration increases linearly with the
> number of topics. In a Kafka cluster with a large number of topic
> partitions, topic metadata expiration and refresh trigger highly frequent
> metadata updates. When read operations are blocked by an update, producer
> threads are blocked, which causes high produce latency.
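
The contention pattern described above can be reproduced in miniature. The following is a minimal sketch, not the Kafka client code: `SynchronizedCache`, its `fetch`/`update` methods, and the `slowRebuild` hook are hypothetical names chosen for illustration. It assumes only what the description states, namely that reads and writes share one synchronized lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class LockContentionDemo {

    /** Hypothetical stand-in for a mutable cache guarded by a single monitor. */
    static final class SynchronizedCache {
        private final Map<String, Integer> partitionsByTopic = new HashMap<>();

        // Readers must take the same monitor as the writer.
        synchronized Integer fetch(String topic) {
            return partitionsByTopic.get(topic);
        }

        // The whole (possibly slow) rebuild runs while the monitor is held.
        synchronized void update(Map<String, Integer> updates, Runnable slowRebuild) {
            slowRebuild.run();
            partitionsByTopic.putAll(updates);
        }
    }

    /** Returns the reader thread's state observed while a writer is inside update(). */
    static Thread.State readerStateDuringUpdate() {
        SynchronizedCache cache = new SynchronizedCache();
        CountDownLatch writerInside = new CountDownLatch(1);
        CountDownLatch releaseWriter = new CountDownLatch(1);

        Thread writer = new Thread(() -> cache.update(new HashMap<>(), () -> {
            writerInside.countDown();      // signal: the lock is now held
            awaitQuietly(releaseWriter);   // simulate a long metadata rebuild
        }));
        Thread reader = new Thread(() -> cache.fetch("orders"));

        writer.start();
        awaitQuietly(writerInside);
        reader.start();

        // Poll until the reader is parked on the cache's monitor.
        Thread.State state = reader.getState();
        while (state != Thread.State.BLOCKED) {
            Thread.yield();
            state = reader.getState();
        }

        releaseWriter.countDown();         // let the writer finish
        joinQuietly(writer);
        joinQuietly(reader);
        return state;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Reader state during update: " + readerStateDuringUpdate());
    }
}
```

In this sketch the reader cannot make progress until the writer leaves `update`, which is the same shape as a thread dump in which many request threads wait on a monitor held by a single updating thread.
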
>
> *Proposed solution*
> TL;DR: Optimize the performance of metadata cache read operations in the
> native Kafka producer with a copy-on-write strategy.
> What is the copy-on-write strategy
> It is a way to reduce synchronized lock contention by making the object
> immutable and always creating a new instance on update. Because the object
> is immutable, read operations never have to wait, so produce latency is
> reduced significantly.
> Besides performance, it also protects the metadata cache from unexpected
> modification, reducing the occurrence of bugs caused by incorrect
> synchronization.
>
> {*}Test result{*}:
> Environment: Kafka-rest-proxy
> Client version: 2.8.0
> Number of topic partitions: 250k
> Test results show a 90%+ latency reduction on the test cluster
> !image-2024-02-14-12-11-07-366.png!
> P99 produce latency on deployed instances dropped from 200ms to 5ms (the
> upper part shows latency after the improvement, the lower part shows
> latency before the improvement)
> *Dump showing details of the problem*
> Threads acquiring the lock
> Kafka-rest-proxy-jetty-thread-pool-199 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-200 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-202 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-203 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-204 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-205 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-207 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-212 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-214 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-215 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-217 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-218 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-219 waiting to acquire [ 0x00007f77d70121a0 ]
> Kafka-rest-proxy-jetty-thread-pool-222 waiting to acquire [ 0x00007f77d70121a0 ]
> ...
> at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111)
> at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019)
> at org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144)
> at io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39)
> at io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117)
> Threads holding the lock
> kafka-producer-network-thread | kafka-rest-proxy running, holding [ 0x00007f77d70121a0 ]
> at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655)
> at java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484)
> at org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162)
> at org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152)
> at org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177)
> at org.apache.kafka.clients.MetadataCache$$Lambda$695/0x00007f75da3ddcb0.apply(Unknown Source)
> at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195)
> at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178)
> at java.lang.Thread.run(java.base@11.0.18/Thread.java:829)
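
The copy-on-write strategy proposed in this issue could look roughly like the following. This is a sketch under assumptions, not the change made to the Kafka client: `Snapshot`, `MetadataHolder`, and the topic-to-partition-count map are hypothetical simplifications of the real metadata. The idea shown is that readers only dereference a `volatile` field pointing at an immutable snapshot, while a writer builds a new snapshot off to the side and publishes it with a single reference swap.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopyOnWriteDemo {

    /** Hypothetical immutable snapshot: topic name -> partition count. */
    static final class Snapshot {
        private final Map<String, Integer> partitionsByTopic;

        Snapshot(Map<String, Integer> partitionsByTopic) {
            // Defensive copy, then wrap so the contents can never change afterwards.
            this.partitionsByTopic =
                Collections.unmodifiableMap(new HashMap<>(partitionsByTopic));
        }

        Integer partitionCount(String topic) {
            return partitionsByTopic.get(topic);
        }

        /** Returns a NEW snapshot with the updates merged in; this instance is unchanged. */
        Snapshot mergeWith(Map<String, Integer> updates) {
            Map<String, Integer> merged = new HashMap<>(partitionsByTopic);
            merged.putAll(updates);
            return new Snapshot(merged);
        }
    }

    /** Hypothetical holder: readers take no lock, writers swap the reference. */
    static final class MetadataHolder {
        private volatile Snapshot current = new Snapshot(Collections.emptyMap());

        // Lock-free read: a single volatile load of an immutable object.
        Snapshot fetch() {
            return current;
        }

        // Writers are serialized with each other, but never block readers.
        synchronized void update(Map<String, Integer> updates) {
            Snapshot next = current.mergeWith(updates); // work on a private copy
            current = next;                             // publish with one volatile write
        }
    }

    public static void main(String[] args) {
        MetadataHolder holder = new MetadataHolder();

        Map<String, Integer> first = new HashMap<>();
        first.put("orders", 3);
        holder.update(first);

        Snapshot seenByReader = holder.fetch(); // a reader grabs the current snapshot

        Map<String, Integer> second = new HashMap<>();
        second.put("payments", 6);
        holder.update(second);

        // The reader's snapshot is unaffected by the later update.
        System.out.println("old snapshot, payments: " + seenByReader.partitionCount("payments"));
        System.out.println("new snapshot, payments: " + holder.fetch().partitionCount("payments"));
    }
}
```

The trade-off in this sketch is that every update allocates a new map, in exchange for reads that never wait on a monitor. A reader that already holds a snapshot keeps a consistent view even while an update is in progress.
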