Hey Rajiv,

This sounds like a bug. The more info you can help us gather, the easier it
will be to fix. Things that would help:
1. Can you check whether the request log on the servers shows latency spikes
(in which case it is a server problem)?
2. It would also be worth getting the JMX stats on the producer, as they
show things like what percentage of time it spends waiting for buffer space
etc. (a rough sketch of pulling those stats programmatically is below).
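
For example, something like this (an untested sketch, assuming the 0.8.2 java
producer whose metrics() call returns a map of MetricName to Metric; the exact
metric names, e.g. "bufferpool-wait-ratio" for buffer-space waits, may vary by
version):

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerMetricsDump {
    // Periodically log every producer metric. The buffer-pool wait ratio in
    // particular shows what fraction of time send() is blocked waiting for
    // buffer space.
    public static void dump(KafkaProducer producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry :
                producer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            System.out.printf("%s / %s = %f%n",
                    name.group(), name.name(), entry.getValue().value());
        }
    }
}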

If your test is reasonably stand-alone it would be great to file a JIRA and
attach the test code and the findings you already have so someone can dig
into what is going on.

-Jay

On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hi all,
>
> Bumping this up in case someone has any ideas. I did yet another
> experiment where I create 4 producers and stripe the send requests across
> them so that any one producer only sees 256 partitions instead of the
> entire 1024. This seems to have helped a bit: though I still see a very
> high 99th percentile (25-30 seconds), the median, mean, 75th and 95th
> percentiles have all gone down.
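>
> (For reference, roughly what I mean by striping - a simplified sketch with
> hypothetical names, assuming each producer owns a contiguous block of 256
> partitions:)
>
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> class StripedSender {
>     // Four producers built from identical configs; producer k owns
>     // partitions [k * 256, (k + 1) * 256).
>     private final KafkaProducer[] kafkaProducers;
>
>     StripedSender(KafkaProducer[] kafkaProducers) {
>         this.kafkaProducers = kafkaProducers;
>     }
>
>     void send(ProducerRecord record, Callback callback) {
>         int partition = record.partition();   // partition chosen upstream, 0..1023
>         int producerIndex = partition / 256;  // block striping across the 4 producers
>         kafkaProducers[producerIndex].send(record, callback);
>     }
> }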
>
> Thanks!
>
> On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <tstump...@ntent.com>
> wrote:
>
> > Ah I thought it was restarting the broker that made things better :)
> >
> > Yeah I have no experience with the Java client so can't really help
> > there.
> >
> > Good luck!
> >
> > -----Original Message-----
> > From: Rajiv Kurian [ra...@signalfuse.com]
> > Received: Sunday, 21 Dec 2014, 12:25PM
> > To: users@kafka.apache.org [users@kafka.apache.org]
> > Subject: Re: Trying to figure out kafka latency issues
> >
> > I'll take a look at the GC profile of the brokers. Right now I keep a tab
> > on the CPU, messages in, bytes in, bytes out, free memory (on the machine,
> > not the JVM heap) and free disk space on the brokers. I'll need to look at
> > the JVM metrics too. What seemed strange is that going from 8 -> 512
> > partitions increases the latency, but going from 512 -> 8 does not decrease
> > it. I have to restart the producer (but not the broker) for the end to end
> > latency to go down. That made it seem that the fault was probably with the
> > producer and not the broker. Only restarting the producer made things
> > better. I'll do more extensive measurements on the broker.
> >
> > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <tstump...@ntent.com>
> > wrote:
> > >
> > > Did you see my response, and have you checked the server logs, especially
> > > the GC logs? It still sounds like you are running out of memory on the
> > > broker. What is your max heap memory, and are you thrashing once you start
> > > writing to all those partitions?
> > >
> > > You have measured very thoroughly from an external point of view; I think
> > > now you'll have to start measuring the internal metrics. Maybe someone
> > > else will have ideas on which JMX values to watch.
> > >
> > > Best,
> > > Thunder
> > >
> > >
> > > -----Original Message-----
> > > From: Rajiv Kurian [ra...@signalfuse.com]
> > > Received: Saturday, 20 Dec 2014, 10:24PM
> > > To: users@kafka.apache.org [users@kafka.apache.org]
> > > Subject: Re: Trying to figure out kafka latency issues
> > >
> > > Some more work tells me that the end to end latency numbers vary with the
> > > number of partitions I am writing to. I did an experiment where, based on
> > > a run time flag, I dynamically select how many of the *1024 partitions* I
> > > write to. So if I decide I'll write to at most 256 partitions, I mod
> > > whatever partition I would actually write to by 256. Basically the number
> > > of partitions for this topic on the broker remains the same at *1024*, but
> > > the number of partitions my producers write to changes dynamically based
> > > on a run time flag. So something like this:
> > >
> > > int partition = getPartitionForMessage(message);
> > > // This flag can be updated without bringing the application down - just a
> > > // volatile read of some number set externally.
> > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> > > int moddedPartition = partition % maxPartitionsToWriteTo;
> > > // Send a message to this Kafka partition (moddedPartition).
> > >
> > > Here are some interesting things I've noticed:
> > >
> > > i) When I start my client and it *never writes* to more than *8
> > > partitions* (same data rate but fewer partitions), the end to end *99th
> > > latency is 300-350 ms*. Quite a bit of this (numbers in my previous
> > > emails) is the latency from producer -> broker and from broker ->
> > > consumer. Still nowhere near as poor as the *20-30 seconds* I was seeing.
> > >
> > > ii) When I increase the maximum number of partitions, end to end latency
> > > increases dramatically. At *256 partitions* the end to end *99th latency
> > > is still 390-418 ms*, worse than the figures for *8* partitions, but not
> > > by much. When I increase this number to *512 partitions* the end to end
> > > *99th latency* becomes an intolerable *19-24 seconds*. At *1024*
> > > partitions the *99th latency is 25-30 seconds*.
> > > A table of the numbers:
> > >
> > > Max partitions written to (out of 1024)    End to end latency
> > >    8                                       300 - 350 ms
> > >  256                                       390 - 418 ms
> > >  512                                       19 - 24 seconds
> > > 1024                                       25 - 30 seconds
> > >
> > >
> > > iii) Once I make the max number of partitions high enough, reducing it
> > > doesn't help. For example, if I go up from *8* to *512* partitions, the
> > > latency goes up; but if, while the producer is running, I go back down
> > > from *512* to *8* partitions, the latency numbers do not come back down.
> > > My guess is that the producer is lazily creating some per-partition state
> > > and that this state is causing the latency. Once this state is created,
> > > writing to fewer partitions doesn't seem to help. Only a restart of the
> > > producer calms things down.
> > >
> > > So my current plan is to reduce the number of partitions on the topic, but
> > > there seems to be something deeper going on for the latency numbers to be
> > > so poor to begin with and then degrade so much more (non-linearly) with
> > > additional partitions.
> > >
> > > Thanks!
> > >
> > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > wrote:
> > > >
> > > > I've done some more measurements. I've also started measuring the
> > > > latency between when I ask my producer to send a message and when I get
> > > > an acknowledgement via the callback. Here is my code:
> > > >
> > > > // This function is called on every producer once every 30 seconds.
> > > > public void addLagMarkers(final Histogram enqueueLag) {
> > > >     final int numberOfPartitions = 1024;
> > > >     final long timeOfEnqueue = System.currentTimeMillis();
> > > >     final Callback callback = new Callback() {
> > > >         @Override
> > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
> > > >             if (metadata != null) {
> > > >                 // The difference between ack time from broker and enqueue time.
> > > >                 final long timeOfAck = System.currentTimeMillis();
> > > >                 final long lag = timeOfAck - timeOfEnqueue;
> > > >                 enqueueLag.update(lag);
> > > >             }
> > > >         }
> > > >     };
> > > >     for (int i = 0; i < numberOfPartitions; i++) {
> > > >         try {
> > > >             // 10 bytes -> short version + long timestamp.
> > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
> > > >             // This message is later used by the consumers to measure lag.
> > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
> > > >             kafkaProducer.send(record, callback);
> > > >         } catch (Exception e) {
> > > >             // We just dropped a lag marker.
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > The *99th* on this lag is about *350 - 400 ms*. It's not stellar, but it
> > > > doesn't account for the *20-30 second 99th* I see on the end to end lag.
> > > > I am consuming in a tight loop on the consumers (using the
> > > > SimpleConsumer) with minimal processing and a *99th fetch time* of
> > > > *130-140 ms*, so I don't think that should be a problem either.
> > > > Completely baffled.
> > > >
> > > >
> > > > Thanks!
> > > >
> > > >
> > > >
> > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > wrote:
> > > >>
> > > >>
> > > >>
> > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > >> wrote:
> > > >>>
> > > >>> I am trying to replace a Thrift peer-to-peer API with Kafka for a
> > > >>> particular work flow. I am finding the 99th percentile latency to be
> > > >>> unacceptable at this time. This entire workload runs in an Amazon VPC.
> > > >>> I'd greatly appreciate it if someone has any insight into why I am
> > > >>> seeing such poor numbers. Here are some details and measurements:
> > > >>>
> > > >>> i) I have a single topic with 1024 partitions that I am writing to
> > > >>> from six clients using the Kafka 0.8.2 beta producer.
> > > >>>
> > > >>> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of
> > > >>> those machines has 8 virtual CPUs, 15 GB memory and 2 * 80 GB SSDs.
> > > >>> The broker -> partitions mapping was decided by Kafka when I created
> > > >>> the topic.
> > > >>>
> > > >>> iii) I write about 22 thousand messages per second from across the 6
> > > >>> clients. This number was calculated using distributed counters. I just
> > > >>> increment a distributed counter in the callback from my enqueue job if
> > > >>> the metadata returned is not null. I also increment a dropped-messages
> > > >>> counter if the callback has a non-null exception or if there was an
> > > >>> exception in the synchronous send call. The number of dropped messages
> > > >>> is pretty much always zero. Out of the 6 clients, 3 are responsible for
> > > >>> 95% of the traffic. Messages are very tiny and have null keys and
> > > >>> 27-byte values (2-byte version and 25-byte payload). Again, these
> > > >>> messages are written using the Kafka 0.8.2 client.
> > > >>>
> > > >>> iv) I have 3 consumer processes consuming only from this topic. Each
> > > >>> consumer process is assigned a disjoint set of the 1024 partitions by
> > > >>> an eternal arbiter. Each consumer process then creates a mapping from
> > > >>> brokers -> partitions it has been assigned. It then starts one fetcher
> > > >>> thread per broker. Each thread queries its broker (using the
> > > >>> SimpleConsumer) for partitions such that partitions = (partitions on
> > > >>> broker) *∩* (partitions assigned to the process by the arbiter). So in
> > > >>> effect there are 9 consumer threads across 3 consumer processes that
> > > >>> query disjoint sets of partitions for the single topic, and amongst
> > > >>> themselves they manage to consume from every partition. Each thread
> > > >>> has a run loop where it asks for messages, consumes them, then asks
> > > >>> for more messages. When a run loop starts, it begins by querying for
> > > >>> the latest message in a partition (i.e. discards any previous backlog)
> > > >>> and then maintains a map of partition -> nextOffsetToRequest in memory
> > > >>> to make sure that it consumes messages in order.
> > > >>>
> > > >> Edit: Meant external arbiter.
> > > >>
> > > >>>
> > > >>> v) Consumption is really simple. Each message is put on an efficient
> > > >>> non-blocking ring buffer; if the ring buffer is full the message is
> > > >>> dropped. I measure the mean time between fetches and it matches the
> > > >>> time taken by the fetches themselves to within a millisecond, meaning
> > > >>> that no matter how many messages are dequeued, it takes almost no time
> > > >>> to process them. At the end of processing a fetch request I increment
> > > >>> another distributed counter that counts the number of messages
> > > >>> processed. This counter tells me that on average I am consuming the
> > > >>> same number of messages/sec that I enqueue on the producers, i.e.
> > > >>> around 22 thousand messages/sec.
> > > >>>
> > > >>> vi) The 99th percentile of the number of messages fetched per fetch
> > > >>> request is about 500 messages. The 99th percentile of the time it
> > > >>> takes to fetch a batch is about 130 - 140 ms. I played around with the
> > > >>> buffer and maxWait settings on the SimpleConsumer, and attempting to
> > > >>> consume more messages caused the 99th percentile of the fetch time to
> > > >>> balloon, so I am consuming in smaller batches right now.
> > > >>>
> > > >>> vii) Roughly every 30 seconds each of the producers inserts a trace
> > > >>> message into each partition (so 1024 messages per producer every 30
> > > >>> seconds). Each message only contains the System.currentTimeMillis() at
> > > >>> the time of enqueueing. It is about 9 bytes long. The consumer in step
> > > >>> (v) always checks whether a message it dequeued is of this trace type
> > > >>> (instead of the regular message type). This is a simple check on the
> > > >>> first 2-byte version that each message buffer contains. If it is a
> > > >>> trace message it reads its payload as a long, takes the difference
> > > >>> between the system time on the consumer and this payload, and updates
> > > >>> a histogram with this difference. Ignoring NTP skew (which I've
> > > >>> measured to be on the order of milliseconds), this is the lag between
> > > >>> when a message was enqueued on the producer and when it was read on
> > > >>> the consumer.
> > > >>>
> > > >> Edit: Trace messages are 10 bytes long - 2-byte version + 8-byte long
> > > >> for the timestamp.
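> > > >>
> > > >> (For concreteness, a rough sketch of what the LagMarker serialization
> > > >> looks like, given the 2-byte version + 8-byte timestamp layout above;
> > > >> the actual class may differ, and the TRACE_VERSION value here is just a
> > > >> placeholder:)
> > > >>
> > > >> import java.nio.ByteBuffer;
> > > >>
> > > >> final class LagMarker {
> > > >>     static final short TRACE_VERSION = 2;  // placeholder version value for trace messages
> > > >>
> > > >>     // 10 bytes total: 2-byte version followed by an 8-byte millisecond timestamp.
> > > >>     static byte[] serialize(long timeOfEnqueue) {
> > > >>         return ByteBuffer.allocate(10)
> > > >>                 .putShort(TRACE_VERSION)
> > > >>                 .putLong(timeOfEnqueue)
> > > >>                 .array();
> > > >>     }
> > > >>
> > > >>     // Used on the consumer side to recover the enqueue time from a trace message.
> > > >>     static long getEnqueueTime(byte[] value) {
> > > >>         ByteBuffer buffer = ByteBuffer.wrap(value);
> > > >>         buffer.getShort();        // skip the version
> > > >>         return buffer.getLong();  // millisecond timestamp written by the producer
> > > >>     }
> > > >> }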
> > > >>
> > > >> So the pseudocode for every consumer thread (one per broker per
> > > >> consumer process, 9 in total across 3 consumer processes) is:
> > > >>>
> > > >>> void run() {
> > > >>>   while (running) {
> > > >>>     // The partition assignment is done by an external arbiter.
> > > >>>     FetchRequest fetchRequest =
> > > >>>         buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
> > > >>>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
> > > >>>     // The 99th of this is also 130-140 ms, the same as the time between fetches.
> > > >>>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
> > > >>>     processData(fetchResponse);  // The 99th on this is 2-3 ms.
> > > >>>   }
> > > >>> }
> > > >>>
> > > >>> void processData(FetchResponse response) {
> > > >>>   try (Timer.Context _ = processWorkerDataTimer.time()) {  // The 99th on this is 2-3 ms.
> > > >>>     for (Message message : response) {
> > > >>>       if (typeOf(message) == NORMAL_DATA) {
> > > >>>         // Never blocks; drops the message if the ring buffer is full.
> > > >>>         enqueueOnNonBlockingRingBuffer(message);
> > > >>>       } else if (typeOf(message) == TRACE_DATA) {
> > > >>>         long timeOfEnqueue = getEnqueueTime(message);
> > > >>>         long currentTime = System.currentTimeMillis();
> > > >>>         long difference = currentTime - timeOfEnqueue;
> > > >>>         lagHistogram.update(difference);
> > > >>>       }
> > > >>>     }
> > > >>>   }
> > > >>> }
> > > >>>
> > > >>> viii) So the kicker is that this lag is really, really high:
> > > >>> Median lag: *200 ms*. This already seems pretty high, even if I could
> > > >>> discount some of it as NTP skew.
> > > >>> Mean lag: *2 - 4 seconds*! Notice that the mean is much bigger than the
> > > >>> median, telling us the distribution has a big tail.
> > > >>> 99th percentile lag: *20 seconds*!!
> > > >>>
> > > >>> I can't quite figure out the source of the lag. Here are some other
> > > >>> things of note:
> > > >>>
> > > >>> 1) The brokers in general are at about 40-50% CPU, but I see occasional
> > > >>> spikes to 80% - 95%. This could be causing issues. I am wondering if
> > > >>> this is down to the high number of partitions I have and how each
> > > >>> partition ends up receiving relatively few messages: 22k messages/sec
> > > >>> spread across 1024 partitions = only about 21 messages/sec/partition,
> > > >>> while each broker should be receiving about 7500 messages/sec overall.
> > > >>> It could also be down to the messages being tiny; I imagine the Kafka
> > > >>> wire protocol's per-message overhead is probably close to the 25-byte
> > > >>> message payload itself, and the broker also has to do a few CRC32
> > > >>> calculations etc. But given that these are c3.2xlarge machines and the
> > > >>> message volume is so small, I'd expect them to do better. Reading this
> > > >>> article
> > > >>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
> > > >>> about 2 million messages/sec across three brokers makes it seem like
> > > >>> 22k 25-byte messages/sec should be very easy. So my biggest suspicion
> > > >>> right now is that the large number of partitions is somehow responsible
> > > >>> for this latency.
> > > >>>
> > > >>> 2) On the consumer, given that the 99th of the time between fetches ==
> > > >>> the 99th of the fetch time itself, I don't see how I could do much
> > > >>> better. It's pretty close to just getting batches of data, iterating
> > > >>> through them, and dropping them on the floor. My requests on the
> > > >>> consumer are built like this:
> > > >>>
> > > >>>     // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
> > > >>>     FetchRequestBuilder builder = new FetchRequestBuilder()
> > > >>>             .clientId(clientName)
> > > >>>             .maxWait(100)
> > > >>>             .minBytes(1000);
> > > >>>
> > > >>>     // Add a fetch request for every partition.
> > > >>>     for (PartitionMetadata partition : partitions) {
> > > >>>         int partitionId = partition.partitionId();
> > > >>>         // This map maintains the next offset to be read for each partition.
> > > >>>         long offset = readOffsets.get(partition);
> > > >>>         builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition.
> > > >>>     }
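> > > >>>
> > > >>> (The fetch itself is then just something like fetchResponse =
> > > >>> simpleConsumer.fetch(builder.build()), where simpleConsumer is the
> > > >>> kafka.javaapi.consumer.SimpleConsumer for that broker.)
> > > >>>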
> > > >>> 3) On the producer side I use the Kafka beta producer. I've played
> > > >>> around with many settings but none of them seem to have made a
> > > >>> difference. Here are my current settings:
> > > >>>
> > > >>> ProducerConfig values:
> > > >>>
> > > >>>         block.on.buffer.full = false
> > > >>>
> > > >>>         retry.backoff.ms = 100
> > > >>>
> > > >>>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
> > > >>>
> > > >>>         batch.size = 40500  // This is also kind of big - could it be adding lag?
> > > >>>
> > > >>>         metrics.sample.window.ms = 30000
> > > >>>
> > > >>>         metadata.max.age.ms = 300000
> > > >>>
> > > >>>         receive.buffer.bytes = 32768
> > > >>>
> > > >>>         timeout.ms = 30000
> > > >>>
> > > >>>         max.in.flight.requests.per.connection = 5
> > > >>>
> > > >>>         metric.reporters = []
> > > >>>
> > > >>>         bootstrap.servers = [broker1, broker2, broker3]
> > > >>>
> > > >>>         client.id =
> > > >>>
> > > >>>         compression.type = none
> > > >>>
> > > >>>         retries = 0
> > > >>>
> > > >>>         max.request.size = 1048576
> > > >>>
> > > >>>         send.buffer.bytes = 131072
> > > >>>
> > > >>>         acks = 1
> > > >>>
> > > >>>         reconnect.backoff.ms = 10
> > > >>>
> > > >>>         linger.ms = 250  // Based on this config I'd imagine that the lag should be bounded to about 250 ms overall.
> > > >>>
> > > >>>         metrics.num.samples = 2
> > > >>>
> > > >>>         metadata.fetch.timeout.ms = 60000
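> > > >>>
> > > >>> (For completeness, roughly how the producer is constructed from these
> > > >>> settings - a simplified sketch showing only a subset of the values
> > > >>> above; the broker host:port strings are placeholders, and this assumes
> > > >>> the 0.8.2 beta producer, which takes byte[] keys/values directly
> > > >>> without serializer configs:)
> > > >>>
> > > >>> import java.util.Properties;
> > > >>>
> > > >>> import org.apache.kafka.clients.producer.KafkaProducer;
> > > >>>
> > > >>> class ProducerSetup {
> > > >>>     static KafkaProducer buildProducer() {
> > > >>>         Properties props = new Properties();
> > > >>>         // Placeholder host:port values for the three brokers.
> > > >>>         props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
> > > >>>         props.put("acks", "1");
> > > >>>         props.put("retries", "0");
> > > >>>         props.put("compression.type", "none");
> > > >>>         props.put("block.on.buffer.full", "false");
> > > >>>         props.put("buffer.memory", "83886080");
> > > >>>         props.put("batch.size", "40500");
> > > >>>         props.put("linger.ms", "250");
> > > >>>         return new KafkaProducer(props);
> > > >>>     }
> > > >>> }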
> > > >>>
> > > >>> I know this is a lot of information. I just wanted to provide as much
> > > >>> context as possible.
> > > >>>
> > > >>> Thanks in advance!
> > > >>>
> > > >>>
> > >
> >
>
