I am trying to replace a Thrift peer-to-peer API with Kafka for a
particular workflow, and I am finding the 99th percentile latency to be
unacceptable at this time. The entire workload runs in an Amazon VPC. I'd
greatly appreciate any insights into why I am seeing such poor numbers.
Here are some details and measurements:

i) I have a single topic with 1024 partitions that I am writing to from six
clients using the Kafka 0.8.2 beta producer.

ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of those
machines has 8 virtual CPUs, 15 GB of memory and 2 x 80 GB SSDs. The broker ->
partition mapping was decided by Kafka when I created the topic.

iii) I write about 22 thousand messages per second across the 6 clients.
This number was calculated using distributed counters: I increment a
distributed counter in the callback from my enqueue job if the metadata
returned is not null, and I increment a dropped-messages counter if the
callback has a non-null exception or if there was an exception in the
synchronous send call. The number of dropped messages is pretty much always
zero. Out of the 6 clients, 3 are responsible for 95% of the traffic.
Messages are very tiny, with null keys and 27 byte values (2 byte version
and 25 byte payload). Again, these messages are written using the Kafka
0.8.2 producer.
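
Roughly, the enqueue path looks like the following (the counter objects are
illustrative stand-ins for our distributed counters, and the producer/topic
handles are placeholders):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch only: enqueuedCounter / droppedCounter stand in for the distributed counters.
    void enqueue(Producer producer, String topic, byte[] payload) {
        try {
            producer.send(new ProducerRecord(topic, null, payload), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata != null && exception == null) {
                        enqueuedCounter.increment();  // counted as successfully enqueued
                    } else {
                        droppedCounter.increment();   // callback reported an error
                    }
                }
            });
        } catch (Exception e) {
            droppedCounter.increment();  // exception thrown by the send() call itself
        }
    }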

iv) I have 3 consumer processes consuming only from this topic. Each
consumer process is assigned a disjoint set of the 1024 partitions by an
external arbiter. Each consumer process then creates a mapping from brokers
-> partitions it has been assigned, and starts one fetcher thread per
broker. Each thread queries its broker (using the SimpleConsumer) for the
partitions such that partitions = (partitions on that broker) ∩ (partitions
assigned to the process by the arbiter). So in effect there are 9 consumer
threads across the 3 consumer processes, each querying a disjoint set of
partitions for the single topic, and among themselves they consume from
every partition. Each thread has a run loop where it asks for messages,
consumes them, then asks for more messages. When a run loop starts, it
begins by querying for the latest message in each partition (i.e. it
discards any previous backlog) and then maintains a map of partition ->
nextOffsetToRequest in memory to make sure it consumes messages in order.
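
The per-broker partition selection each fetcher thread does is just a set
intersection. Roughly (the collection names here are illustrative):

    import java.util.HashSet;
    import java.util.Set;

    // Sketch only: brokerToPartitions and partitionsAssignedByArbiter are illustrative
    // names for metadata the consumer process already holds.
    Set<Integer> partitionsForThisThread =
        new HashSet<Integer>(brokerToPartitions.get(broker));
    partitionsForThisThread.retainAll(partitionsAssignedByArbiter);  // (on this broker) ∩ (assigned to this process)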

v) Consumption is really simple. Each message is put on a non-blocking,
efficient ring buffer; if the ring buffer is full the message is dropped. I
measure the mean time between fetches, and it matches the time per fetch to
within a millisecond, meaning that no matter how many messages are dequeued,
it takes almost no time to process them. At the end of processing a fetch
request I increment another distributed counter that counts the number of
messages processed. This counter tells me that on average I am consuming
the same number of messages/sec that I enqueue on the producers, i.e. around
22 thousand messages/sec.
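
The hand-off to the ring buffer is a plain non-blocking offer that drops on
overflow. Roughly (a bounded queue stands in for the actual ring buffer
here):

    import java.util.concurrent.ArrayBlockingQueue;

    // Sketch only: ArrayBlockingQueue stands in for the real ring buffer implementation.
    private final ArrayBlockingQueue<byte[]> ringBuffer = new ArrayBlockingQueue<byte[]>(65536);

    void enqueueOnNonBlockingRingBuffer(byte[] message) {
        // offer() never blocks; if the buffer is full the message is simply dropped.
        ringBuffer.offer(message);
    }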

vi) The 99th percentile of the number of messages fetched per fetch request
is about 500 messages. The 99th percentile of the time it takes to fetch a
batch is about 130 - 140 ms. I played around with the buffer and maxWait
settings on the SimpleConsumer, and attempting to consume more messages per
request caused the 99th percentile of the fetch time to balloon, so I am
consuming in smaller batches right now.

vii) Every 30 seconds or so, each of the producers inserts a trace message
into each partition (so 1024 messages per producer every 30 seconds). Each
trace message contains only the System.currentTimeMillis() at the time of
enqueueing and is about 9 bytes long. The consumer in step (v) always
checks whether a message it dequeued is of this trace type (instead of
the regular message type); this is a simple check on the 2 byte version
that each message buffer starts with. If it is a trace message, the
consumer reads its payload as a long, takes the difference between the
system time on the consumer and this payload, and updates a histogram with
that difference. Ignoring NTP skew (which I've measured to be on the order
of milliseconds), this is the lag between when a message was enqueued on
the producer and when it was read on the consumer.
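
The trace injection on the producer side looks roughly like this
(TRACE_DATA, NUM_PARTITIONS and MY_TOPIC are illustrative names, and the
encoding is simplified):

    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch only: one trace message per partition, with the enqueue time as the payload.
    void injectTraceMessages(Producer producer) {
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            ByteBuffer buf = ByteBuffer.allocate(2 + 8);
            buf.putShort(TRACE_DATA);                 // 2 byte version/type marker
            buf.putLong(System.currentTimeMillis());  // enqueue time as the payload
            producer.send(new ProducerRecord(MY_TOPIC, partition, null, buf.array()));
        }
    }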

So the pseudocode for every consumer thread (one per broker per consumer
process, 9 in total across the 3 consumer processes) is:

void run() {
  while (running) {
    // This assignment is done by an external arbiter.
    FetchRequest fetchRequest =
        buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);

    measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.

    // The 99th of this is 130-140 ms, which is the same as the time between fetches.
    FetchResponse fetchResponse = fetch(fetchRequest);

    processData(fetchResponse);  // The 99th of this is 2-3 ms.
  }
}

void processData(FetchResponse response) {
  try (Timer.Context ctx = processWorkerDataTimer.time()) {  // The 99th of this is 2-3 ms.
    for (Message message : response) {
      if (typeOf(message) == NORMAL_DATA) {
        // Never blocks; drops the message if the ring buffer is full.
        enqueueOnNonBlockingRingBuffer(message);
      } else if (typeOf(message) == TRACE_DATA) {
        long timeOfEnqueue = getEnqueueTime(message);
        long currentTime = System.currentTimeMillis();
        long difference = currentTime - timeOfEnqueue;
        lagHistogram.update(difference);
      }
    }
  }
}

viii) So the kicker is that this lag is really, really high:
Median lag: *200 ms*. This already seems pretty high, and I could discount
some of it as NTP skew.
Mean lag: *2 - 4 seconds*! The mean being much larger than the median also
tells us the distribution has a long tail.
99th percentile lag: *20 seconds*!!

I can't quite figure out the source of the lag. Here are some other things
of note:

1) The brokers are generally at about 40-50% CPU, but I see occasional
spikes to 80-95%, which could be causing issues. I am wondering if this is
down to the high number of partitions and to each partition receiving
relatively few messages: 22k messages spread across 1024 partitions is only
about 21 messages/sec/partition, although each broker should still be
receiving about 7,500 messages/sec. It could also be down to these messages
being tiny; I imagine the per-message overhead of the Kafka wire protocol
is probably close to the 25 byte payload itself, and the broker also has to
do a few CRC32 calculations, etc. But given that these are c3.2xlarge
machines and the message rate is so small, I'd expect them to do better.
Reading this article
<https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
about 2 million messages/sec across three brokers makes it seem like 22k
25-byte messages should be very easy. So my biggest suspicion right now is
that the large number of partitions is somehow responsible for this latency.

2) On the consumer, given that the 99th percentile time between fetches ==
the 99th percentile time of a fetch, I don't see how I could do any better.
It's pretty close to just getting batches of data, iterating through them
and dropping them on the floor. My requests on the consumer are built like
this:

        FetchRequestBuilder builder = new FetchRequestBuilder()
                .clientId(clientName)
                .maxWait(100)     // Max wait of 100 ms ...
                .minBytes(1000);  // ... or at least 1000 bytes, whichever happens first.

        // Add a fetch request for every partition assigned to this thread.
        for (PartitionMetadata partition : partitions) {
            int partitionId = partition.partitionId();
            long offset = readOffsets.get(partition);  // This map maintains the next offset to request per partition.
            builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition.
        }

        FetchRequest fetchRequest = builder.build();
        FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);

3) On the producer side I use the Kafka 0.8.2 beta producer. I've played
around with many settings, but none of them seem to have made a difference.
Here are my current settings (with a rough sketch of the producer
construction after the list):

ProducerConfig values:

        block.on.buffer.full = false
        retry.backoff.ms = 100
        buffer.memory = 83886080  // Kind of big - could it be adding lag?
        batch.size = 40500  // This is also kind of big - could it be adding lag?
        metrics.sample.window.ms = 30000
        metadata.max.age.ms = 300000
        receive.buffer.bytes = 32768
        timeout.ms = 30000
        max.in.flight.requests.per.connection = 5
        metric.reporters = []
        bootstrap.servers = [broker1, broker2, broker3]
        client.id =
        compression.type = none
        retries = 0
        max.request.size = 1048576
        send.buffer.bytes = 131072
        acks = 1
        reconnect.backoff.ms = 10
        linger.ms = 250  // Based on this config I'd imagine that the lag should be bounded at about 250 ms overall.
        metrics.num.samples = 2
        metadata.fetch.timeout.ms = 60000
I know this is a lot of information. I just wanted to provide as much
context as possible.

Thanks in advance!
