[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864532#comment-16864532
 ] 

Guozhang Wang commented on KAFKA-8106:
--------------------------------------

Okay guys, after some further benchmarks I think I've finally pinned down the 
difference in perf:

1. our current code allocates a byte buffer for each record it deserializes:

{code}
        // DefaultRecord.readFrom(): a fresh buffer is allocated for every record
        ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes);
        input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
{code}

the byte array is `sizeOfBodyInBytes` long; at the throughput reported below 
(~23,000,000 messages/s at 1024B each) that is on the order of 20GB of 
short-lived arrays per second, which causes GC pressure.
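To make the cost concrete, here is an illustrative sketch (not the actual 
Kafka code path; the wrapper class and `numRecords` are stand-ins, though 
`ByteUtils.readVarint` is the helper Kafka uses to read the record length): a 
batch of N records triggers N short-lived allocations.

{code}
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.common.utils.ByteUtils;

class BatchReadSketch {
    // Illustrative only: deserializing a batch calls readFrom() once per
    // record, so every iteration allocates a fresh array that becomes
    // garbage as soon as the record has been parsed.
    static void readBatch(DataInput input, int numRecords) throws IOException {
        for (int i = 0; i < numRecords; i++) {
            int sizeOfBodyInBytes = ByteUtils.readVarint(input); // record body length
            ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes); // per-record allocation
            input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes);
            // ... parse key, value, and headers out of recordBuffer ...
        }
    }
}
{code}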

2. the skipBytes implementation tries to avoid allocating such a big byte 
array; HOWEVER, depending on the compressionType the underlying implementation 
is different:

2.a. LZ4 uses KafkaLZ4BlockInputStream, which has a shared decompression 
buffer, default size 65536. It means that, for a batch of records, we will 
only have a single allocated buffer.
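
For illustration, here is a minimal sketch of that shared-buffer idea; the 
class name and the `decompressNextBlock` helper are hypothetical stand-ins, 
not the real KafkaLZ4BlockInputStream internals. skip() just advances through 
one reusable buffer and never allocates:

{code}
import java.io.IOException;
import java.io.InputStream;

// Sketch only: one decompression buffer is allocated per stream and reused
// for every record in the batch, so skip() never allocates.
abstract class SharedDecompressionBufferStream extends InputStream {
    protected final byte[] decompressionBuffer = new byte[65536]; // one per stream
    protected int bufferOffset; // next unread byte in decompressionBuffer
    protected int bufferLimit;  // number of valid bytes in decompressionBuffer

    // Hypothetical helper: refills decompressionBuffer from the next
    // compressed block; returns false at end of stream.
    protected abstract boolean decompressNextBlock() throws IOException;

    @Override
    public long skip(long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            if (bufferOffset == bufferLimit && !decompressNextBlock())
                break; // end of stream
            int chunk = (int) Math.min(remaining, (long) (bufferLimit - bufferOffset));
            bufferOffset += chunk; // advance within the shared buffer; no allocation
            remaining -= chunk;
        }
        return n - remaining;
    }

    @Override
    public int read() throws IOException {
        if (bufferOffset == bufferLimit && !decompressNextBlock())
            return -1;
        return decompressionBuffer[bufferOffset++] & 0xFF;
    }
}
{code}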

2.b. All other compressionTypes use a DataInputStream over a 
BufferedInputStream; there skipBytes ultimately falls back to the default 
java.io.InputStream.skip, which is implemented as

{code}
    // java.io.InputStream.skip(): allocates a fresh skip buffer on every call
    public long skip(long n) throws IOException {
        long remaining = n;
        int nr;
        if (n <= 0) {
            return 0;
        }
        int size = (int) Math.min(MAX_SKIP_BUFFER_SIZE, remaining); // MAX_SKIP_BUFFER_SIZE = 2048
        byte[] skipBuffer = new byte[size]; // new allocation per call
        while (remaining > 0) {
            nr = read(skipBuffer, 0, (int) Math.min(size, remaining));
            if (nr < 0) {
                break;
            }
            remaining -= nr;
        }
        return n - remaining;
    }
{code}


I.e. each skip call allocates a new byte array, which is different from 2.a.

3. Our current implementation is slightly better than 2.b because we still 
only allocate one buffer even though we need to `skip` three fields (key, 
value, and array of headers). But it is still inferior to 2.a, which uses a 
single buffer for the whole batch; the current implementation is ONLY 
beneficial if the record size is larger than 2048 (the MAX_SKIP_BUFFER_SIZE 
cap above), whereas 2.a's approach, as demonstrated by the original author, 
who used LZ4, is much better even for a 1KB message size. So our perf numbers 
on compression types other than LZ4 would not show much benefit until the 
record size is much larger than 2048. A sketch of this per-record approach is 
below.
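
Here is a minimal sketch of that per-record approach (names and signature are 
illustrative, not the actual Kafka code): one scratch array per record, reused 
across the three skipped fields.

{code}
import java.io.DataInput;
import java.io.IOException;

class RecordSkipper {
    // Illustrative sketch: a single scratch buffer per record, reused to
    // discard the key, value, and headers. Contrast with 2.b, where each
    // skip call may allocate its own buffer of up to 2048 bytes, and with
    // 2.a, where one buffer serves the entire batch.
    static void skipRecordBody(DataInput input, int keyLength, int valueLength,
                               int headersLength) throws IOException {
        int largest = Math.max(keyLength, Math.max(valueLength, headersLength));
        byte[] scratch = new byte[largest];         // one allocation per record
        input.readFully(scratch, 0, keyLength);     // discard key bytes
        input.readFully(scratch, 0, valueLength);   // discard value bytes
        input.readFully(scratch, 0, headersLength); // discard header bytes
    }
}
{code}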

So I think if we want to get a performance boost for all compression types 
similar to what the original PR did for LZ4, we need a single shared buffer 
associated with the `InputStream` object generated from `wrapForInput`, which 
can then be used for all the records within a batch.
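
A minimal sketch of what that could look like (the class is hypothetical, not 
Kafka API): the stream carries one lazily allocated scratch buffer that every 
skip in the batch reuses.

{code}
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: one skip buffer per stream (i.e. per batch),
// shared by all skip() calls instead of a fresh array per call.
class SharedSkipBufferInputStream extends FilterInputStream {
    private byte[] skipBuffer; // allocated once, on first use

    SharedSkipBufferInputStream(InputStream in) {
        super(in);
    }

    @Override
    public long skip(long n) throws IOException {
        if (skipBuffer == null)
            skipBuffer = new byte[2048]; // one allocation per batch, not per call
        long remaining = n;
        while (remaining > 0) {
            int nr = read(skipBuffer, 0, (int) Math.min(skipBuffer.length, remaining));
            if (nr < 0)
                break;
            remaining -= nr;
        }
        return n - remaining;
    }
}
{code}

With something like this, `wrapForInput` could return `new 
DataInputStream(new SharedSkipBufferInputStream(decompressed))`; since 
`DataInputStream.skipBytes` delegates to the wrapped stream's `skip`, all 
three per-record skips would then reuse the shared buffer.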

> Reducing the allocation and copying of ByteBuffer when logValidator does 
> validation.
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8106
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.2.0, 2.1.1
>            Environment: Server:
> CPU: 2*16;
> MemTotal: 256G;
> Ethernet controller: Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network
> Connection;
> SSD.
>            Reporter: Flower.min
>            Assignee: Flower.min
>            Priority: Major
>              Labels: performance
>
>       We did performance testing of Kafka in the specific scenario described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. Then we started many producer processes to 
> send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_*
>   
>  *_1.Main config of Kafka_*
>  # Main config of Kafka server: num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartition : 50~2000
>  # Size of Single Message : 1024B
>  
>  *_2.Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3.The best result of performance testing_*  
> ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of production||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4.Phenomenon and my doubt_*
>         _The upper limit of CPU usage has been reached, but the upper limit 
> of the server's network bandwidth has not. *We are unsure what costs so much 
> CPU time, and we want to improve performance and reduce the CPU usage of the 
> Kafka server.*_
> _*5.Analysis*_
>         We analyzed the JFR (Java Flight Recorder) data of the Kafka server 
> during the performance testing. After we checked and repeated the 
> performance test, we located the code "*ByteBuffer recordBuffer = 
> ByteBuffer.allocate(sizeOfBodyInBytes);*" (*Class: DefaultRecord, 
> Function: readFrom()*), which consumed CPU resources and caused a lot of GC. 
> Our modified code reduces the allocation and copying of ByteBuffer, so the 
> test performance is greatly improved, and the CPU's stable usage is *below 
> 60%*. The following is a comparison of the test performance of the different 
> code versions under the same conditions.
> *Result of performance testing*
> *Main config of Kafka: Single Message: 1024B; TopicPartitions: 200; 
> linger.ms: 1000ms.*
> || ||Network inflow rate||CPU (%)||Messages/s||
> |Source code|600M/s|97%|25,000,000|
> |Modified code|1GB/s|<60%|41,660,000|
> **1. GC before the code modification (source code):**
> ![](https://i.loli.net/2019/05/07/5cd16df163ad3.png)
> **2. GC after the code modification (ByteBuffer allocation removed):**
> ![](https://i.loli.net/2019/05/07/5cd16dae1dbc2.png)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
