[ 
https://issues.apache.org/jira/browse/KAFKA-6088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-6088:
--------------------------------

> Kafka Consumer slows down when reading from highly compacted topics
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6088
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6088
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>             Fix For: 0.11.0.0
>
>
> Summary of the issue
> -----
> We found a performance issue with the Kafka Consumer where it gets less 
> efficient if you have frequent gaps in offsets (which happens when there is 
> lots of compaction on the topic).
> The issue is present in 0.10.2.1 and possibly prior.
> It is fixed in 0.11.0.0.
> Summary of cause
> -----
> The fetcher code assumes that there will be no gaps in message offsets. If 
> there are, it does an additional round trip to the broker. For topics with 
> large gaps in offsets, it is possible that most calls to {{poll()}} will 
> generate a roundtrip to the broker.
> Background and details 
> -----
> We have a topic with roughly 8 million records. The topic is log compacted. 
> It turns out that most of the initial records in the topic were never 
> overwritten, whereas in the 2nd half of the topic we had lots of overwritten 
> records. That means that for the first part of the topic, there are no gaps 
> in offsets. But in the 2nd part of the topic, there are frequent gaps in the 
> offsets (due to records being compacted away).
> We have a consumer that starts up and reads the entire topic from beginning 
> to end. We noticed that the consumer would read through the first part of the 
> topic very quickly. When it got to the part of the topic with frequent gaps 
> in offsets, consumption rate slowed down dramatically. This slowdown was 
> consistent across multiple runs.
> What is happening is this:
> 1) A call to {{poll()}} happens. The consumer goes to the broker and fetches 
> 1MB of data (the default of {{max.partition.fetch.bytes}}). It then returns 
> to the caller just 500 records (the default of {{max.poll.records}}), and 
> keeps the rest of the data in memory to use in future calls to {{poll()}}. 
> 2) Before returning the 500 records, the consumer library records the *next* 
> offset it should return. It does so by taking the offset of the last record 
> and adding 1 to it (the offset of the 500th message from the set, plus 1). It 
> calls this the {{nextOffset}}.
> 3) The application finishes processing the 500 messages and makes another 
> call to {{poll()}}. During this call, the consumer library does a sanity 
> check. It checks that the first message of the set *it is about to return* 
> has an offset that matches the value of {{nextOffset}}. That is, it checks 
> whether the 501st record has an offset that is 1 greater than the offset of 
> the 500th record.
>       a. If it matches, then it returns an additional 500 records and 
> increments the {{nextOffset}} to (offset of the 1000th record, plus 1).
>       b. If it doesn't match, then it throws away the remainder of the 1MB of 
> data that it stored in memory in step 1, and it goes back to the broker to 
> fetch an additional 1MB of data, starting at the offset {{nextOffset}}.
> In topics that have no gaps (a non-compacted topic), the code will always hit 
> the 3a code path.
> If the topic has gaps in offsets and the call to {{poll()}} happens to fall 
> onto a gap, then the code will hit code path 3b.
> If the gaps are frequent, then it will frequently hit code path 3b.
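> To make the step 3 check concrete, here is a minimal sketch of the logic (not 
> the actual Fetcher source, just an illustration of the behavior described 
> above; the method and parameter names are made up):
> {code}
> // Sketch only: 0.10.2.x-style check, where `nextOffset` is the position saved
> // in step 2 and `firstFetchedOffset` is the offset of the first record the
> // consumer is about to return from its in-memory buffer.
> static boolean canUseBufferedRecords(long nextOffset, long firstFetchedOffset) {
>     // If the record at nextOffset was compacted away, firstFetchedOffset is
>     // greater than nextOffset, the check fails, the buffer is discarded, and
>     // a new fetch is sent to the broker (code path 3b).
>     return firstFetchedOffset == nextOffset;
> }
> {code}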
> The worst case scenario is when you have a large number of gaps and you run 
> with {{max.poll.records=1}}. Every gap will result in a new fetch to the 
> broker. You may end up processing only one message per fetch. Or, said 
> another way, you may end up doing a single fetch for every single message in 
> the partition.
> Repro
> -----
> We created a repro. It appears that the bug is in 0.10.2.1, but was fixed in 
> 0.11. I've attached the tarball with all the code and instructions. 
> The repro is:
> 1) Create a single partition topic with log compaction turned on 
> 2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each 
> message key written twice in a row) 
> 3) Let compaction happen. This would mean that offsets 0 2 4 6 8 10 ... 
> would be compacted away 
> 4) Consume from this topic with {{max.poll.records=1}}
> More concretely, here is the producer code:
> {code}
> Producer<String, String> producer = new KafkaProducer<String, String>(props); 
> for (int i = 0; i < 1000000; i++) { 
>     producer.send(new ProducerRecord<String, String>("compacted", 
> Integer.toString(i), Integer.toString(i))); 
>     producer.send(new ProducerRecord<String, String>("compacted", 
> Integer.toString(i), Integer.toString(i))); 
> } 
> producer.flush(); 
> producer.close();
> {code}
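> And here is a consumer along the lines of step 4 (a sketch with assumed local 
> connection settings; the attached tarball contains the actual repro code):
> {code}
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
> props.put("group.id", "compaction-repro");          // assumption: any group id works
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("max.poll.records", "1");                 // forces the offset check on every record
> props.put("auto.offset.reset", "earliest");
> 
> Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> consumer.subscribe(Collections.singletonList("compacted"));
> while (true) {
>     ConsumerRecords<String, String> records = consumer.poll(100);
>     for (ConsumerRecord<String, String> record : records) {
>         System.out.printf("offset = %d, key = %s, value = %s%n",
>                 record.offset(), record.key(), record.value());
>     }
> }
> {code}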
> When consuming with a 0.10.2.1 consumer, you can see this pattern (with 
> Fetcher logs at DEBUG, see file consumer_0.10.2/debug.log):
> {code}
> offset = 1, key = 0, value = 0 
> 22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 3 since the current position is 2 
> 22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
> offset = 3, key = 1, value = 1 
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 5 since the current position is 4 
> 22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
> offset = 5, key = 2, value = 2 
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 7 since the current position is 6 
> 22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
> offset = 7, key = 3, value = 3 
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 9 since the current position is 8 
> 22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
> offset = 9, key = 4, value = 4 
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 11 since the current position is 10 
> 22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
> offset = 11, key = 5, value = 5 
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 13 since the current position is 12 
> 22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
> offset = 13, key = 6, value = 6 
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring 
> fetched records for compacted-0 at offset 15 since the current position is 14 
> 22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch 
> for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
> {code}
> When consuming with a 0.11.0.1 consumer, you can see the following pattern 
> (see file consumer_0.11/debug.log): 
> {code}
> offset = 1, key = 0, value = 0 
> offset = 3, key = 1, value = 1 
> offset = 5, key = 2, value = 2 
> offset = 7, key = 3, value = 3 
> offset = 9, key = 4, value = 4 
> offset = 11, key = 5, value = 5 
> offset = 13, key = 6, value = 6 
> offset = 15, key = 7, value = 7 
> offset = 17, key = 8, value = 8 
> offset = 19, key = 9, value = 9 
> offset = 21, key = 10, value = 10 
> {code}
> From looking at the GitHub history, it appears it was fixed in 
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0
> Specifically, this line 
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930
> was replaced by this line: 
> https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933
> Mitigation
> -----
> This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, then you 
> will not be affected by the problem.
> If you cannot upgrade to 0.11.0.0, then you can reduce the impact of this by 
> increasing the value of {{max.poll.records}}. This works because the check 
> happens on each call to {{poll()}}, and increasing the value of 
> {{max.poll.records}} will reduce the number of calls to {{poll()}}.
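> For example, on a 0.10.2.x consumer this is a one-line configuration change 
> (the value below is illustrative, not a recommendation):
> {code}
> // Returning larger batches per poll() means fewer poll() calls per fetched
> // batch, so the offset check has fewer chances to fail on a gap.
> props.put("max.poll.records", "5000");  // default in 0.10.2.x is 500
> {code}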



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
