On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

I am trying to replace a Thrift peer-to-peer API with Kafka for a particular workflow, and I am finding the 99th percentile latency to be unacceptable at this time. The entire workload runs in an Amazon VPC. I'd greatly appreciate it if someone has any insight into why I am seeing such poor numbers. Here are some details and measurements:

i) I have a single topic with 1024 partitions that I am writing to from six clients using the Kafka 0.8.2 beta producer.

ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of those machines has 8 virtual CPUs, 15 GB of memory and 2 x 80 GB SSDs. The broker -> partitions mapping was decided by Kafka when I created the topic.

iii) I write about 22 thousand messages per second across the 6 clients. This number was calculated using distributed counters: I increment a counter in the callback from my enqueue job if the metadata returned is not null, and I increment a dropped-messages counter if the callback has a non-null exception or if there was an exception in the synchronous send call. The number of dropped messages is pretty much always zero. Of the 6 clients, 3 are responsible for 95% of the traffic. Messages are very tiny and have null keys and 27-byte values (a 2-byte version plus a 25-byte payload). Again, these messages are written using the Kafka 0.8.2 client.

iv) I have 3 consumer processes consuming only from this topic. Each consumer process is assigned a disjoint subset of the 1024 partitions by an external arbiter. Each consumer process then builds a mapping from brokers -> partitions it has been assigned and starts one fetcher thread per broker. Each thread queries its broker (using the SimpleConsumer) for partitions such that partitions = (partitions on broker) ∩ (partitions assigned to the process by the arbiter). So in effect there are 9 consumer threads across the 3 consumer processes, each querying a disjoint set of partitions of the single topic, and between them they consume every partition. Each thread has a run loop where it asks for messages, consumes them, then asks for more messages. When a run loop starts, it first queries for the latest offset in each partition (i.e. it discards any existing backlog) and from then on maintains a map of partition -> nextOffsetToRequest in memory to make sure it consumes messages in order. A sketch of that partition-selection logic is just below.
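To make the selection in (iv) concrete, it is essentially a set intersection. A minimal sketch (the names are illustrative, not from the real code, which gets these sets from cluster metadata and the arbiter respectively):

import java.util.HashSet;
import java.util.Set;

public class PartitionSelection {
    // Partitions this fetcher thread should consume:
    // (partitions living on this broker) ∩ (partitions granted by the arbiter).
    static Set<Integer> partitionsForThread(Set<Integer> partitionsOnBroker,
                                            Set<Integer> assignedByArbiter) {
        Set<Integer> toFetch = new HashSet<>(partitionsOnBroker); // copy so the input isn't mutated
        toFetch.retainAll(assignedByArbiter);                     // keep only the arbiter's grant
        return toFetch;
    }
}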
v) Consumption is really simple. Each message is put on an efficient non-blocking ring buffer; if the ring buffer is full, the message is dropped. I measure the mean time between fetches, and it matches the time for the fetches themselves to within a millisecond, meaning that no matter how many messages are dequeued, it takes almost no time to process them. At the end of processing a fetch request I increment another distributed counter that counts the number of messages processed. This counter tells me that on average I am consuming the same number of messages/sec that I enqueue on the producers, i.e. around 22 thousand messages/sec.

vi) The 99th percentile of the number of messages fetched per fetch request is about 500 messages. The 99th percentile of the time it takes to fetch a batch is about 130-140 ms. I played around with the buffer and maxWait settings on the SimpleConsumer, but attempting to consume more messages per fetch made the 99th percentile of the fetch time balloon, so I am consuming in smaller batches right now.

vii) Every 30 seconds or so, each of the producers inserts a trace message into each partition (so 1024 messages per producer every 30 seconds). A trace message contains only the System.currentTimeMillis() at the time of enqueueing, so it is 10 bytes long: a 2-byte version plus an 8-byte long timestamp. The consumer in step (v) always checks whether a dequeued message is of this trace type (instead of the regular message type); this is a simple check on the 2-byte version at the start of each message buffer (see the framing sketch after the lag numbers below). If it is a trace message, the consumer reads its payload as a long and updates a histogram with the difference between the system time on the consumer and that payload. Ignoring NTP skew (which I've measured to be on the order of milliseconds), this difference is the lag between when a message was enqueued on the producer and when it was read on the consumer.

Pseudocode for every consumer thread (one per broker per consumer process, 9 in total across the 3 consumers):

void run() {
    while (running) {
        // The partition assignment is done by an external arbiter.
        FetchRequest fetchRequest =
            buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
        measureTimeBetweenFetchRequests();                 // 99th: 130-140 ms
        FetchResponse fetchResponse = fetch(fetchRequest); // 99th: 130-140 ms, same as the time between fetches
        processData(fetchResponse);                        // 99th: 2-3 ms
    }
}

void processData(FetchResponse response) {
    try (Timer.Context ctx = processWorkerDataTimer.time()) { // 99th: 2-3 ms
        for (Message message : response) {
            if (typeOf(message) == NORMAL_DATA) {
                // Never blocks; drops the message if the ring buffer is full.
                enqueueOnNonBlockingRingBuffer(message);
            } else if (typeOf(message) == TRACE_DATA) {
                long timeOfEnqueue = getEnqueueTime(message);
                long currentTime = System.currentTimeMillis();
                long difference = currentTime - timeOfEnqueue;
                lagHistogram.update(difference);
            }
        }
    }
}

viii) So the kicker is that this lag is really, really high:

Median lag: *200 ms*. This already seems pretty high, and I could discount only some of it through NTP skew.
Mean lag: *2-4 seconds*! Notice how the mean is bigger than the median, which tells us the distribution has a big tail.
99th percentile lag: *20 seconds*!!

I can't quite figure out the source of the lag.
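For reference, the trace-message framing from (vii) is just the 2-byte version followed by the 8-byte timestamp. A minimal sketch of the encode/decode (assuming ByteBuffer's default big-endian order; the version constant is illustrative, not the real value):

import java.nio.ByteBuffer;

public class TraceMessage {
    static final short TRACE_DATA = 2; // illustrative; the real version numbers may differ

    // 2-byte version + 8-byte millisecond timestamp = 10 bytes total.
    static byte[] encode(long enqueueTimeMillis) {
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putShort(TRACE_DATA);
        buf.putLong(enqueueTimeMillis);
        return buf.array();
    }

    // Reads the enqueue timestamp back out of a trace-message value.
    static long decodeEnqueueTime(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        short version = buf.getShort(); // consume the 2-byte version
        assert version == TRACE_DATA;
        return buf.getLong();
    }
}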
Here are some other things of note:

1) The brokers are generally at about 40-50% CPU, but I see occasional spikes to 80-95%, which could be causing issues. I wonder if this is down to the high number of partitions, with each partition receiving relatively few messages: 22k messages spread across 1024 partitions is only about 21 messages/sec/partition, though each broker should still be receiving about 7500 messages/sec. It could also be down to the messages being tiny: I imagine the per-message overhead of the Kafka wire protocol is close to the size of the 25-byte payload itself, and the broker also has to do a few CRC32 calculations etc. But given that these are c3.2xlarge machines and the message rate is so low, I'd expect them to do better. Reading this article <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines> about 2 million messages/sec across three brokers makes it seem like 22k 25-byte messages should be very easy. So my biggest suspicion right now is that the large number of partitions is somehow responsible for this latency.

2) On the consumer, given that the 99th percentile of the time between fetches == the 99th percentile of the time of a fetch, I don't see how I could do any better. It's pretty close to just getting batches of data, iterating through them, and dropping them on the floor. My requests on the consumer are built like this:

FetchRequestBuilder builder = new FetchRequestBuilder()
        .clientId(clientName)
        .maxWait(100)    // return after at most 100 ms...
        .minBytes(1000); // ...or as soon as at least 1000 bytes are available, whichever happens first

// Add a fetch for every partition this thread owns.
for (PartitionMetadata partition : partitions) {
    int partitionId = partition.partitionId();
    long offset = readOffsets.get(partition); // this map maintains the next offset to be read per partition
    builder.addFetch(MY_TOPIC, partitionId, offset, 3000); // up to 3000 bytes per partition
}

3) On the producer side I use the Kafka 0.8.2 beta producer. I've played around with many settings, but none of them seem to have made a difference. Here are my current settings:

ProducerConfig values:
    block.on.buffer.full = false
    retry.backoff.ms = 100
    buffer.memory = 83886080 // kind of big - could it be adding lag?
    batch.size = 40500 // this is also kind of big - could it be adding lag?
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    receive.buffer.bytes = 32768
    timeout.ms = 30000
    max.in.flight.requests.per.connection = 5
    metric.reporters = []
    bootstrap.servers = [broker1, broker2, broker3]
    client.id =
    compression.type = none
    retries = 0
    max.request.size = 1048576
    send.buffer.bytes = 131072
    acks = 1
    reconnect.backoff.ms = 10
    linger.ms = 250 // based on this config I'd imagine that the lag should be bounded to about 250 ms overall
    metrics.num.samples = 2
    metadata.fetch.timeout.ms = 60000

I know this is a lot of information; I just wanted to provide as much context as possible.

Thanks in advance!
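P.S. A back-of-the-envelope calculation from the numbers above (my own arithmetic; the ~53 bytes per record assumes the 27-byte value plus roughly 26 bytes of message-set overhead, which is an assumption) suggests batch.size never triggers a send: at these rates every batch flushes on linger.ms with about one record in it, so the producer by itself should only be adding up to ~250 ms, not seconds.

public class BatchMath {
    public static void main(String[] args) {
        double totalMsgsPerSec = 22_000.0;  // from (iii)
        int producers = 6;                  // from (i)
        int partitions = 1024;              // from (i)
        int batchSizeBytes = 40_500;        // batch.size from the producer config
        double lingerSec = 0.250;           // linger.ms = 250
        int assumedRecordBytes = 53;        // ASSUMPTION: 27-byte value + ~26 bytes of overhead

        // How fast any one producer feeds any one partition's batch.
        double perProducerPerPartition = totalMsgsPerSec / producers / partitions; // ~3.6 msgs/sec

        double secondsToFillBatch = (batchSizeBytes / (double) assumedRecordBytes)
                / perProducerPerPartition;                                     // ~210 s to fill one batch
        double recordsPerLingerWindow = perProducerPerPartition * lingerSec;   // ~0.9 records per 250 ms

        System.out.printf("msgs/sec per producer per partition: %.1f%n", perProducerPerPartition);
        System.out.printf("seconds to fill one 40500-byte batch: %.0f%n", secondsToFillBatch);
        System.out.printf("records per 250 ms linger window: %.1f%n", recordsPerLingerWindow);
    }
}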