I've done some more measurements. I've also started measuring the latency
between when I ask my producer to send a message and when I get an
acknowledgement via the callback. Here is my code:

// This function is called on every producer once every 30 seconds.
public void addLagMarkers(final Histogram enqueueLag) {
    final int numberOfPartitions = 1024;
    final long timeOfEnqueue = System.currentTimeMillis();
    final Callback callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception ex) {
            if (metadata != null) {
                // The difference between the ack time from the broker and the enqueue time.
                final long timeOfAck = System.currentTimeMillis();
                final long lag = timeOfAck - timeOfEnqueue;
                enqueueLag.update(lag);
            }
        }
    };
    for (int i = 0; i < numberOfPartitions; i++) {
        try {
            // 10 bytes -> short version + long timestamp.
            byte[] value = LagMarker.serialize(timeOfEnqueue);
            // This message is later used by the consumers to measure lag.
            ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
            kafkaProducer.send(record, callback);
        } catch (Exception e) {
            // We just dropped a lag marker.
        }
    }
}
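
(For context, LagMarker.serialize just packs a 2-byte version in front of the
8-byte timestamp, 10 bytes total. Roughly, and simplified from the real class:)

    import java.nio.ByteBuffer;

    public class LagMarker {
        private static final short VERSION = 1;  // Placeholder version constant.

        // Packs a 2-byte version followed by the 8-byte enqueue timestamp.
        public static byte[] serialize(long timeOfEnqueue) {
            ByteBuffer buffer = ByteBuffer.allocate(10);  // 2 + 8 bytes.
            buffer.putShort(VERSION);
            buffer.putLong(timeOfEnqueue);
            return buffer.array();
        }
    }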

The *99th* on this lag is about *350-400 ms*. It's not stellar, but it
doesn't account for the *20-30 second 99th* I see on the end-to-end lag. I
am consuming in a tight loop on the consumers (using the SimpleConsumer)
with minimal processing, with a *99th fetch time* of *130-140 ms*, so I
don't think that should be a problem either. Completely baffled.


Thanks!



On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>
>
>
> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>>
>> I am trying to replace a Thrift peer-to-peer API with Kafka for a
>> particular workflow. I am finding the 99th percentile latency to be
>> unacceptable at this time. This entire workload runs in an Amazon VPC. I'd
>> greatly appreciate it if someone has any insights on why I am seeing such
>> poor numbers. Here are some details and measurements taken:
>>
>> i) I have a single topic with 1024 partitions that I am writing to from
>> six clients using the Kafka 0.8.2 beta producer.
>>
>> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of those
>> machines has 8 virtual CPUs, 15 GB of memory and 2 x 80 GB SSDs. The broker ->
>> partitions mapping was decided by Kafka when I created the topic.
>>
>> iii) I write about 22 thousand messages per second across the 6
>> clients. This number was calculated using distributed counters. I just
>> increment a distributed counter in the callback from my enqueue job if the
>> metadata returned is not null. I also increment a dropped-messages
>> counter if the callback has a non-null exception or if there was an
>> exception in the synchronous send call. The number of dropped messages is
>> pretty much always zero. Out of the 6 clients, 3 are responsible for 95% of
>> the traffic. Messages are very tiny and have null keys and 27 byte values
>> (2 byte version and 25 byte payload). Again, these messages are written
>> using the Kafka 0.8.2 client.
>>
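> Roughly, the counting in the callback looks like the sketch below
> (simplified; sentCounter and droppedCounter stand in for the distributed
> counters, and value is the 27 byte payload):
>
>     Callback countingCallback = new Callback() {
>         @Override
>         public void onCompletion(RecordMetadata metadata, Exception exception) {
>             if (metadata != null) {
>                 sentCounter.increment();     // Broker acked the message.
>             } else {
>                 droppedCounter.increment();  // Non-null exception -> message dropped.
>             }
>         }
>     };
>
>     try {
>         kafkaProducer.send(new ProducerRecord(MY_TOPIC, null, value), countingCallback);
>     } catch (Exception e) {
>         droppedCounter.increment();          // The synchronous send call itself threw.
>     }
>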
>> iv) I have 3 consumer processes consuming only from this topic. Each
>> consumer process is assigned a disjoint set of the 1024 partitions by an
>> external arbiter. Each consumer process then creates a mapping from brokers
>> -> partitions it has been assigned. It then starts one fetcher thread per
>> broker. Each thread queries the broker (using the SimpleConsumer) it has
>> been assigned for partitions such that partitions = (partitions on broker)
>> ∩ (partitions assigned to the process by the arbiter). So in effect
>> there are 9 consumer threads across 3 consumer processes that query a
>> disjoint set of partitions for the single topic, and amongst themselves they
>> manage to consume from every partition. So each thread has a run loop where
>> it asks for messages, consumes them, then asks for more messages. When a run
>> loop starts, it begins by querying for the latest message in each partition
>> (i.e. it discards any previous backlog) and then maintains a map of partition
>> -> nextOffsetToRequest in memory to make sure that it consumes messages in
>> order.
>>
> Edit: Meant external arbiter.
>
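> The broker -> partitions mapping is a straightforward intersection -
> roughly the sketch below (partitionsFromArbiter and topicMetadata are
> placeholders for what the arbiter and a TopicMetadataRequest hand back):
>
>     Map<String, Set<Integer>> partitionsByBroker = new HashMap<String, Set<Integer>>();
>     for (PartitionMetadata pm : topicMetadata.partitionsMetadata()) {
>         if (!partitionsFromArbiter.contains(pm.partitionId())) {
>             continue;  // Not assigned to this process by the arbiter.
>         }
>         String leader = pm.leader().host();
>         Set<Integer> forBroker = partitionsByBroker.get(leader);
>         if (forBroker == null) {
>             forBroker = new HashSet<Integer>();
>             partitionsByBroker.put(leader, forBroker);
>         }
>         forBroker.add(pm.partitionId());  // (partitions on broker) ∩ (partitions from arbiter)
>     }
>     // One fetcher thread is then started per entry in partitionsByBroker.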
>>
>> v) Consumption is really simple. Each message is put on an efficient
>> non-blocking ring buffer. If the ring buffer is full the message is dropped.
>> I measure the mean time between fetches, and it matches the fetch time
>> itself to within a millisecond, meaning that no matter how many messages
>> are dequeued, it takes almost no time to process them. At the end of
>> processing a fetch request I increment another distributed counter that
>> counts the number of messages processed. This counter tells me that on
>> average I am consuming the same number of messages/sec that I enqueue on
>> the producers, i.e. around 22 thousand messages/sec.
>>
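> The "drop if full" behaviour is just a non-blocking offer - roughly the
> sketch below, with a bounded ArrayBlockingQueue standing in for the actual
> ring buffer:
>
>     final BlockingQueue<Message> ringBuffer =
>             new ArrayBlockingQueue<Message>(1 << 16);  // Capacity is a placeholder.
>
>     void enqueueOnNonBlockingRingBuffer(Message message) {
>         boolean accepted = ringBuffer.offer(message);  // Never blocks.
>         if (!accepted) {
>             // Ring buffer full: the message is dropped on the floor.
>         }
>     }
>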
>> vi) The 99th percentile of the number of messages fetched per fetch
>> request is about 500 messages. The 99th percentile of the time it takes to
>> fetch a batch is about 130-140 ms. I played around with the buffer and
>> maxWait settings on the SimpleConsumer, and attempting to consume more
>> messages per batch caused the 99th percentile of the fetch time to balloon,
>> so I am consuming in smaller batches right now.
>>
>> vii) Every 30 seconds or so, each of the producers inserts a trace message
>> into each partition (so 1024 messages per producer every 30 seconds). Each
>> message only contains the System.currentTimeMillis() at the time of
>> enqueueing. It is about 9 bytes long. The consumer in step (v) always
>> checks to see if a message it dequeued is of this trace type (instead of
>> the regular message type). This is a simple check on the 2 byte
>> version at the front of each message buffer. If it is a trace message, it
>> reads its payload as a long, takes the difference between the system
>> time on the consumer and this payload, and updates a histogram with this
>> difference. Ignoring NTP skew (which I've measured to be on the order
>> of milliseconds), this is the lag between when a message was enqueued on the
>> producer and when it was read on the consumer.
>>
> Edit: Trace messages are 10 bytes long - a 2-byte version + an 8-byte long
> for the timestamp.
>
>> So the pseudocode for every consumer thread (one per broker per consumer
>> process; 9 in total across 3 consumer processes) is:
>>
>> void run() {
>>   while (running) {
>>     // The partition assignment is done by an external arbiter.
>>     FetchRequest fetchRequest =
>>         buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
>>
>>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
>>
>>     // The 99th of this is 130-140 ms, i.e. the same as the time between fetches.
>>     FetchResponse fetchResponse = fetch(fetchRequest);
>>
>>     processData(fetchResponse);  // The 99th of this is 2-3 ms.
>>   }
>> }
>>
>> void processData(FetchResponse response) {
>>   try (Timer.Context ctx = processWorkerDataTimer.time()) {  // The 99th of this is 2-3 ms.
>>     for (Message message : response) {
>>       if (typeOf(message) == NORMAL_DATA) {
>>         // Never blocks; drops the message if the ring buffer is full.
>>         enqueueOnNonBlockingRingBuffer(message);
>>       } else if (typeOf(message) == TRACE_DATA) {
>>         long timeOfEnqueue = getEnqueueTime(message);
>>         long currentTime = System.currentTimeMillis();
>>         long difference = currentTime - timeOfEnqueue;
>>         lagHistogram.update(difference);
>>       }
>>     }
>>   }
>> }
>>
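> For completeness, typeOf() and getEnqueueTime() just look at the message
> payload (a java.nio.ByteBuffer) - roughly the sketch below, assuming the
> 2-byte version sits at the front of the value:
>
>     short typeOf(Message message) {
>         ByteBuffer payload = message.payload();
>         return payload.getShort(payload.position());        // 2-byte version.
>     }
>
>     long getEnqueueTime(Message message) {
>         ByteBuffer payload = message.payload();
>         return payload.getLong(payload.position() + 2);     // 8-byte timestamp after the version.
>     }
>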
>> viii) So the kicker is that this lag is really, really high:
>> Median lag: *200 ms*. This already seems pretty high, even if I discount
>> some of it as NTP skew.
>> Mean lag: *2-4 seconds*! Note that the mean is much bigger than the median,
>> which tells us the distribution has a long tail.
>> 99th percentile lag: *20 seconds*!!
>>
>> I can't quite figure out the source of the lag. Here are some other
>> things of note:
>>
>> 1) The brokers in general are at about 40-50% CPU, but I see occasional
>> spikes to 80-95%. This could be causing issues. I am wondering if this is
>> down to the high number of partitions I have and how each partition ends up
>> receiving relatively few messages: 22k messages/sec spread across 1024
>> partitions = only about 21 messages/sec/partition, though each broker
>> should be receiving about 7500 messages/sec. It could also be down to these
>> messages being tiny. I imagine that the per-message overhead of the Kafka
>> wire protocol is probably close to the 25 byte payload itself, and the
>> broker also has to do a few CRC32 calculations, etc. But given that these
>> are c3.2xlarge machines and the message rate is so low, I'd expect it to do
>> better. And reading this article
>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
>> about 2 million messages/sec across three brokers makes it seem like 22k
>> 25 byte messages/sec should be very easy. So my biggest suspicion right now
>> is that the large number of partitions is somehow responsible for this
>> latency.
>>
>> 2) On the consumer, given that the 99th percentile time between fetches ==
>> the 99th percentile time of a fetch, I don't see how I could do any better.
>> It's pretty close to just getting batches of data, iterating through them,
>> and dropping them on the floor. My requests on the consumer are built like
>> this:
>>
>>     FetchRequestBuilder builder = new FetchRequestBuilder()
>>             .clientId(clientName)
>>             .maxWait(100)     // Max wait of 100 ms, or
>>             .minBytes(1000);  // at least 1000 bytes, whichever happens first.
>>
>>     // Add a fetch request for every partition.
>>     for (PartitionMetadata partition : partitions) {
>>         int partitionId = partition.partitionId();
>>         // This map maintains the next offset to be read for each partition.
>>         long offset = readOffsets.get(partition);
>>         builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition.
>>     }
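>>
> The built request is then issued on the SimpleConsumer roughly like the
> sketch below (error handling trimmed; processMessage() stands in for the
> ring buffer / trace handling described above):
>
>     FetchResponse fetchResponse = simpleConsumer.fetch(builder.build());
>     if (!fetchResponse.hasError()) {
>         for (PartitionMetadata partition : partitions) {
>             int partitionId = partition.partitionId();
>             for (MessageAndOffset messageAndOffset :
>                     fetchResponse.messageSet(MY_TOPIC, partitionId)) {
>                 processMessage(messageAndOffset.message());
>                 // Advance the next offset to request for this partition.
>                 readOffsets.put(partition, messageAndOffset.nextOffset());
>             }
>         }
>     }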
>>
>> 3) On the producer side I use the Kafka 0.8.2 beta producer. I've played
>> around with many settings, but none of them seem to have made a difference.
>> Here are my current settings:
>>
>> ProducerConfig values:
>>
>>         block.on.buffer.full = false
>>         retry.backoff.ms = 100
>>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
>>         batch.size = 40500  // This is also kind of big - could it be adding lag?
>>         metrics.sample.window.ms = 30000
>>         metadata.max.age.ms = 300000
>>         receive.buffer.bytes = 32768
>>         timeout.ms = 30000
>>         max.in.flight.requests.per.connection = 5
>>         metric.reporters = []
>>         bootstrap.servers = [broker1, broker2, broker3]
>>         client.id =
>>         compression.type = none
>>         retries = 0
>>         max.request.size = 1048576
>>         send.buffer.bytes = 131072
>>         acks = 1
>>         reconnect.backoff.ms = 10
>>         linger.ms = 250  // Based on this config I'd imagine the lag should be bounded to about 250 ms overall.
>>         metrics.num.samples = 2
>>         metadata.fetch.timeout.ms = 60000
>>
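> For what it's worth, the producer is built from these settings in the usual
> way - a sketch with just the values I've been tuning (broker hosts and the
> port are placeholders, and the 0.8.2 beta producer takes raw byte[] values):
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
>     props.put("acks", "1");
>     props.put("linger.ms", "250");
>     props.put("batch.size", "40500");
>     props.put("buffer.memory", "83886080");
>     props.put("block.on.buffer.full", "false");
>     KafkaProducer kafkaProducer = new KafkaProducer(props);
>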
>> I know this is a lot of information. I just wanted to provide as much
>> context as possible.
>>
>> Thanks in advance!
>>
>>
