I've done some more measurements. I've also started measuring the latency between when I ask my producer to send a message and when I get an acknowledgement via the callback. Here is my code:
// This function is called on every producer once every 30 seconds.
public void addLagMarkers(final Histogram enqueueLag) {
    final int numberOfPartitions = 1024;
    final long timeOfEnqueue = System.currentTimeMillis();
    final Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception ex) {
            if (metadata != null) {
                // The difference between the ack time from the broker and the enqueue time.
                final long timeOfAck = System.currentTimeMillis();
                final long lag = timeOfAck - timeOfEnqueue;
                enqueueLag.update(lag);
            }
        }
    };
    for (int i = 0; i < numberOfPartitions; i++) {
        try {
            // 10 bytes -> short version + long timestamp.
            // This message is later used by the consumers to measure lag.
            byte[] value = LagMarker.serialize(timeOfEnqueue);
            ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
            kafkaProducer.send(record, callback);
        } catch (Exception e) {
            // We just dropped a lag marker.
        }
    }
}

The 99th percentile on this lag is about 350-400 ms. It's not stellar, but it doesn't account for the 20-30 second 99th percentile I see on the end-to-end lag. I am consuming in a tight loop on the consumers (using the SimpleConsumer) with minimal processing, and the 99th percentile fetch time is 130-140 ms, so I don't think that should be a problem either. Completely baffled.

Thanks!

On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>
> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>>
>> I am trying to replace a Thrift peer-to-peer API with Kafka for a particular workflow. I am finding the 99th percentile latency to be unacceptable at this time. This entire workload runs in an Amazon VPC. I'd greatly appreciate it if someone has any insights on why I am seeing such poor numbers. Here are some details and measurements:
>>
>> i) I have a single topic with 1024 partitions that I am writing to from six clients using the kafka 0.8.2 beta producer.
>>
>> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of those machines has 8 virtual CPUs, 15 GB of memory and 2 x 80 GB SSDs. The broker -> partitions mapping was decided by Kafka when I created the topic.
>>
>> iii) I write about 22 thousand messages per second across the 6 clients. This number was calculated using distributed counters: I increment a distributed counter in the callback from my enqueue job if the metadata returned is not null, and I increment a dropped-messages counter if the callback has a non-null exception or if there was an exception in the synchronous send call. The number of dropped messages is pretty much always zero. Out of the 6 clients, 3 are responsible for 95% of the traffic. Messages are very tiny, with null keys and 27 byte values (2 byte version and 25 byte payload). Again, these messages are written using the kafka 0.8.2 client.
>>
>> iv) I have 3 consumer processes consuming only from this topic. Each consumer process is assigned a disjoint set of the 1024 partitions by an external arbiter. Each consumer process then creates a mapping from brokers -> partitions it has been assigned, and starts one fetcher thread per broker. Each thread queries the broker it has been assigned (using the SimpleConsumer) for partitions such that partitions = (partitions on broker) ∩ (partitions assigned to the process by the arbiter). So in effect there are 9 consumer threads across the 3 consumer processes, querying disjoint sets of partitions of the single topic, and among themselves they consume every partition. Each thread has a run loop where it asks for messages, consumes them, then asks for more messages. When a run loop starts, it queries for the latest offset in each partition (i.e. discards any previous backlog) and then maintains a map of partition -> nextOffsetToRequest in memory to make sure that it consumes messages in order.
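To make the per-broker assignment in (iv) concrete, here is a minimal sketch of the intersection step; the names (partitionsOnBroker, assignedToThisProcess, FetcherAssignmentSketch) are illustrative assumptions, not the actual code from this setup:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: for each broker, keep (partitions on broker) ∩ (partitions assigned
// to this consumer process by the external arbiter). Each non-empty entry would get
// its own fetcher thread.
public final class FetcherAssignmentSketch {
    public static Map<Integer, Set<Integer>> partitionsPerBroker(
            Map<Integer, Set<Integer>> partitionsOnBroker, // brokerId -> partitions it leads
            Set<Integer> assignedToThisProcess) {          // from the external arbiter
        Map<Integer, Set<Integer>> result = new HashMap<>();
        for (Map.Entry<Integer, Set<Integer>> entry : partitionsOnBroker.entrySet()) {
            Set<Integer> intersection = new HashSet<>(entry.getValue());
            intersection.retainAll(assignedToThisProcess); // the set intersection
            if (!intersection.isEmpty()) {
                result.put(entry.getKey(), intersection);
            }
        }
        return result;
    }
}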
>> v) Consumption is really simple. Each message is put on a non-blocking, efficient ring buffer. If the ring buffer is full, the message is dropped. I measure the mean time between fetches and it is the same as the time for a fetch to within a millisecond, meaning that no matter how many messages are dequeued, it takes almost no time to process them. At the end of processing a fetch request I increment another distributed counter that counts the number of messages processed. This counter tells me that on average I am consuming the same number of messages/sec that I enqueue on the producers, i.e. around 22 thousand messages/sec.
>>
>> vi) The 99th percentile of the number of messages fetched per fetch request is about 500 messages. The 99th percentile of the time it takes to fetch a batch is about 130-140 ms. I played around with the buffer and maxWait settings on the SimpleConsumer, and attempting to consume more messages per fetch made the 99th percentile of the fetch time balloon, so I am consuming in smaller batches right now.
>>
>> vii) Every 30 seconds or so, each of the producers inserts a trace message into each partition (so 1024 messages per producer every 30 seconds). Each message contains only the System.currentTimeMillis() at the time of enqueueing and is 10 bytes long (2-byte version + 8-byte timestamp). The consumer in step (v) always checks whether a message it dequeued is of this trace type (instead of the regular message type). This is a simple check on the 2-byte version at the start of each message buffer. If it is a trace message, the consumer reads its payload as a long and updates a histogram with the difference between the system time on the consumer and this payload. Ignoring NTP skew (which I've measured to be on the order of milliseconds), this is the lag between when a message was enqueued on the producer and when it was read on the consumer.
>>
>> So the pseudocode for every consumer thread (one per broker per consumer process, 9 in total across the 3 consumers) is:
>>
>> void run() {
>>     while (running) {
>>         FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker); // This assignment is done by the external arbiter.
>>         measureTimeBetweenFetchRequests(); // The 99th of this is 130-140 ms.
>>         FetchResponse fetchResponse = fetch(fetchRequest); // The 99th of this is 130-140 ms, which is the same as the time between fetches.
>>         processData(fetchResponse); // The 99th of this is 2-3 ms.
>>     }
>> }
>>
>> void processData(FetchResponse response) {
>>     try (Timer.Context _ = processWorkerDataTimer.time()) { // The 99th of this is 2-3 ms.
>>         for (Message message : response) {
>>             if (typeOf(message) == NORMAL_DATA) {
>>                 enqueueOnNonBlockingRingBuffer(message); // Never blocks, drops the message if the ring buffer is full.
>>             } else if (typeOf(message) == TRACE_DATA) {
>>                 long timeOfEnqueue = getEnqueueTime(message);
>>                 long currentTime = System.currentTimeMillis();
>>                 long difference = currentTime - timeOfEnqueue;
>>                 lagHistogram.update(difference);
>>             }
>>         }
>>     }
>> }
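Since both addLagMarkers above and the TRACE_DATA branch in processData rely on that 10-byte marker, here is a minimal sketch of what the LagMarker helper could look like; the real class is not shown in this thread, so the layout (2-byte version + 8-byte timestamp, as described in (vii)) and the version value are assumptions:

import java.nio.ByteBuffer;

// Sketch of the 10-byte lag marker: a 2-byte version followed by the 8-byte enqueue
// timestamp. The version value below is a placeholder.
public final class LagMarker {
    private static final short TRACE_VERSION = 2; // placeholder version for trace messages

    public static byte[] serialize(long timeOfEnqueue) {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.putShort(TRACE_VERSION);
        buffer.putLong(timeOfEnqueue);
        return buffer.array();
    }

    public static long getEnqueueTime(ByteBuffer payload) {
        payload.getShort();       // skip the 2-byte version
        return payload.getLong(); // the producer's System.currentTimeMillis()
    }
}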
>> viii) So the kicker is that this lag is really, really high:
>> Median lag: 200 ms. This already seems pretty high, and I could even discount some of it through NTP skew.
>> Mean lag: 2-4 seconds! Notice how the mean is bigger than the median, which tells us the distribution has a big tail.
>> 99th percentile lag: 20 seconds!!
>>
>> I can't quite figure out the source of the lag. Here are some other things of note:
>>
>> 1) The brokers in general are at about 40-50% CPU, but I see occasional spikes to 80-95%. This could be causing issues. I am wondering if this is down to the high number of partitions and how few messages each partition ends up receiving: 22k messages spread across 1024 partitions = only about 21 messages/sec/partition, though each broker should be receiving about 7500 messages/sec. It could also be down to these messages being tiny. I imagine the per-message overhead of the kafka wire protocol is probably close in size to the 25 byte payload itself, and the broker also has to do a few CRC32 calculations etc. But given that these are c3.2xlarge machines and the message rate is so low, I'd expect it to do better. Reading this article <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines> about 2 million messages/sec across three brokers makes it seem like 22k 25-byte messages should be very easy. So my biggest suspicion right now is that the large number of partitions is somehow responsible for this latency.
>>
>> 2) On the consumer, given that the 99th percentile time between fetches == the 99th percentile time of a fetch, I don't see how I could do any better. It's pretty close to just getting batches of data, iterating through them and dropping them on the floor. My requests on the consumer are built like this:
>>
>> // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
>> FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName)
>>         .maxWait(100)
>>         .minBytes(1000);
>>
>> // Add a fetch request for every partition.
>> for (PartitionMetadata partition : partitions) {
>>     int partitionId = partition.partitionId();
>>     long offset = readOffsets.get(partition); // This map maintains the next offset to be read per partition.
>>     builder.addFetch(MY_TOPIC, partitionId, offset, 3000); // Up to 3000 bytes per partition per fetch request.
>> }
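To connect (2) with the run loop shown earlier, here is a rough sketch of how one fetch iteration and the partition -> nextOffsetToRequest bookkeeping could fit together with the 0.8 SimpleConsumer API; the names (FetchLoopSketch, fetchOnce, readOffsets keyed by partition id, the topic string) are assumptions, not the actual code:

import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

// Rough sketch of one fetch iteration: build the multi-partition request, fetch,
// then advance the in-memory offset map so messages are consumed in order.
public final class FetchLoopSketch {
    private static final String MY_TOPIC = "my_topic"; // placeholder topic name

    static void fetchOnce(SimpleConsumer consumer,
                          Map<Integer, Long> readOffsets, // partition -> next offset to request
                          String clientName) {
        FetchRequestBuilder builder = new FetchRequestBuilder()
                .clientId(clientName)
                .maxWait(100)    // wait up to 100 ms ...
                .minBytes(1000); // ... or until at least 1000 bytes are available
        for (Map.Entry<Integer, Long> entry : readOffsets.entrySet()) {
            builder.addFetch(MY_TOPIC, entry.getKey(), entry.getValue(), 3000); // 3000 bytes per partition
        }
        FetchRequest request = builder.build();
        FetchResponse response = consumer.fetch(request);

        for (Map.Entry<Integer, Long> entry : readOffsets.entrySet()) {
            for (MessageAndOffset messageAndOffset : response.messageSet(MY_TOPIC, entry.getKey())) {
                // processData(...) would inspect the payload here (normal vs. trace message).
                entry.setValue(messageAndOffset.nextOffset()); // offset to ask for on the next fetch
            }
        }
    }
}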
>> 3) On the producer side I use the kafka beta producer. I've played around with many settings, but none of them seem to have made a difference. Here are my current settings:
>>
>> ProducerConfig values:
>>     block.on.buffer.full = false
>>     retry.backoff.ms = 100
>>     buffer.memory = 83886080  // Kind of big - could it be adding lag?
>>     batch.size = 40500        // This is also kind of big - could it be adding lag?
>>     metrics.sample.window.ms = 30000
>>     metadata.max.age.ms = 300000
>>     receive.buffer.bytes = 32768
>>     timeout.ms = 30000
>>     max.in.flight.requests.per.connection = 5
>>     metric.reporters = []
>>     bootstrap.servers = [broker1, broker2, broker3]
>>     client.id =
>>     compression.type = none
>>     retries = 0
>>     max.request.size = 1048576
>>     send.buffer.bytes = 131072
>>     acks = 1
>>     reconnect.backoff.ms = 10
>>     linger.ms = 250           // Based on this config I'd imagine that the lag should be bounded to about 250 ms overall.
>>     metrics.num.samples = 2
>>     metadata.fetch.timeout.ms = 60000
>>
>> I know this is a lot of information. I just wanted to provide as much context as possible.
>>
>> Thanks in advance!
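For reference, a minimal sketch of how a producer with the settings in (3) above could be wired up against the 0.8.2 beta client; the broker host:port values are placeholders and only a subset of the keys from the dump is shown:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch of passing the settings from the config dump to the 0.8.2 beta producer.
// Broker addresses are placeholders. (Against 0.8.2.0 final, key.serializer and
// value.serializer would also have to be configured.)
public final class ProducerSetupSketch {
    public static KafkaProducer buildProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("acks", "1");
        props.put("retries", "0");
        props.put("linger.ms", "250");  // batches wait at most ~250 ms for more records
        props.put("batch.size", "40500");
        props.put("buffer.memory", "83886080");
        props.put("compression.type", "none");
        props.put("block.on.buffer.full", "false");
        props.put("max.in.flight.requests.per.connection", "5");
        return new KafkaProducer(props);
    }
}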