[ https://issues.apache.org/jira/browse/KAFKA-1933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14311977#comment-14311977 ]
Maxim Ivanov edited comment on KAFKA-1933 at 2/9/15 10:38 AM:
--------------------------------------------------------------
Hi, thanks for looking into this. Today I realised that my assumptions were wrong, and I proceeded with writing a patch without checking them. The whole idea is based on the fact that the number of messages is known in advance, and for some reason I used "shallowCount" to get this number. I don't know what got into my head, because the shallow count turned out to always be 1. So the whole offset allocation logic is wrong here. I need to redo it so that decompression happens before reserving the correct range of offsets. I'll redo it in the near future and resubmit version 2.

As for your concerns, the locking scheme isn't very sophisticated; with a bit of (much needed) cleanup in Log.append it will be easy to follow.

1. It will be present in one way or another, because we have to synchronize access in two phases, with the constraint that the second synchronization must happen in the same order as the first. I am not familiar with JVM/Scala concurrency primitives, and I couldn't find anything in java.util.concurrent that achieves this out of the box. If you prefer, I can abstract it into a separate class; from log.Log's point of view it would then be a sequence of actions: 1) register in the queue and obtain a ticket, 2) wait in the queue presenting that ticket, where a ticket is a pair of semaphores. But that would be such a thin shim that I decided to just do it all in place. If you have other ideas on how to synchronize, and then synchronize again in the same order, I'd be happy to use your way of doing it if it makes merging the patch easier.
2. The non-compressed case, as well as the "assignOffset = false" mode, should be refactored into separate code paths. My thinking was that once the idea got approval, I'd do it. So in the end there should be no impact on the non-compressed case.
3. There must be more to it than that. A raw gzip compressor can process ~40 MB/sec, while Kafka with a single topic, single partition, no replication, and 5 netcat clients pushing prerecorded messages into it (== infinite pipelining) manages 8.18 MB/sec. So the overhead of the Kafka system as a whole is massive, especially given that network handling and parsing are done in separate threads. Parallelizing compression seemed to bring the most value for the time spent on the patch, and wouldn't prevent any other optimisations from taking place.
4. That was my thinking the moment I discovered why our Kafka servers choke on CPU, not network or disks, when a massive push from Hadoop happens. Kafka had a chance to implement that when the protocol was changing in 0.8, but now it would be very intrusive; I certainly would not propose it as my first patch :) The changes to Log.append are self-contained, minimally intrusive, very local, and most importantly give relief for our immediate problem without requiring a migration to a new log format or a change to the client protocol. When there is another breaking change, please do so, but seeing how the Kafka 0.7 -> 0.8 migration is going, you won't make many friends.
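The corrected approach described in the comment (decompress first to learn the real message count, then reserve exactly that many offsets) could be sketched roughly as below. This is a standalone illustration under assumptions, not Kafka's actual message-set API: the gzip "batch" format and the names `compressBatch`, `countMessages`, and `reserveOffsets` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class OffsetReservation {
    // Next offset to hand out; getAndAdd gives each thread a disjoint range.
    static final AtomicLong nextOffset = new AtomicLong(0);

    // Compress newline-delimited messages into a single gzip "batch".
    static byte[] compressBatch(String[] messages) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            for (String m : messages) {
                gz.write((m + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
        return buf.toByteArray();
    }

    // A compressed batch looks like ONE shallow message; the real message
    // count is only known after decompressing (the "shallow count == 1" trap).
    static int countMessages(byte[] batch) throws IOException {
        int count = 0;
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(batch))) {
            int b;
            while ((b = gz.read()) != -1) {
                if (b == '\n') count++;
            }
        }
        return count;
    }

    // Reserve a contiguous, non-overlapping offset range for this batch.
    static long reserveOffsets(int messageCount) {
        return nextOffset.getAndAdd(messageCount);
    }

    public static void main(String[] args) throws IOException {
        byte[] batch = compressBatch(new String[] {"a", "b", "c"});
        int n = countMessages(batch);          // 3, not the shallow count of 1
        long firstOffset = reserveOffsets(n);  // owns firstOffset..firstOffset+n-1
        System.out.println(n + " " + firstOffset + " " + nextOffset.get()); // 3 0 3
    }
}
```

The cost is one decompression pass on the critical path, but in exchange each producer thread gets a non-overlapping offset range it can fill in parallel.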
> Fine-grained locking in log append
> ----------------------------------
>
> Key: KAFKA-1933
> URL: https://issues.apache.org/jira/browse/KAFKA-1933
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Reporter: Maxim Ivanov
> Assignee: Jay Kreps
> Priority: Minor
> Fix For: 0.8.2
>
> Attachments: KAFKA-1933.patch
>
>
> This patch adds finer locking when appending to the log. It breaks the
> global append lock into 2 sequential and 1 parallel phase.
> The basic idea is to allow every thread to "reserve" offsets in
> non-overlapping ranges, then do compression in parallel, and then
> "commit" the write to the log in the same order the offsets were reserved.
> On my Core i3 M370 @ 2.4 GHz (2 cores + HT) it resulted in the following
> performance boost:
> LZ4: 7.2 sec -> 3.9 sec
> Gzip: 62.3 sec -> 24.8 sec
> Kafka was configured to run 4 IO threads; data was pushed using 5 netcat
> instances pushing in parallel batches of 200 msgs of 6.2 KB each (510 MB in
> total, 82180 messages in total)

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
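The reserve/compress-in-parallel/commit-in-order scheme summarized in the issue description can be sketched with the comment's "ticket is a pair of semaphores" idea. The class and method names (`OrderedTicketQueue`, `register`, `awaitTurn`, `complete`) are hypothetical, not the patch's actual code; this is only a minimal sketch of the ordering mechanism.

```java
import java.util.concurrent.Semaphore;

public class OrderedTickets {
    // A ticket is a pair of semaphores: "mine" is released when it is this
    // ticket's turn to commit, "next" unblocks the ticket issued after it.
    static final class Ticket {
        final Semaphore mine;
        final Semaphore next = new Semaphore(0);
        Ticket(Semaphore mine) { this.mine = mine; }
    }

    public static final class OrderedTicketQueue {
        // The head of the chain starts open so the first ticket can commit.
        private Semaphore tail = new Semaphore(1);

        // Phase 1 (serial, under the log lock): take a ticket. Tickets chain
        // together so turns come in exactly the order tickets were issued.
        public synchronized Ticket register() {
            Ticket t = new Ticket(tail);
            tail = t.next;
            return t;
        }

        // Phase 2 (serial, in phase-1 order): block until it is our turn.
        public void awaitTurn(Ticket t) throws InterruptedException {
            t.mine.acquire();
        }

        // Let the next ticket holder commit.
        public void complete(Ticket t) {
            t.next.release();
        }
    }

    public static void main(String[] args) throws Exception {
        OrderedTicketQueue queue = new OrderedTicketQueue();
        StringBuilder log = new StringBuilder();
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            final Ticket t = queue.register(); // phase 1: in submission order
            workers[i] = new Thread(() -> {
                // parallel phase: compression would happen here
                try {
                    queue.awaitTurn(t);        // phase 2: same order as phase 1
                    log.append(id);            // the semaphore handoff serializes this
                    queue.complete(t);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread w : workers) w.start();
        for (Thread w : workers) w.join();
        System.out.println(log); // commits in ticket order: 0123
    }
}
```

Whatever order the threads are scheduled in, the appends land in ticket order, which is exactly the guarantee the patch needs so that committed offsets match the ranges reserved in phase 1.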