[ https://issues.apache.org/jira/browse/KAFKA-1933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311977#comment-14311977 ]

Maxim Ivanov edited comment on KAFKA-1933 at 2/9/15 10:38 AM:
--------------------------------------------------------------

Hi, thanks for looking into this.

Today I realised that my assumptions were wrong, and without checking them I 
proceeded with writing a patch. The whole idea is based on the number of 
messages being known in advance, and for some reason I used "shallowCount" to 
get that number. I don't know what got into my head, because the shallow count 
turned out to always be 1, so the whole offset allocation logic is wrong here. 
I need to redo it so that decompression happens before reserving the correct 
range of offsets. I'll redo it in the near future and resubmit version 2.
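
To illustrate what went wrong with my assumption: a compressed message set 
shows up as a single wrapper message, so the shallow view can't tell me how 
many offsets to reserve; only deep iteration (which decompresses) can. A rough 
sketch against the 0.8-era ByteBufferMessageSet API, purely for illustration:

{code:scala}
import kafka.message.ByteBufferMessageSet

// Illustration only: for a compressed set the shallow iterator sees just the
// wrapper message(s), while the deep iterator decompresses and yields the
// inner messages, whose count is what offset reservation actually needs.
def countForReservation(messages: ByteBufferMessageSet): (Int, Int) = {
  val shallow = messages.shallowIterator.size // 1 per wrapper message on the wire
  val deep    = messages.iterator.size        // decompresses: true message count
  (shallow, deep)
}
{code}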

As for your concerns: the locking scheme isn't very sophisticated, and with a 
bit of (much needed) cleanup in Log.append it will be easy to follow.

1. It will be present in one way or another, because we have to synchronize 
access in 2 phases, with the constraint that the second synchronization must 
be done in the same order as the first one. I am not that familiar with 
JVM/Scala concurrency primitives, and I couldn't find anything in 
java.util.concurrent that achieves this out of the box. If you prefer, I can 
abstract it into a separate class; from log.Log's point of view it would then 
be a sequence of actions: 1) register in the queue and obtain a ticket, 
2) wait in the queue presenting that ticket, where a ticket is a pair of 
semaphores. But that would be such a thin shim that I decided to just do it 
all in place. If you have other ideas on how to synchronize and then 
re-synchronize while keeping the order, I'd be happy to use your approach if 
it makes merging the patch easier.
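
To make the ticket idea concrete, something like this is what I had in mind 
for that thin shim (class and method names are made up for illustration; this 
is not the code in the patch):

{code:scala}
import java.util.concurrent.Semaphore

// A ticket is a pair of semaphores: `prevDone` is released by the previous
// appender once it has committed, `myDone` is released by us when we finish.
final class Ticket(val prevDone: Semaphore, val myDone: Semaphore)

final class TicketQueue {
  // "done" semaphore of the most recently issued ticket; starts with one
  // permit so the very first ticket can proceed immediately.
  private var lastDone = new Semaphore(1)

  // Phase 1 (under the short reservation lock): take a place in line.
  def register(): Ticket = synchronized {
    val myDone = new Semaphore(0)
    val ticket = new Ticket(lastDone, myDone)
    lastDone = myDone
    ticket
  }

  // Phase 2 (after compressing outside the lock): commit in issue order.
  def commitInOrder[T](ticket: Ticket)(commit: => T): T = {
    ticket.prevDone.acquire()        // wait for the previous appender's commit
    try commit
    finally ticket.myDone.release()  // let the next ticket holder proceed
  }
}

// Usage sketch in an append path (hypothetical helper names):
//   val t = queue.register()               // phase 1: reserve offsets + ticket
//   val compressed = recompress(messages)  // parallel phase: no lock held
//   queue.commitInOrder(t) { segment.append(compressed) }  // phase 2: in order
{code}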

2. The non-compressed case, as well as the "assignOffset = false" mode, should 
be refactored into separate code paths. My thinking was that once the idea got 
approval, I'd do it. So in the end there should be no impact on the 
non-compressed case.

3. There must be more to it than that. A raw gzip compressor is capable of 
processing ~40 MB/sec, while Kafka with a single topic, a single partition, no 
replication and 5 netcat clients pushing prerecorded messages into it 
(== infinite pipelining) manages 8.18 MB/sec, so the overhead of the whole 
Kafka system is massive, especially given that network handling and parsing 
are done in separate threads. Parallelizing compression seemed to bring the 
most value for the time spent on the patch and wouldn't prevent any other 
optimisations from taking place.
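
For reference, the ~40 MB/sec figure is a single-threaded gzip baseline, 
measured with something along these lines (a rough sketch, not the exact 
harness; random payloads are a worst case for gzip, so real messages compress 
faster):

{code:scala}
import java.io.OutputStream
import java.util.zip.GZIPOutputStream
import scala.util.Random

object GzipBaseline extends App {
  // 6.2 kB payloads, 82180 of them (~510 MB), mirroring the benchmark below.
  val msg = new Array[Byte](6200)
  Random.nextBytes(msg)
  val iterations = 82180

  // Discard the compressed output; we only care about compressor throughput.
  val sink = new OutputStream {
    override def write(b: Int): Unit = ()
    override def write(b: Array[Byte], off: Int, len: Int): Unit = ()
  }

  val start = System.nanoTime()
  val gz = new GZIPOutputStream(sink)
  var i = 0
  while (i < iterations) { gz.write(msg); i += 1 }
  gz.finish()
  val secs = (System.nanoTime() - start) / 1e9
  val mb = iterations.toLong * msg.length / 1e6
  println(f"$mb%.0f MB in $secs%.1f s = ${mb / secs}%.1f MB/s single-threaded gzip")
}
{code}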

4. That was my thinking the moment I discovered why our Kafka servers were 
choking on CPU, not network or disks, when a massive push from Hadoop was 
happening. Kafka had a chance to implement that when the protocol was changing 
in 0.8, but now it would be very intrusive, and I certainly would not propose 
it as my first patch :) Changes to Log.append are self-contained, least 
intrusive, very local and, most importantly, give relief to our immediate 
problem without requiring a migration to a new log format or changes to the 
client protocol. When there is another breaking change, please do it then, but 
seeing how the kafka 0.7 -> 0.8 migration is going, you won't make many 
friends otherwise.



> Fine-grained locking in log append
> ----------------------------------
>
>                 Key: KAFKA-1933
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1933
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>            Reporter: Maxim Ivanov
>            Assignee: Jay Kreps
>            Priority: Minor
>             Fix For: 0.8.2
>
>         Attachments: KAFKA-1933.patch
>
>
> This patch adds finer-grained locking when appending to the log. It breaks
> the global append lock into 2 sequential phases and 1 parallel phase.
> The basic idea is to allow every thread to "reserve" offsets in
> non-overlapping ranges, then do compression in parallel, and then "commit"
> the write to the log in the same order the offsets were reserved.
> On my Core i3 M370 @ 2.4 GHz (2 cores + HT) it resulted in the following 
> performance boost:
> LZ4: 7.2 sec -> 3.9 sec
> Gzip: 62.3 sec -> 24.8 sec
> Kafka was configured to run 4 IO threads; data was pushed using 5 netcat 
> instances pushing parallel batches of 200 messages of 6.2 kB each 
> (82180 messages, 510 MB in total)


