On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:
>
> I am trying to replace a Thrift peer-to-peer API with Kafka for a
> particular workflow, and I am finding the 99th percentile latency to be
> unacceptable at this time. This entire workload runs in an Amazon VPC. I'd
> greatly appreciate it if someone has any insight into why I am seeing such
> poor numbers. Here are the details and the measurements I have taken:
>
> i) I have a single topic with 1024 partitions that I am writing to from
> six clients using the Kafka 0.8.2 beta producer.
>
> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of those
> machines has 8 virtual CPUs, 15 GB of memory, and 2 x 80 GB SSDs. The
> broker -> partition mapping was decided by Kafka when I created the topic.
>
> iii) I write about 22 thousand messages per second across the 6 clients.
> This number was calculated using distributed counters: I increment a
> distributed counter in the callback from my enqueue call if the metadata
> returned is non-null, and I increment a dropped-messages counter if the
> callback has a non-null exception or if there was an exception in the
> synchronous send call. The number of dropped messages is pretty much
> always zero. Of the 6 clients, 3 are responsible for 95% of the traffic.
> Messages are tiny, with null keys and 27-byte values (a 2-byte version and
> a 25-byte payload). Again, these messages are written using the Kafka
> 0.8.2 beta producer.
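>
> Roughly, the send path looks like the following (a simplified sketch
> against the 0.8.2 new-producer API; the beta build I am on is byte[]-only
> rather than generic, and the AtomicLongs and names here are stand-ins for
> the actual distributed counters and topic):
>
> import java.nio.ByteBuffer;
> import java.util.concurrent.atomic.AtomicLong;
> import org.apache.kafka.clients.producer.*;
>
> class ProducerPath {
>     static final String MY_TOPIC = "my_topic";       // placeholder name
>     final AtomicLong enqueued = new AtomicLong();     // stand-in for distributed counter
>     final AtomicLong dropped = new AtomicLong();      // stand-in for distributed counter
>
>     void send(KafkaProducer<byte[], byte[]> producer, short version, byte[] payload) {
>         // 2-byte version + 25-byte payload = 27-byte value, null key.
>         ByteBuffer value = ByteBuffer.allocate(2 + payload.length);
>         value.putShort(version).put(payload);
>         ProducerRecord<byte[], byte[]> record =
>             new ProducerRecord<byte[], byte[]>(MY_TOPIC, value.array());
>         try {
>             producer.send(record, new Callback() {
>                 public void onCompletion(RecordMetadata metadata, Exception exception) {
>                     if (metadata != null && exception == null) {
>                         enqueued.incrementAndGet();
>                     } else {
>                         dropped.incrementAndGet();
>                     }
>                 }
>             });
>         } catch (Exception e) {
>             dropped.incrementAndGet();  // synchronous failure in send()
>         }
>     }
> }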
>
> iv) I have 3 consumer processes consuming only from this topic. Each
> consumer process is assigned a disjoint set of the 1024 partitions by an
> external arbiter. Each consumer process then creates a mapping from
> brokers -> partitions it has been assigned, and starts one fetcher thread
> per broker. Each thread queries the broker it has been assigned (using the
> SimpleConsumer) for partitions such that partitions = (partitions on
> broker) ∩ (partitions assigned to the process by the arbiter). So in
> effect there are 9 consumer threads across the 3 consumer processes,
> querying disjoint sets of partitions for the single topic, and amongst
> themselves they consume from every partition. Each thread has a run loop
> where it asks for messages, consumes them, then asks for more messages.
> When a run loop starts, it queries for the latest offset in each partition
> (i.e. it discards any backlog) and then maintains a map of partition ->
> nextOffsetToRequest in memory to make sure that it consumes messages in
> order.
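>
> To make that concrete, each fetcher thread's setup is roughly the
> following (a sketch using the 0.8.x SimpleConsumer offset API; the arbiter
> and broker-metadata lookups are elided and the names are placeholders):
>
> import java.util.*;
> import kafka.api.PartitionOffsetRequestInfo;
> import kafka.common.TopicAndPartition;
> import kafka.javaapi.OffsetResponse;
> import kafka.javaapi.consumer.SimpleConsumer;
>
> class FetcherSetup {
>     static final String MY_TOPIC = "my_topic";  // placeholder name
>
>     // Partitions this thread owns = (partitions on this broker) ∩ (partitions from the arbiter).
>     static Set<Integer> partitionsForThisThread(Set<Integer> partitionsOnBroker,
>                                                 Set<Integer> partitionsFromArbiter) {
>         Set<Integer> mine = new HashSet<Integer>(partitionsOnBroker);
>         mine.retainAll(partitionsFromArbiter);
>         return mine;
>     }
>
>     // Seed the partition -> nextOffsetToRequest map from the latest offsets,
>     // i.e. start at the tail and discard any backlog.
>     static Map<Integer, Long> seedOffsets(SimpleConsumer consumer, Set<Integer> partitions) {
>         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
>             new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>         for (int p : partitions) {
>             requestInfo.put(new TopicAndPartition(MY_TOPIC, p),
>                 new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
>         }
>         OffsetResponse response = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
>             requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-seeder"));
>         Map<Integer, Long> nextOffsetToRequest = new HashMap<Integer, Long>();
>         for (int p : partitions) {
>             nextOffsetToRequest.put(p, response.offsets(MY_TOPIC, p)[0]);
>         }
>         return nextOffsetToRequest;
>     }
> }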
>
> v) Consumption is really simple. Each message is put on an efficient
> non-blocking ring buffer; if the ring buffer is full the message is
> dropped. I measure the mean time between fetches and it matches the fetch
> time itself to within a millisecond, meaning that no matter how many
> messages are dequeued, processing them takes almost no time. At the end of
> processing a fetch request I increment another distributed counter that
> counts the number of messages processed. This counter tells me that on
> average I am consuming the same number of messages/sec that I enqueue on
> the producers, i.e. around 22 thousand messages/sec.
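>
> The ring-buffer hand-off is essentially a non-blocking offer that drops on
> overflow, along these lines (sketch only; an ArrayBlockingQueue stands in
> for the actual ring buffer and the drop counter is illustrative):
>
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.atomic.AtomicLong;
>
> class MessageHandoff {
>     // Stand-in for the actual lock-free ring buffer.
>     private final ArrayBlockingQueue<byte[]> ring = new ArrayBlockingQueue<byte[]>(1 << 16);
>     private final AtomicLong droppedRingFull = new AtomicLong();
>
>     void enqueueOnNonBlockingRingBuffer(byte[] message) {
>         // offer() never blocks; if the ring is full the message is simply dropped.
>         if (!ring.offer(message)) {
>             droppedRingFull.incrementAndGet();
>         }
>     }
> }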
>
> vi) The 99th percentile of the number of messages fetched per fetch
> request is about 500 messages. The 99th percentile of the time it takes to
> fetch a batch is about 130-140 ms. I played around with the buffer and
> maxWait settings on the SimpleConsumer, and attempting to consume larger
> batches caused the 99th percentile fetch time to balloon, so I am
> consuming in smaller batches right now.
>
> vii) Every 30 seconds or so, each producer inserts a trace message into
> each partition (so 1024 messages per producer every 30 seconds). Each
> trace message contains only the System.currentTimeMillis() at the time of
> enqueueing and is 10 bytes long: a 2-byte version plus an 8-byte
> timestamp. The consumer in step (v) always checks whether a dequeued
> message is of this trace type (instead of the regular message type); this
> is a simple check on the 2-byte version at the start of each message
> buffer. If it is a trace message, the consumer reads its payload as a long
> and updates a histogram with the difference between the system time on the
> consumer and this payload. Ignoring NTP skew (which I've measured to be on
> the order of milliseconds), this is the lag between when a message was
> enqueued on the producer and when it was read on the consumer.
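>
> Concretely, the trace messages are encoded and decoded along these lines
> (a sketch; the TRACE_DATA constant is illustrative):
>
> import java.nio.ByteBuffer;
>
> class TraceMessages {
>     static final short TRACE_DATA = 2;  // illustrative version value
>
>     // Producer side: 10 bytes = 2-byte version + 8-byte enqueue timestamp.
>     static byte[] encodeTrace() {
>         ByteBuffer buf = ByteBuffer.allocate(10);
>         buf.putShort(TRACE_DATA);
>         buf.putLong(System.currentTimeMillis());
>         return buf.array();
>     }
>
>     // Consumer side: lag = (now - embedded timestamp), modulo NTP skew.
>     static long lagMillis(ByteBuffer value) {
>         short version = value.getShort();
>         if (version != TRACE_DATA) {
>             throw new IllegalArgumentException("not a trace message");
>         }
>         long timeOfEnqueue = value.getLong();
>         return System.currentTimeMillis() - timeOfEnqueue;
>     }
> }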
>
> So the pseudocode for every consumer thread (one per broker per consumer
> process; 9 in total across the 3 consumer processes) is:
>
> void run() {
>   while (running) {
>     // This assignment is done by an external arbiter.
>     FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
>
>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
>
>     // The 99th of this is 130-140 ms, which is the same as the time between fetches.
>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
>
>     processData(fetchResponse);  // The 99th of this is 2-3 ms.
>   }
> }
>
> void processData(FetchResponse response) {
>   try (Timer.Context ignored = processWorkerDataTimer.time()) {  // The 99th of this is 2-3 ms.
>     for (Message message : response) {
>       if (typeOf(message) == NORMAL_DATA) {
>         // Never blocks; drops the message if the ring buffer is full.
>         enqueueOnNonBlockingRingBuffer(message);
>       } else if (typeOf(message) == TRACE_DATA) {
>         long timeOfEnqueue = getEnqueueTime(message);
>         long currentTime = System.currentTimeMillis();
>         long difference = currentTime - timeOfEnqueue;
>         lagHistogram.update(difference);
>       }
>     }
>   }
> }
>
> viii) So the kicker is that this lag is really, really high:
> Median lag: *200 ms*. This already seems pretty high, and I could perhaps
> discount some of it as NTP skew.
> Mean lag: *2-4 seconds*! Notice that the mean is bigger than the median,
> which tells us the distribution has a long tail.
> 99th percentile lag: *20 seconds*!!
>
> I can't quite figure out the source of the lag. Here are some other things
> of note:
>
> 1) The brokers are generally at about 40-50% CPU, but I see occasional
> spikes to 80-95%, which could be causing issues. I am wondering if this is
> down to the high number of partitions, with each partition receiving
> relatively few messages: 22k messages/sec spread across 1024 partitions is
> only about 21 messages/sec/partition, though each broker should still be
> receiving about 7,500 messages/sec. It could also be down to the messages
> being tiny; I imagine the per-message overhead of the Kafka wire protocol
> is close to the size of the 25-byte payload itself, and the broker also
> has to do a few CRC32 calculations etc. But given that these are
> c3.2xlarge machines and the message rate is so low, I'd expect them to do
> better. Reading this article
> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
> about 2 million messages/sec across three cheap machines makes it seem
> like 22k 25-byte messages/sec should be very easy. So my biggest suspicion
> right now is that the large number of partitions is somehow responsible
> for this latency.
>
> 2) On the consumer, given that the 99th percentile time between fetches
> equals the 99th percentile time of a fetch, I don't see how I could do any
> better. It's pretty close to just getting batches of data, iterating
> through them, and dropping them on the floor. My requests on the consumer
> are built like this:
>
>         FetchRequestBuilder builder = new FetchRequestBuilder()
>                 .clientId(clientName)
>                 .maxWait(100)     // Max wait of 100 ms...
>                 .minBytes(1000);  // ...or at least 1000 bytes, whichever happens first.
>
>         // Add a fetch request for every partition.
>         for (PartitionMetadata partition : partitions) {
>             int partitionId = partition.partitionId();
>             long offset = readOffsets.get(partition);  // This map maintains the next offset to read per partition.
>             builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition per fetch.
>         }
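>
> The rest of the loop issues the fetch and advances the offset map, roughly
> like this (a continuation of the fragment above; error handling is
> simplified and the hand-off to processData() is elided):
>
>         FetchResponse fetchResponse = consumer.fetch(builder.build());
>         if (!fetchResponse.hasError()) {
>             for (PartitionMetadata partition : partitions) {
>                 int partitionId = partition.partitionId();
>                 for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(MY_TOPIC, partitionId)) {
>                     ByteBuffer payload = messageAndOffset.message().payload();
>                     // ... hand the payload to processData() / the ring buffer ...
>                     readOffsets.put(partition, messageAndOffset.nextOffset());  // next offset to request
>                 }
>             }
>         } else {
>             // Real code inspects fetchResponse.errorCode(MY_TOPIC, partitionId)
>             // per partition and handles leader changes / offset resets.
>         }
>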
> 3) On the producer side I use the Kafka 0.8.2 beta producer. I've played
> around with many settings, but none of them seems to have made a
> difference. Here are my current settings:
>
> ProducerConfig values:
>
>         block.on.buffer.full = false
>         retry.backoff.ms = 100
>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
>         batch.size = 40500  // This is also kind of big - could it be adding lag?
>         metrics.sample.window.ms = 30000
>         metadata.max.age.ms = 300000
>         receive.buffer.bytes = 32768
>         timeout.ms = 30000
>         max.in.flight.requests.per.connection = 5
>         metric.reporters = []
>         bootstrap.servers = [broker1, broker2, broker3]
>         client.id =
>         compression.type = none
>         retries = 0
>         max.request.size = 1048576
>         send.buffer.bytes = 131072
>         acks = 1
>         reconnect.backoff.ms = 10
>         linger.ms = 250  // Based on this config I'd imagine that the lag should be bounded to about 250 ms overall.
>         metrics.num.samples = 2
>         metadata.fetch.timeout.ms = 60000
>
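> (For reference, these come from constructing the producer with a plain
> Properties map, roughly as below. The serializer settings are what the
> final 0.8.2 producer expects, the beta being byte[]-only, and the broker
> host:port values are illustrative.)
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
> props.put("acks", "1");
> props.put("linger.ms", "250");
> props.put("batch.size", "40500");
> props.put("buffer.memory", "83886080");
> props.put("block.on.buffer.full", "false");
> props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
>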
> I know this is a lot of information. I just wanted to provide as much
> context as possible.
>
> Thanks in advance!
>
>
