Edit: I had to set kafka.request.logger=TRACE to see the request timings.

On Tue, Dec 30, 2014 at 4:37 PM, Rajiv Kurian <[email protected]> wrote:
> Got it. I had to enable it to see the logs. Here are two files with the timings I got from the period I had logging enabled:
>
> Producer Requests:
> https://docs.google.com/spreadsheets/d/1spVFWsf7T8ZmwM0JMYnkBoyYufRhHav50sueFA9ztP8/edit?usp=sharing
>
> Fetch Requests:
> https://docs.google.com/spreadsheets/d/1PMGslvlttNQOd1uFQDkgPOY31nSK9TZYFy7oa75QaWA/edit?usp=sharing
>
> Things I noted:
>
> 1) The producer request totalTime barely goes above 100 ms and when it does, it is always due to a high remoteTime. Again, given this is barely ever above 100 ms this doesn't look that bad. Maybe I need to keep it on for longer to get more samples. The overall time on the producer does seem a bit on the high side, though it definitely doesn't account for the 20-30 seconds end to end time I saw.
>
> 2) The fetch request totalTime does frequently go to around 100 ms. It is also always due to a high remoteTime. The remoteTime is always in the ballpark of 100 ms. I specify a maxWait of 100 ms on the client when making my fetch requests, so it might be this 100 ms that I am seeing. My guess is that the buffer sizes I specified don't get filled all the time, so the maxWait timeout is hit and we end up waiting 100 ms on the broker. Again I don't see how the end to end time could be so poor.
>
> From these timings (maybe I don't have enough samples) it seems like the problem is not in the brokers. Does that seem like a legitimate conclusion? I might have to measure for a longer period, to get the tail. I am planning on extracting the producer and consumer code to get a self-contained load test going. I'll do the same end to end lag measurements and see if it's my environment that is adding this lag somehow.
>
> Thanks!
>
> On Tue, Dec 30, 2014 at 11:58 AM, Jay Kreps <[email protected]> wrote:
>
>> The produce to the local log is the time taken to write to the local filesystem. If you see that spike up then that is the culprit.
>>
>> What I was referring to is the line in kafka-request.log that looks something like:
>> "Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
>> This breaks down the total request time and how much was spent on local I/O etc. If totalTime is always small the server is not to blame. If it spikes, then the question is where is that time going and this will answer that.
>>
>> You actually raise a good point about that other log message. It says "4 bytes written to log xyz" but what it is actually logging is the number of messages not the number of bytes, so that is quite misleading and a bug.
>>
>> -Jay
>>
>> On Mon, Dec 29, 2014 at 6:37 PM, Rajiv Kurian <[email protected]> wrote:
>>
>> > Thanks Jay.
>> >
>> > I just used JMX to change the log level on the broker and checked the logs. I am still not sure what line is exactly telling me how much time a producer took to process a request. I see log lines of this format:
>> >
>> > 2014-12-30T02:13:44.507Z DEBUG [kafka-request-handler-0 ] [kafka.server.KafkaApis ]: [KafkaApi-11] Produce to local log in 5 ms
>> >
>> > Is this what I should be looking out for? I see producer requests of the form "2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0 ] [kafka.server.KafkaApis ]: [KafkaApi-11] Handling request: Name: ProducerRequest; Version: 0; CorrelationId: 36308; more stuff" but I can't quite tell when this request was done with. So right now I see that requests are logged frequently on the brokers (multiple times per second) but since I don't know when they are finished with I can't tell the total time.
>> >
>> > Some other things that I found kind of odd are:
>> >
>> > 1) The producer requests seem to have an ack timeout of 30000 ms. I don't think I set this on the producer.
>> > I don't know if this could have anything to do with the latency problem.
>> >
>> > 2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0 ] [kafka.server.KafkaApis ]: [KafkaApi-11] Handling request: Name: ProducerRequest; Version: 0; CorrelationId: 36308; ClientId: ; RequiredAcks: 1; *AckTimeoutMs: 30000 ms*; rest of the stuff.
>> >
>> > 2) I see a bunch of small writes written to my [topic, partition] logs. My messages are at a minimum 27 bytes, so maybe this is something else. Again don't know if this is a problem:
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 1 bytes written to log MY.TOPIC-400 beginning at offset 19221923 and ending at offset 19221923
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 4 bytes written to log MY.TOPIC-208 beginning at offset 29438019 and ending at offset 29438022
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 1 bytes written to log MY.TOPIC-163 beginning at offset 14821977 and ending at offset 14821977
>> >
>> > 2014-12-30T02:13:44.500Z TRACE [kafka-request-handler-1 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 1 bytes written to log MY.TOPIC118 beginning at offset 19321510 and ending at offset 19321510
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3 ] [kafka.server.KafkaApis ]: [KafkaApi-11] 3 bytes written to log MY.TOPIC-463 beginning at offset 28777627 and ending at offset 28777629
>> >
>> > On Mon, Dec 29, 2014 at 5:43 PM, Jay Kreps <[email protected]> wrote:
>> >
>> > > Hey Rajiv,
>> > >
>> > > Yes, if you uncomment the line
>> > > #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
>> > > in the example log4j.properties file that will enable logging of each request including the time taken processing the
>> > > request. This is the first step for diagnosing latency spikes since this will rule out the server. You may also want to enable DEBUG or TRACE on the producer and see what is happening when those spikes occur.
>> > >
>> > > The JMX stats show that at least the producer is not exhausting the I/O thread (it is 92% idle) and doesn't seem to be waiting on memory which is somewhat surprising to me (the NaN is an artifact of how we compute that stat--no allocations took place in the time period measured so it is kind of 0/0).
>> > >
>> > > -Jay
>> > >
>> > > On Mon, Dec 29, 2014 at 4:54 PM, Rajiv Kurian <[email protected]> wrote:
>> > >
>> > > > In case the attachments don't work out here is an imgur link - http://imgur.com/NslGpT3,Uw6HFow#0
>> > > >
>> > > > On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <[email protected]> wrote:
>> > > >
>> > > > > Never mind about (2). I see these stats are already being output by the kafka producer. I've attached a couple of screenshots (couldn't copy paste from jvisualvm). Do any of these things strike you as odd? The bufferpool-wait-ratio sadly shows up as a NaN.
>> > > > >
>> > > > > I still don't know how to figure out (1).
>> > > > >
>> > > > > Thanks!
>> > > > >
>> > > > > On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <[email protected]> wrote:
>> > > > >
>> > > > >> Hi Jay,
>> > > > >>
>> > > > >> Re (1) - I am not sure how to do this? Actually I am not sure what this means. Is this the time every write/fetch request is received on the broker? Do I need to enable some specific log level for this to show up? It doesn't show up in the usual log. Is this information also available via jmx somehow?
>> > > > >>
>> > > > >> Re (2) - Are you saying that I should instrument the "percentage of time waiting for buffer space" stat myself? If so, how do I do this? Or are these stats already output to jmx by the kafka producer code? This seems like it's in the internals of the kafka producer client code.
>> > > > >>
>> > > > >> Thanks again!
>> > > > >>
>> > > > >> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <[email protected]> wrote:
>> > > > >>
>> > > > >>> Thanks Jay. Will check (1) and (2) and get back to you. The test is not stand-alone now. It might be a bit of work to extract it to a stand-alone executable. It might take me a bit of time to get that going.
>> > > > >>>
>> > > > >>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <[email protected]> wrote:
>> > > > >>>
>> > > > >>>> Hey Rajiv,
>> > > > >>>>
>> > > > >>>> This sounds like a bug. The more info you can help us get the easier to fix. Things that would help:
>> > > > >>>> 1. Can you check if the request log on the servers shows latency spikes (in which case it is a server problem)?
>> > > > >>>> 2. It would be worth also getting the jmx stats on the producer as they will show things like what percentage of time it is waiting for buffer space etc.
>> > > > >>>>
>> > > > >>>> If your test is reasonably stand-alone it would be great to file a JIRA and attach the test code and the findings you already have so someone can dig into what is going on.
>> > > > >>>>
>> > > > >>>> -Jay
>> > > > >>>>
>> > > > >>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <[email protected]> wrote:
>> > > > >>>>
>> > > > >>>> > Hi all,
>> > > > >>>> >
>> > > > >>>> > Bumping this up, in case someone has any ideas. I did yet another experiment where I create 4 producers and stripe the send requests across them in a manner such that any one producer only sees 256 partitions instead of the entire 1024. This seems to have helped a bit, and though I still see crazy high 99th (25-30 seconds), the median, mean, 75th and 95th percentile have all gone down.
>> > > > >>>> >
>> > > > >>>> > Thanks!
>> > > > >>>> >
>> > > > >>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <[email protected]> wrote:
>> > > > >>>> >
>> > > > >>>> > > Ah I thought it was restarting the broker that made things better :)
>> > > > >>>> > >
>> > > > >>>> > > Yeah I have no experience with the Java client so can't really help there.
>> > > > >>>> > >
>> > > > >>>> > > Good luck!
>> > > > >>>> > >
>> > > > >>>> > > -----Original Message-----
>> > > > >>>> > > From: Rajiv Kurian [[email protected]]
>> > > > >>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
>> > > > >>>> > > To: [email protected] [[email protected]]
>> > > > >>>> > > Subject: Re: Trying to figure out kafka latency issues
>> > > > >>>> > >
>> > > > >>>> > > I'll take a look at the GC profile of the brokers. Right now I keep a tab on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine, not JVM heap), and free disk space on the broker.
>> > > > >>>> > > I'll need to take a look at the JVM metrics too. What seemed strange is that going from 8 -> 512 partitions increases the latency, but going from 512 -> 8 does not decrease it. I have to restart the producer (but not the broker) for the end to end latency to go down. That made it seem that the fault was probably with the producer and not the broker. Only restarting the producer made things better. I'll do more extensive measurement on the broker.
>> > > > >>>> > >
>> > > > >>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <[email protected]> wrote:
>> > > > >>>> > >
>> > > > >>>> > > > Did you see my response and have you checked the server logs, especially the GC logs? It still sounds like you are running out of memory on the broker. What is your max heap memory and are you thrashing once you start writing to all those partitions?
>> > > > >>>> > > >
>> > > > >>>> > > > You have measured very thoroughly from an external point of view; I think now you'll have to start measuring the internal metrics. Maybe someone else will have ideas on what jmx values to watch.
>> > > > >>>> > > >
>> > > > >>>> > > > Best,
>> > > > >>>> > > > Thunder
>> > > > >>>> > > >
>> > > > >>>> > > > -----Original Message-----
>> > > > >>>> > > > From: Rajiv Kurian [[email protected]]
>> > > > >>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
>> > > > >>>> > > > To: [email protected] [[email protected]]
>> > > > >>>> > > > Subject: Re: Trying to figure out kafka latency issues
>> > > > >>>> > > >
>> > > > >>>> > > > Some more work tells me that the end to end latency numbers vary with the number of partitions I am writing to. I did an experiment, where based on a run time flag I would dynamically select how many of the *1024 partitions* I write to. So say I decide I'll write to at most 256 partitions, I mod whatever partition I would actually write to by 256. Basically the number of partitions for this topic on the broker remains the same at *1024* partitions but the number of partitions my producers write to changes dynamically based on a run time flag. So something like this:
>> > > > >>>> > > >
>> > > > >>>> > > > int partition = getPartitionForMessage(message);
>> > > > >>>> > > > // This flag can be updated without bringing the application down - just a volatile read of some number set externally.
>> > > > >>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
>> > > > >>>> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
>> > > > >>>> > > > // Send a message to this Kafka partition.
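
A minimal runnable sketch of the partition-capping idea described above, for anyone who wants to reproduce the experiment. The class name `PartitionCap` and its methods are hypothetical (they are not from the original code), and an `AtomicInteger` stands in for the externally updated flag:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: caps the partition a producer writes to, using a
// limit that can be changed while the application is running.
public final class PartitionCap {
    private final AtomicInteger maxPartitionsToWriteTo;

    public PartitionCap(int initialMax) {
        if (initialMax <= 0) {
            throw new IllegalArgumentException("max must be positive");
        }
        this.maxPartitionsToWriteTo = new AtomicInteger(initialMax);
    }

    // Can be called from another thread (e.g. an admin hook) at run time.
    public void setMax(int newMax) {
        if (newMax <= 0) {
            throw new IllegalArgumentException("max must be positive");
        }
        maxPartitionsToWriteTo.set(newMax);
    }

    // Maps the partition a message would normally go to onto [0, max).
    public int cap(int partition) {
        return Math.floorMod(partition, maxPartitionsToWriteTo.get());
    }

    public static void main(String[] args) {
        PartitionCap cap = new PartitionCap(256);
        System.out.println("300 with max 256 -> " + cap.cap(300));
        cap.setMax(8);
        System.out.println("300 with max 8 -> " + cap.cap(300));
    }
}
```

The topic itself keeps all 1024 partitions on the broker in this scheme; only the set of partitions the producer touches shrinks.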
>> > > > >>>> > > >
>> > > > >>>> > > > Here are some interesting things I've noticed:
>> > > > >>>> > > >
>> > > > >>>> > > > i) When I start my client and it *never writes* to more than *8 partitions* (same data rate but fewer partitions) - the end to end *99th latency is 300-350 ms*. Quite a bit of this (numbers in my previous emails) is the latency from producer -> broker and the latency from broker -> consumer. Still nowhere as poor as the *20 - 30* seconds I was seeing.
>> > > > >>>> > > >
>> > > > >>>> > > > ii) When I increase the maximum number of partitions, end to end latency increases dramatically. At *256 partitions* the end to end *99th latency is still 390 - 418 ms.* Worse than the latency figures for *8* partitions, but not by much. When I increase this number to *512 partitions* the end to end *99th latency* becomes an intolerable *19-24 seconds*. At *1024* partitions the *99th latency is at 25 - 30 seconds*.
>> > > > >>>> > > > A table of the numbers:
>> > > > >>>> > > >
>> > > > >>>> > > > Max number of partitions written to (out of 1024) | End to end latency (99th)
>> > > > >>>> > > > 8                                                 | 300 - 350 ms
>> > > > >>>> > > > 256                                               | 390 - 418 ms
>> > > > >>>> > > > 512                                               | 19 - 24 seconds
>> > > > >>>> > > > 1024                                              | 25 - 30 seconds
>> > > > >>>> > > >
>> > > > >>>> > > > iii) Once I make the max number of partitions high enough, reducing it doesn't help. For example: if I go up from *8* to *512* partitions, the latency goes up. But while the producer is running if I go down from *512* to *8* partitions, it doesn't reduce the latency numbers. My guess is that the producer is creating some state lazily per partition and this state is causing the latency. Once this state is created, writing to fewer partitions doesn't seem to help. Only a restart of the producer calms things down.
>> > > > >>>> > > >
>> > > > >>>> > > > So my current plan is to reduce the number of partitions on the topic, but there seems to be something deeper going on for the latency numbers to be so poor to begin with and then suffer so much more (non linearly) with additional partitions.
>> > > > >>>> > > >
>> > > > >>>> > > > Thanks!
>> > > > >>>> > > >
>> > > > >>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <[email protected]> wrote:
>> > > > >>>> > > >
>> > > > >>>> > > > > I've done some more measurements. I've also started measuring the latency between when I ask my producer to send a message and when I get an acknowledgement via the callback. Here is my code:
>> > > > >>>> > > > >
>> > > > >>>> > > > > // This function is called on every producer once every 30 seconds.
>> > > > >>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
>> > > > >>>> > > > >     final int numberOfPartitions = 1024;
>> > > > >>>> > > > >     final long timeOfEnqueue = System.currentTimeMillis();
>> > > > >>>> > > > >     final Callback callback = new Callback() {
>> > > > >>>> > > > >         @Override
>> > > > >>>> > > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
>> > > > >>>> > > > >             if (metadata != null) {
>> > > > >>>> > > > >                 // The difference between ack time from broker and enqueue time.
>> > > > >>>> > > > >                 final long timeOfAck = System.currentTimeMillis();
>> > > > >>>> > > > >                 final long lag = timeOfAck - timeOfEnqueue;
>> > > > >>>> > > > >                 enqueueLag.update(lag);
>> > > > >>>> > > > >             }
>> > > > >>>> > > > >         }
>> > > > >>>> > > > >     };
>> > > > >>>> > > > >     for (int i = 0; i < numberOfPartitions; i++) {
>> > > > >>>> > > > >         try {
>> > > > >>>> > > > >             // 10 bytes -> short version + long timestamp.
>> > > > >>>> > > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
>> > > > >>>> > > > >             // This message is later used by the consumers to measure lag.
>> > > > >>>> > > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
>> > > > >>>> > > > >             kafkaProducer.send(record, callback);
>> > > > >>>> > > > >         } catch (Exception e) {
>> > > > >>>> > > > >             // We just dropped a lag marker.
>> > > > >>>> > > > >         }
>> > > > >>>> > > > >     }
>> > > > >>>> > > > > }
>> > > > >>>> > > > >
>> > > > >>>> > > > > The *99th* on this lag is about *350 - 400* ms. It's not stellar, but doesn't account for the *20-30 second 99th* I see on the end to end lag.
>> > > > >>>> > > > > I am consuming in a tight loop on the Consumers (using the SimpleConsumer) with minimal processing with a *99th fetch time* of *130-140* ms, so I don't think that should be a problem either. Completely baffled.
>> > > > >>>> > > > >
>> > > > >>>> > > > > Thanks!
>> > > > >>>> > > > >
>> > > > >>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <[email protected]> wrote:
>> > > > >>>> > > > >
>> > > > >>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <[email protected]> wrote:
>> > > > >>>> > > > >>
>> > > > >>>> > > > >>> I am trying to replace a Thrift peer to peer API with kafka for a particular work flow. I am finding the 99th percentile latency to be unacceptable at this time. This entire work load runs in an Amazon VPC. I'd greatly appreciate it if someone has any insights on why I am seeing such poor numbers. Here are some details and measurements taken:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> i) I have a single topic with 1024 partitions that I am writing to from six clients using the kafka 0.8.2 beta kafka producer.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> ii) I have 3 brokers, each on a c3 2x machine on ec2. Each of those machines has 8 virtual cpus, 15 GB memory and 2 * 80 GB SSDs.
>> > > > >>>> > > > >>> The broker -> partitions mapping was decided by Kafka when I created the topic.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> iii) I write about 22 thousand messages per second from across the 6 clients. This number was calculated using distributed counters. I just increment a distributed counter in the callback from my enqueue job if the metadata returned is not null. I also increment a number of dropped messages counter if the callback has a non-null exception or if there was an exception in the synchronous send call. The number of dropped messages is pretty much always zero. Out of the 6 clients 3 are responsible for 95% of the traffic. Messages are very tiny and have null keys and 27 byte values (2 byte version and 25 byte payload). Again these messages are written using the kafka 0.8.2 client.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> iv) I have 3 consumer processes consuming only from this topic. Each consumer process is assigned a disjoint set of the 1024 partitions by an eternal arbiter. Each consumer process then creates a mapping from brokers -> partitions it has been assigned.
>> > > > >>>> > > > >>> It then starts one fetcher thread per broker. Each thread queries the broker (using the SimpleConsumer) it has been assigned for partitions such that partitions = (partitions on broker) *∩* (partitions assigned to the process by the arbiter). So in effect there are 9 consumer threads across 3 consumer processes that query a disjoint set of partitions for the single topic and amongst themselves they manage to consume from every partition. So each thread has a run loop where it asks for messages, consumes them then asks for more messages. When a run loop starts, it starts by querying for the latest message in a partition (i.e. discards any previous backup) and then maintains a map of partition -> nextOffsetToRequest in memory to make sure that it consumes messages in order.
>> > > > >>>> > > > >>
>> > > > >>>> > > > >> Edit: Meant external arbiter.
>> > > > >>>> > > > >>
>> > > > >>>> > > > >>> v) Consumption is really simple. Each message is put on a non blocking efficient ring buffer. If the ring buffer is full the message is dropped.
>> > > > >>>> > > > >>> I measure the mean time between fetches and it is the same as the time for fetches up to a ms, meaning no matter how many messages are dequeued, it takes almost no time to process them. At the end of processing a fetch request I increment another distributed counter that counts the number of messages processed. This counter tells me that on average I am consuming the same number of messages/sec that I enqueue on the producers i.e. around 22 thousand messages/sec.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> vi) The 99th percentile of the number of messages fetched per fetch request is about 500 messages. The 99th percentile of the time it takes to fetch a batch is about 130 - 140 ms. I played around with the buffer and maxWait settings on the SimpleConsumer and attempting to consume more messages was leading the 99th percentile of the fetch time to balloon up, so I am consuming in smaller batches right now.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> vii) Every 30 seconds or so each of the producers inserts a trace message into each partition (so 1024 messages per producer every 30 seconds).
>> > > > >>>> > > > >>> Each message only contains the System.currentTimeMillis() at the time of enqueueing. It is about 9 bytes long. The consumer in step (v) always checks to see if a message it dequeued is of this trace type (instead of the regular message type). This is a simple check on the first 2 byte version that each message buffer contains. If it is a trace message it reads its payload as a long, takes the difference between the system time on the consumer and this payload, and updates a histogram with this difference value. Ignoring NTP skew (which I've measured to be in the order of milliseconds) this is the lag between when a message was enqueued on the producer and when it was read on the consumer.
>> > > > >>>> > > > >>
>> > > > >>>> > > > >> Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte long for timestamp.
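
The trace-message layout described above (2 byte version followed by an 8 byte timestamp) could be sketched roughly as follows. This is only an illustration: the thread names `LagMarker.serialize` but never shows it, so the body here is a guess, and the `TRACE_VERSION` value and the `isTrace` / `lagMillis` helpers are assumptions made for the example:

```java
import java.nio.ByteBuffer;

// Hypothetical reconstruction of the 10 byte lag marker: short version + long timestamp.
public final class LagMarker {
    // Assumed value; the thread does not say which version number marks a trace message.
    public static final short TRACE_VERSION = 2;
    private static final int SIZE = Short.BYTES + Long.BYTES; // 2 + 8 = 10 bytes

    // Producer side: encode the enqueue time into a 10 byte value.
    public static byte[] serialize(long timeOfEnqueueMillis) {
        return ByteBuffer.allocate(SIZE)
                .putShort(TRACE_VERSION)
                .putLong(timeOfEnqueueMillis)
                .array();
    }

    // Consumer side: the check on the first 2 bytes of each message.
    public static boolean isTrace(byte[] value) {
        return value.length == SIZE
                && ByteBuffer.wrap(value).getShort() == TRACE_VERSION;
    }

    // Consumer side: end to end lag, ignoring clock skew between machines.
    public static long lagMillis(byte[] value, long nowMillis) {
        long timeOfEnqueue = ByteBuffer.wrap(value).getLong(Short.BYTES);
        return nowMillis - timeOfEnqueue;
    }

    public static void main(String[] args) {
        byte[] marker = serialize(System.currentTimeMillis());
        System.out.println("length=" + marker.length + " trace=" + isTrace(marker));
    }
}
```

`ByteBuffer` writes big-endian by default, so the producer and consumer agree on byte order as long as both use the same helper.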
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> So pseudocode for every consumer thread (one per broker per consumer process, 9 in total across 3 consumers) is:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> void run() {
>> > > > >>>> > > > >>>   while (running) {
>> > > > >>>> > > > >>>     // This assignment is done by an external arbiter.
>> > > > >>>> > > > >>>     FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
>> > > > >>>> > > > >>>     measureTimeBetweenFetchRequests(); // The 99th of this is 130-140ms.
>> > > > >>>> > > > >>>     // The 99th of this is 130-140ms, which is the same as the time between fetches.
>> > > > >>>> > > > >>>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
>> > > > >>>> > > > >>>     processData(fetchResponse); // The 99th on this is 2-3 ms.
>> > > > >>>> > > > >>>   }
>> > > > >>>> > > > >>> }
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> void processData(FetchResponse response) {
>> > > > >>>> > > > >>>   try (Timer.Context ignored = processWorkerDataTimer.time()) { // The 99th on this is 2-3 ms.
>> > > > >>>> > > > >>>     for (Message message : response) {
>> > > > >>>> > > > >>>       if (typeOf(message) == NORMAL_DATA) {
>> > > > >>>> > > > >>>         enqueueOnNonBlockingRingBuffer(message); // Never blocks, drops message if ring buffer is full.
>> > > > >>>> > > > >>> } else if (typeOf(message) == TRACE_DATA) { >> > > > >>>> > > > >>> long timeOfEnqueue = geEnqueueTime(message); >> > > > >>>> > > > >>> long currentTime = >> System.currentTimeMillis(); >> > > > >>>> > > > >>> long difference = currentTime - >> timeOfEnqueue; >> > > > >>>> > > > >>> lagHistogram.update(difference); >> > > > >>>> > > > >>> } >> > > > >>>> > > > >>> } >> > > > >>>> > > > >>> } >> > > > >>>> > > > >>> } >> > > > >>>> > > > >>> >> > > > >>>> > > > >>> viii) So the kicker is that this lag is really really >> > > high: >> > > > >>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high >> > and I >> > > > >>>> could >> > > > >>>> > even >> > > > >>>> > > > >>> discount some of it through NTP skew. >> > > > >>>> > > > >>> Mean Lag: *2 - 4 seconds*! Also notice how mean is >> > bigger >> > > > than >> > > > >>>> > median >> > > > >>>> > > > >>> kind of telling us how the distribution has a big >> tail. >> > > > >>>> > > > >>> 99th Lag: *20 seconds*!! >> > > > >>>> > > > >>> >> > > > >>>> > > > >>> I can't quite figure out the source of the lag. Here >> are >> > > > some >> > > > >>>> other >> > > > >>>> > > > >>> things of note: >> > > > >>>> > > > >>> >> > > > >>>> > > > >>> 1) The brokers in general are at about 40-50% CPU >> but I >> > > see >> > > > >>>> > > occasional >> > > > >>>> > > > >>> spikes to more than 80% - 95%. This could probably be >> > > > causing >> > > > >>>> > issues. >> > > > >>>> > > > I am >> > > > >>>> > > > >>> wondering if this is down to the high number of >> > > partitions I >> > > > >>>> have >> > > > >>>> > and >> > > > >>>> > > > how >> > > > >>>> > > > >>> each partition ends up receiving not too many >> messages. >> > > 22k >> > > > >>>> > messages >> > > > >>>> > > > spread >> > > > >>>> > > > >>> across 1024 partitions = only 21 odd >> > > messages/sec/partition. >> > > > >>>> But >> > > > >>>> > each >> > > > >>>> > > > >>> broker should be receiving about 7500 messages/sec. 
>> It could also be down to the nature of these messages being tiny. I imagine
>> that the Kafka wire protocol overhead is probably close to the size of the
>> 25 byte message payload. It also has to do a few CRC32 calculations etc. But
>> again, given that these are C3.2x machines and the number of messages is so
>> tiny, I'd expect it to do better. And reading this article
>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
>> about 2 million messages/sec across three brokers makes it seem like 22k
>> 25 byte messages should be very easy. So my biggest suspicion right now is
>> that the large number of partitions is somehow responsible for this latency.
>>
>> 2) On the consumer, given that the 99th time between fetches == 99th time
>> of a fetch, I don't see how I could do any better. It's pretty close to
>> just getting batches of data, iterating through them and dropping them on
>> the floor.
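As a rough sanity check on the rates quoted in 1), here is the arithmetic as a small sketch. It assumes the 22k messages/sec are spread perfectly evenly over 1024 partitions and 3 brokers and counts only the 25-byte payload, not protocol overhead. The class and method names are made up for illustration.

```java
final class PartitionRates {
    /** Messages per second landing on one partition, assuming an even spread. */
    static double perPartition(double totalMsgsPerSec, int partitions) {
        return totalMsgsPerSec / partitions;
    }

    /** Messages per second handled by one broker, assuming an even spread. */
    static double perBroker(double totalMsgsPerSec, int brokers) {
        return totalMsgsPerSec / brokers;
    }

    /** Payload bytes per second for a given message rate (no wire overhead). */
    static double payloadBytesPerSec(double msgsPerSec, int payloadBytes) {
        return msgsPerSec * payloadBytes;
    }

    public static void main(String[] args) {
        double total = 22_000;                        // messages/sec from the thread
        double partitionRate = perPartition(total, 1024);
        double brokerRate = perBroker(total, 3);
        System.out.printf("per partition: %.1f msg/s%n", partitionRate);
        System.out.printf("per broker:    %.0f msg/s%n", brokerRate);
        System.out.printf("per broker payload: %.0f bytes/s%n",
                payloadBytesPerSec(brokerRate, 25));
    }
}
```

Under those assumptions each broker sees well under 200 KB/s of payload, which is consistent with the intuition above that raw throughput is unlikely to be the limit.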
>> My requests on the consumer are built like this:
>>
>> // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
>> FetchRequestBuilder builder = new FetchRequestBuilder()
>>     .clientId(clientName)
>>     .maxWait(100)
>>     .minBytes(1000);
>>
>> // Add a fetch request for every partition.
>> for (PartitionMetadata partition : partitions) {
>>   int partitionId = partition.partitionId();
>>   // This map maintains the next offset to be read for each partition.
>>   long offset = readOffsets.get(partition);
>>   // Up to 3000 bytes per fetch request.
>>   builder.addFetch(MY_TOPIC, partitionId, offset, 3000);
>> }
>>
>> 3) On the producer side I use the Kafka beta producer. I've played
>> around with many settings but none of them seem to have made a difference.
>> Here are my current settings:
>>
>> ProducerConfig values:
>>   block.on.buffer.full = false
>>   retry.backoff.ms = 100
>>   buffer.memory = 83886080  // Kind of big - could it be adding lag?
>>   batch.size = 40500  // This is also kind of big - could it be adding lag?
>>   metrics.sample.window.ms = 30000
>>   metadata.max.age.ms = 300000
>>   receive.buffer.bytes = 32768
>>   timeout.ms = 30000
>>   max.in.flight.requests.per.connection = 5
>>   metric.reporters = []
>>   bootstrap.servers = [broker1, broker2, broker3]
>>   client.id =
>>   compression.type = none
>>   retries = 0
>>   max.request.size = 1048576
>>   send.buffer.bytes = 131072
>>   acks = 1
>>   reconnect.backoff.ms = 10
>>   linger.ms = 250  // Based on this config I'd imagine that the lag
>>                    // should be bounded to about 250 ms overall.
>>   metrics.num.samples = 2
>>   metadata.fetch.timeout.ms = 60000
>>
>> I know this is a lot of information. I just wanted to provide as much
>> context as possible.
>>
>> Thanks in advance!
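On the question of whether batch.size = 40500 could be adding lag, a back-of-the-envelope estimate may help. My understanding is that the new producer accumulates one batch per partition and sends it when it reaches batch.size or when linger.ms has elapsed, whichever comes first. Treat that as an assumption here. The sketch also assumes an even 22k msg/s over 1024 partitions, 25-byte payloads, and no per-record overhead. All names are hypothetical.

```java
final class BatchFillEstimate {
    /** Seconds for one partition's batch to reach batchSizeBytes (payload only). */
    static double secondsToFillBatch(int batchSizeBytes,
                                     double msgsPerSecPerPartition,
                                     int payloadBytes) {
        return batchSizeBytes / (msgsPerSecPerPartition * payloadBytes);
    }

    /** Expected records accumulated per partition within one linger interval. */
    static double recordsPerLinger(double msgsPerSecPerPartition, long lingerMs) {
        return msgsPerSecPerPartition * lingerMs / 1000.0;
    }

    public static void main(String[] args) {
        double perPartitionRate = 22_000.0 / 1024;    // about 21 msg/s per partition
        System.out.printf("seconds to fill a 40500-byte batch: %.1f%n",
                secondsToFillBatch(40_500, perPartitionRate, 25));
        System.out.printf("records per 250 ms linger: %.1f%n",
                recordsPerLinger(perPartitionRate, 250));
    }
}
```

If those assumptions hold, a per-partition batch would take over a minute to fill by size alone, so linger.ms rather than batch.size would be what triggers each send, with only a handful of records per batch. That fits the expectation that producer-side delay should stay around 250 ms and does not by itself explain a 20 second tail.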
