[ https://issues.apache.org/jira/browse/KAFKA-3428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203919#comment-15203919 ]

Ismael Juma commented on KAFKA-3428:
------------------------------------

Say someone wrote code like:

{code}
synchronized (metadata) {
  cluster.update(...)
  ....
  cluster.update(...)
}
{code}

With the current code, `metadata.fetch()` would block until all updates had 
been performed. With the suggested change, that would no longer be the case (a 
good example why exposing the lock object makes reasoning harder). In either 
case, it doesn't protect against the case where the user is reading a `cluster` 
instance returned from `fetch()` without any synchronization. I filed 
KAFKA-3432 to see if we can remove the `update` method, which seems inherently 
unsafe.
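The blocking difference is easy to demonstrate. Below is a minimal, self-contained sketch (class and field names are illustrative, not Kafka's actual `Metadata`): one thread holds the metadata lock across two updates; a volatile read returns immediately with the intermediate value, while a read that takes the same lock waits until both updates are done.

```java
import java.util.concurrent.CountDownLatch;

public class MetadataSketch {
    private final Object lock = new Object();
    private volatile String cluster = "v0";

    // Non-blocking read: may observe a half-finished multi-step update.
    String fetchVolatile() { return cluster; }

    // Blocking read: waits for the metadata lock, so it sees both updates.
    String fetchLocked() { synchronized (lock) { return cluster; } }

    public static void main(String[] args) throws InterruptedException {
        MetadataSketch m = new MetadataSketch();
        CountDownLatch lockHeld = new CountDownLatch(1);
        Thread updater = new Thread(() -> {
            synchronized (m.lock) {
                m.cluster = "v1";       // first of two updates
                lockHeld.countDown();   // signal: lock held, mid-update
                try { Thread.sleep(200); } catch (InterruptedException e) { }
                m.cluster = "v2";       // second update
            }
        });
        updater.start();
        lockHeld.await();               // updater now holds the lock
        System.out.println("volatile read: " + m.fetchVolatile()); // v1, the intermediate state
        System.out.println("locked read: " + m.fetchLocked());     // v2, blocks until both updates done
        updater.join();
    }
}
```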

> Remove metadata sync bottleneck from mirrormaker's producer
> -----------------------------------------------------------
>
>                 Key: KAFKA-3428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3428
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Maysam Yabandeh
>
> Due to sync on the single producer, MM in a setup with 32 consumer threads 
> could not send more than 358k msg/sec and hence could not saturate the NIC. 
> Profiling showed that producer.send takes 0.080 ms on average, which explains 
> the 358k msg/sec ceiling. The following explains the bottleneck in 
> producer.send and suggests how to improve it.
> Current impl of MM relies on a single producer. For EACH message, 
> producer.send() calls waitOnMetadata, which runs the following synchronized 
> code
> {code}
>         // add topic to metadata topic list if it is not there already.
>         if (!this.metadata.containsTopic(topic))
>             this.metadata.add(topic);
> {code}
> Although the code is mostly a noop, containsTopic is synchronized and so 
> becomes the bottleneck in MM.
> Profiling highlights this bottleneck:
> {code}
> 100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
>   18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   13.8% - 9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   12.1% - 7,933 ms org.apache.kafka.clients.Metadata.containsTopic
>   1.7% - 1,088 ms org.apache.kafka.clients.Metadata.fetch
>   2.6% - 1,729 ms org.apache.kafka.clients.Metadata.fetch
>   2.2% - 1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
> {code}
> After replacing this code with a noop, another run of the profiler shows 
> that fetch is the next bottleneck:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy       132 s (54 %)   n/a     n/a
>       java.lang.Thread.run     50,776 ms (21 %)       n/a     n/a
>       org.apache.kafka.clients.Metadata.fetch  20,881 ms (8 %)        n/a     n/a
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
>   6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
>   6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
> {code}
> However, the fetch method does not need to be synchronized:
> {code}
>     public synchronized Cluster fetch() {
>         return this.cluster;
>     }
> {code}
> Removing sync from the fetch method makes the bottleneck disappear:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy       249 s (78 %)   n/a     n/a
>       org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel   24,489 ms (7 %)        n/a     n/a
>       org.xerial.snappy.SnappyNative.rawUncompress     17,024 ms (5 %)        n/a     n/a
>       org.apache.kafka.clients.producer.internals.RecordAccumulator.append     13,817 ms (4 %)        n/a     n/a
>   4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
> {code}
> Internally we have applied a patch to remove this bottleneck. The patch does 
> the following:
> 1. replace the HashSet with a concurrent hash set
> 2. remove sync from containsTopic and fetch
> 3. pass a copy of topics to getClusterForCurrentTopics, since this 
> synchronized method accesses topics at two locations, and topics being 
> changed in the middle could break the semantics.
> Any interest in applying this patch? Any alternative suggestions?
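The three patch steps quoted above might look like the following sketch (names are illustrative, not the actual internal patch; getClusterForCurrentTopics is reduced to a stand-in that accesses the snapshot rather than the live set):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PatchSketch {
    // step 1: a concurrent set gives lock-free membership tests and inserts
    private final Set<String> topics = ConcurrentHashMap.newKeySet();
    private volatile String cluster = "empty-cluster";

    // step 2: containsTopic and fetch are no longer synchronized
    public boolean containsTopic(String topic) { return topics.contains(topic); }
    public String fetch() { return cluster; }

    public void add(String topic) { topics.add(topic); }

    // step 3: hand the synchronized rebuild an immutable snapshot, so a
    // concurrent add cannot change the topic set between its two accesses
    public void update() {
        Set<String> snapshot = Collections.unmodifiableSet(new HashSet<>(topics));
        cluster = getClusterForCurrentTopics(snapshot);
    }

    private synchronized String getClusterForCurrentTopics(Set<String> snapshot) {
        // hypothetical stand-in: build a "cluster" from the snapshot
        return "cluster" + snapshot;
    }

    public static void main(String[] args) {
        PatchSketch m = new PatchSketch();
        m.add("topicA");
        if (!m.containsTopic("topicB")) m.add("topicB");
        m.update();
        System.out.println(m.containsTopic("topicA")); // true
        System.out.println(m.fetch());
    }
}
```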



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
