Thanks Jay. Will check (1) and (2) and get back to you. The test is not
stand-alone right now, so it might take me a bit of time to extract it into a
stand-alone executable.

On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey Rajiv,
>
> This sounds like a bug. The more info you can help us get the easier to
> fix. Things that would help:
> 1. Can you check if the request log on the servers shows latency spikes
> (in which case it is a server problem)?
> 2. It would be worth also getting the JMX stats on the producer, as they
> will show things like what percentage of time it is waiting for buffer
> space, etc. (a sketch of pulling those metrics from the client is below).
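>
> For reference, a minimal sketch of dumping the producer's metrics from inside
> the client (rather than over JMX) might look like the following; the exact
> metric names, e.g. "bufferpool-wait-ratio" for buffer-space waits, should be
> checked against what metrics() actually reports:
>
> import java.util.Map;
> import org.apache.kafka.common.Metric;
> import org.apache.kafka.common.MetricName;
>
> // Periodically log every producer metric so spikes in buffer-wait time,
> // record queue time, request latency, etc. show up alongside the lag numbers.
> void dumpProducerMetrics(Map<MetricName, ? extends Metric> metrics) {
>     for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
>         System.out.printf("%s/%s = %f%n",
>                 entry.getKey().group(), entry.getKey().name(), entry.getValue().value());
>     }
> }
>
> // Usage: dumpProducerMetrics(kafkaProducer.metrics());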
>
> If your test is reasonably stand-alone it would be great to file a JIRA and
> attach the test code and the findings you already have so someone can dig
> into what is going on.
>
> -Jay
>
> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
> > Hi all,
> >
> > Bumping this up, in case someone has any ideas. I did yet another
> > experiment where I create 4 producers and stripe the send requests across
> > them in a manner such that any one producer only sees 256 partitions
> > instead of the entire 1024. This seems to have helped a bit: though I
> > still see a crazy high 99th (25-30 seconds), the median, mean, 75th and
> > 95th percentile have all gone down. A rough sketch of the striping is
> > below.
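> >
> > Roughly, the striping looks like this (a sketch, not the exact code; the
> > producers share the same configs as before and the partition is still
> > chosen the same way, only the producer instance varies):
> >
> > import org.apache.kafka.clients.producer.Callback;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerRecord;
> >
> > // 4 producers, each "owning" a disjoint range of 256 of the 1024 partitions.
> > KafkaProducer[] producers = new KafkaProducer[4];
> >
> > void sendStriped(byte[] value, int partition, Callback callback) {
> >     // partition is in [0, 1024); producer i handles partitions [i*256, (i+1)*256).
> >     KafkaProducer producer = producers[partition / 256];
> >     producer.send(new ProducerRecord(MY_TOPIC, partition, null, value), callback);
> > }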
> >
> > Thanks!
> >
> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <tstump...@ntent.com>
> > wrote:
> >
> > > Ah I thought it was restarting the broker that made things better :)
> > >
> > > Yeah I have no experience with the Java client so can't really help
> > > there.
> > >
> > > Good luck!
> > >
> > > -----Original Message-----
> > > From: Rajiv Kurian [ra...@signalfuse.com]
> > > Received: Sunday, 21 Dec 2014, 12:25PM
> > > To: users@kafka.apache.org [users@kafka.apache.org]
> > > Subject: Re: Trying to figure out kafka latency issues
> > >
> > > I'll take a look at the GC profile of the brokers. Right now I keep tabs
> > > on the CPU, messages in, bytes in, bytes out, free memory (on the
> > > machine, not the JVM heap) and free disk space on the broker. I'll need
> > > to take a look at the JVM metrics too. What seemed strange is that going
> > > from 8 -> 512 partitions increases the latency, but going from 512 -> 8
> > > does not decrease it. I have to restart the producer (but not the broker)
> > > for the end to end latency to go down. That made it seem that the fault
> > > was probably with the producer and not the broker. Only restarting the
> > > producer made things better. I'll do more extensive measurement on the
> > > broker.
> > >
> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <tstump...@ntent.com>
> > > wrote:
> > > >
> > > > Did you see my response, and have you checked the server logs,
> > > > especially the GC logs? It still sounds like you are running out of
> > > > memory on the broker. What is your max heap memory, and are you
> > > > thrashing once you start writing to all those partitions?
> > > >
> > > > You have measured very thoroughly from an external point of view; I
> > > > think now you'll have to start measuring the internal metrics. Maybe
> > > > someone else will have ideas on what JMX values to watch.
> > > >
> > > > Best,
> > > > Thunder
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Rajiv Kurian [ra...@signalfuse.com]
> > > > Received: Saturday, 20 Dec 2014, 10:24PM
> > > > To: users@kafka.apache.org [users@kafka.apache.org]
> > > > Subject: Re: Trying to figure out kafka latency issues
> > > >
> > > > Some more work tells me that the end to end latency numbers vary with
> > > > the number of partitions I am writing to. I did an experiment where,
> > > > based on a run time flag, I dynamically select how many of the *1024
> > > > partitions* I write to. So if I decide I'll write to at most 256
> > > > partitions, I mod whatever partition I would actually write to by 256.
> > > > The number of partitions for this topic on the broker remains the same
> > > > at *1024*, but the number of partitions my producers write to changes
> > > > dynamically based on the run time flag. So something like this:
> > > >
> > > > int partition = getPartitionForMessage(message);
> > > > // This flag can be updated without bringing the application down -
> > > > // just a volatile read of some number set externally.
> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
> > > > // Send a message to this Kafka partition.
> > > >
> > > > Here are some interesting things I've noticed:
> > > >
> > > > i) When I start my client and it *never writes* to more than *8
> > > > partitions* (same data rate but fewer partitions), the end to end *99th
> > > > latency is 300-350 ms*. Quite a bit of this (numbers in my previous
> > > > emails) is the latency from producer -> broker and from broker ->
> > > > consumer. Still nowhere near as poor as the *20 - 30 seconds* I was
> > > > seeing.
> > > >
> > > > ii) When I increase the maximum number of partitions, end to end
> > > > latency increases dramatically. At *256 partitions* the end to end
> > > > *99th latency is still 390 - 418 ms*, worse than the figures for *8*
> > > > partitions but not by much. When I increase this number to *512
> > > > partitions* the end to end *99th latency* becomes an intolerable *19-24
> > > > seconds*. At *1024* partitions the *99th latency is 25 - 30 seconds*.
> > > > A table of the numbers:
> > > >
> > > > Max number of partitions written to (out of 1024)    End to end latency
> > > > 8                                                     300 - 350 ms
> > > > 256                                                   390 - 418 ms
> > > > 512                                                   19 - 24 seconds
> > > > 1024                                                  25 - 30 seconds
> > > >
> > > > iii) Once I make the max number of partitions high enough, reducing it
> > > > doesn't help. For example, if I go up from *8* to *512* partitions, the
> > > > latency goes up. But while the producer is running, if I go down from
> > > > *512* to *8* partitions, it doesn't reduce the latency numbers. My
> > > > guess is that the producer is creating some state lazily per partition
> > > > and this state is causing the latency. Once this state is created,
> > > > writing to fewer partitions doesn't seem to help. Only a restart of the
> > > > producer calms things down.
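> > > >
> > > > A rough back-of-the-envelope on why per-partition state could matter
> > > > with my settings (assuming the new producer batches per partition,
> > > > which is my reading of it, not something I've verified in the code):
> > > >
> > > > // ~22,000 msgs/sec spread over 1024 partitions:
> > > > double msgsPerSecPerPartition = 22000.0 / 1024;                          // ~21.5
> > > > // ~27 byte values with batch.size = 40500 bytes:
> > > > double secondsToFillOneBatch = 40500.0 / (27 * msgsPerSecPerPartition);  // ~70 s
> > > > // So batches essentially never fill, every batch waits out linger.ms
> > > > // (250 ms), and the producer keeps an in-progress batch per partition
> > > > // (1024 of them) alive inside buffer.memory.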
> > > >
> > > > So my current plan is to reduce the number of partitions on the topic,
> > > > but there seems to be something deeper going on for the latency numbers
> > > > to be so poor to begin with and then suffer so much more (non-linearly)
> > > > with additional partitions.
> > > >
> > > > Thanks!
> > > >
> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > wrote:
> > > > >
> > > > > I've done some more measurements. I've also started measuring the
> > > > > latency between when I ask my producer to send a message and when I
> > > > > get an acknowledgement via the callback. Here is my code:
> > > > >
> > > > > // This function is called on every producer once every 30 seconds.
> > > > > public void addLagMarkers(final Histogram enqueueLag) {
> > > > >     final int numberOfPartitions = 1024;
> > > > >     final long timeOfEnqueue = System.currentTimeMillis();
> > > > >     final Callback callback = new Callback() {
> > > > >         @Override
> > > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
> > > > >             if (metadata != null) {
> > > > >                 // The difference between ack time from broker and enqueue time.
> > > > >                 final long timeOfAck = System.currentTimeMillis();
> > > > >                 final long lag = timeOfAck - timeOfEnqueue;
> > > > >                 enqueueLag.update(lag);
> > > > >             }
> > > > >         }
> > > > >     };
> > > > >     for (int i = 0; i < numberOfPartitions; i++) {
> > > > >         try {
> > > > >             // 10 bytes -> short version + long timestamp.
> > > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
> > > > >             // This message is later used by the consumers to measure lag.
> > > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
> > > > >             kafkaProducer.send(record, callback);
> > > > >         } catch (Exception e) {
> > > > >             // We just dropped a lag marker.
> > > > >         }
> > > > >     }
> > > > > }
> > > > >
> > > > > The *99th* on this lag is about *350 - 400 ms*. It's not stellar, but
> > > > > it doesn't account for the *20-30 second 99th* I see on the end to
> > > > > end lag. I am consuming in a tight loop on the consumers (using the
> > > > > SimpleConsumer) with minimal processing and a *99th fetch time* of
> > > > > *130-140 ms*, so I don't think that should be a problem either.
> > > > > Completely baffled.
> > > > >
> > > > >
> > > > > Thanks!
> > > > >
> > > > >
> > > > >
> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > > wrote:
> > > > >>
> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > >> wrote:
> > > > >>>
> > > > >>> I am trying to replace a Thrift peer-to-peer API with Kafka for a
> > > > >>> particular work flow. I am finding the 99th percentile latency to be
> > > > >>> unacceptable at this time. This entire work load runs in an Amazon
> > > > >>> VPC. I'd greatly appreciate it if someone has any insights on why I
> > > > >>> am seeing such poor numbers. Here are some details and measurements:
> > > > >>>
> > > > >>> i) I have a single topic with 1024 partitions that I am writing to
> > > > >>> from six clients using the kafka 0.8.2 beta producer.
> > > > >>>
> > > > >>> ii) I have 3 brokers, each on a c3.2xlarge machine on EC2. Each of
> > > > >>> those machines has 8 virtual CPUs, 15 GB memory and 2 * 80 GB SSDs.
> > > > >>> The broker -> partitions mapping was decided by Kafka when I created
> > > > >>> the topic.
> > > > >>>
> > > > >>> iii) I write about 22 thousand messages per second from across the
> > > > >>> 6 clients. This number was calculated using distributed counters: I
> > > > >>> increment a distributed counter in the callback from my enqueue job
> > > > >>> if the metadata returned is not null, and I increment a dropped
> > > > >>> messages counter if the callback has a non-null exception or if
> > > > >>> there was an exception in the synchronous send call. The number of
> > > > >>> dropped messages is pretty much always zero. Of the 6 clients, 3 are
> > > > >>> responsible for 95% of the traffic. Messages are very tiny and have
> > > > >>> null keys and 27 byte values (2 byte version and 25 byte payload).
> > > > >>> Again, these messages are written using the kafka 0.8.2 client. (A
> > > > >>> sketch of the counting callback is below.)
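> > > > >>>
> > > > >>> The counting callback is roughly the following (sentCounter and
> > > > >>> droppedCounter stand in for our distributed counters; a sketch, not
> > > > >>> the exact code):
> > > > >>>
> > > > >>> import org.apache.kafka.clients.producer.Callback;
> > > > >>> import org.apache.kafka.clients.producer.RecordMetadata;
> > > > >>>
> > > > >>> final Callback countingCallback = new Callback() {
> > > > >>>     @Override
> > > > >>>     public void onCompletion(RecordMetadata metadata, Exception ex) {
> > > > >>>         if (metadata != null) {
> > > > >>>             sentCounter.increment();     // acked by the broker
> > > > >>>         } else {
> > > > >>>             droppedCounter.increment();  // ex is non-null: message dropped
> > > > >>>         }
> > > > >>>     }
> > > > >>> };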
> > > > >>>
> > > > >>> iv) I have 3 consumer processes consuming only from this topic.
> > > > >>> Each consumer process is assigned a disjoint set of the 1024
> > > > >>> partitions by an eternal arbiter. Each consumer process then creates
> > > > >>> a mapping from brokers -> partitions it has been assigned, and
> > > > >>> starts one fetcher thread per broker. Each thread queries its broker
> > > > >>> (using the SimpleConsumer) for partitions = (partitions on broker)
> > > > >>> *∩* (partitions assigned to the process by the arbiter). So in
> > > > >>> effect there are 9 consumer threads across 3 consumer processes that
> > > > >>> query disjoint sets of partitions for the single topic, and amongst
> > > > >>> themselves they manage to consume from every partition. Each thread
> > > > >>> has a run loop where it asks for messages, consumes them, then asks
> > > > >>> for more messages. When a run loop starts, it queries for the latest
> > > > >>> message in a partition (i.e. discards any previous backup) and then
> > > > >>> maintains a map of partition -> nextOffsetToRequest in memory to
> > > > >>> make sure that it consumes messages in order.
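> > > > >>>
> > > > >>> The partition set for one fetcher thread is computed with a plain
> > > > >>> set intersection, roughly (a sketch, not the exact code):
> > > > >>>
> > > > >>> import java.util.HashSet;
> > > > >>> import java.util.Set;
> > > > >>>
> > > > >>> // Partitions whose leader is this broker, intersected with the
> > > > >>> // partitions the arbiter assigned to this consumer process.
> > > > >>> Set<Integer> partitionsForThread = new HashSet<Integer>(partitionsOnBroker);
> > > > >>> partitionsForThread.retainAll(partitionsAssignedToThisProcess);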
> > > > >>>
> > > > >> Edit: Meant external arbiter.
> > > > >>
> > > > >>>
> > > > >>> v) Consumption is really simple. Each message is put on a
> > > > >>> non-blocking, efficient ring buffer. If the ring buffer is full the
> > > > >>> message is dropped. I measure the mean time between fetches and it
> > > > >>> is the same as the time for fetches up to a ms, meaning no matter
> > > > >>> how many messages are dequeued, it takes almost no time to process
> > > > >>> them. At the end of processing a fetch request I increment another
> > > > >>> distributed counter that counts the number of messages processed.
> > > > >>> This counter tells me that on average I am consuming the same number
> > > > >>> of messages/sec that I enqueue on the producers, i.e. around 22
> > > > >>> thousand messages/sec.
> > > > >>>
> > > > >>> vi) The 99th percentile of the number of messages fetched per fetch
> > > > >>> request is about 500 messages. The 99th percentile of the time it
> > > > >>> takes to fetch a batch is about 130 - 140 ms. I played around with
> > > > >>> the buffer and maxWait settings on the SimpleConsumer, and
> > > > >>> attempting to consume more messages was leading the 99th percentile
> > > > >>> of the fetch time to balloon up, so I am consuming in smaller
> > > > >>> batches right now.
> > > > >>>
> > > > >>> vii) Every 30 seconds or so, each of the producers inserts a trace
> > > > >>> message into each partition (so 1024 messages per producer every 30
> > > > >>> seconds). Each message only contains the System.currentTimeMillis()
> > > > >>> at the time of enqueueing. It is about 9 bytes long. The consumer in
> > > > >>> step (v) always checks to see if a message it dequeued is of this
> > > > >>> trace type (instead of the regular message type). This is a simple
> > > > >>> check on the first 2 byte version that each message buffer contains.
> > > > >>> If it is a trace message, it reads its payload as a long, takes the
> > > > >>> difference between the system time on the consumer and this payload,
> > > > >>> and updates a histogram with this difference value. Ignoring NTP
> > > > >>> skew (which I've measured to be on the order of milliseconds), this
> > > > >>> is the lag between when a message was enqueued on the producer and
> > > > >>> when it was read on the consumer.
> > > > >>>
> > > > >> Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte
> > > > >> long for the timestamp.
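> > > > >>
> > > > >> For what it's worth, a sketch of what LagMarker.serialize and the
> > > > >> consumer-side getEnqueueTime could look like (my reconstruction, not
> > > > >> the actual code; the version constant is a placeholder):
> > > > >>
> > > > >> import java.nio.ByteBuffer;
> > > > >>
> > > > >> static final short TRACE_VERSION = 2;  // placeholder value
> > > > >>
> > > > >> // 10 bytes: 2 byte version marker + 8 byte enqueue timestamp (millis).
> > > > >> static byte[] serialize(long timeOfEnqueue) {
> > > > >>     ByteBuffer buffer = ByteBuffer.allocate(10);
> > > > >>     buffer.putShort(TRACE_VERSION);
> > > > >>     buffer.putLong(timeOfEnqueue);
> > > > >>     return buffer.array();
> > > > >> }
> > > > >>
> > > > >> static long getEnqueueTime(ByteBuffer message) {
> > > > >>     return message.getLong(2);  // skip the 2 byte version
> > > > >> }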
> > > > >>
> > > > >>> So pseudocode for every consumer thread (one per broker per
> > > > >>> consumer process, 9 in total across 3 consumer processes) is:
> > > > >>>
> > > > >>> void run() {
> > > > >>>   while (running) {
> > > > >>>     // This assignment is done by an external arbiter.
> > > > >>>     FetchRequest fetchRequest =
> > > > >>>         buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
> > > > >>>
> > > > >>>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
> > > > >>>
> > > > >>>     // The 99th of this is 130-140 ms, the same as the time between fetches.
> > > > >>>     FetchResponse fetchResponse = fetch(fetchRequest);
> > > > >>>
> > > > >>>     processData(fetchResponse);  // The 99th on this is 2-3 ms.
> > > > >>>   }
> > > > >>> }
> > > > >>>
> > > > >>> void processData(FetchResponse response) {
> > > > >>>   try (Timer.Context ignored = processWorkerDataTimer.time()) {  // The 99th on this is 2-3 ms.
> > > > >>>     for (Message message : response) {
> > > > >>>       if (typeOf(message) == NORMAL_DATA) {
> > > > >>>         // Never blocks, drops the message if the ring buffer is full.
> > > > >>>         enqueueOnNonBlockingRingBuffer(message);
> > > > >>>       } else if (typeOf(message) == TRACE_DATA) {
> > > > >>>         long timeOfEnqueue = getEnqueueTime(message);
> > > > >>>         long currentTime = System.currentTimeMillis();
> > > > >>>         long difference = currentTime - timeOfEnqueue;
> > > > >>>         lagHistogram.update(difference);
> > > > >>>       }
> > > > >>>     }
> > > > >>>   }
> > > > >>> }
> > > > >>>
> > > > >>> viii) So the kicker is that this lag is really, really high:
> > > > >>> Median lag: *200 ms*. This already seems pretty high, and I could
> > > > >>> even discount some of it through NTP skew.
> > > > >>> Mean lag: *2 - 4 seconds*! Notice that the mean is bigger than the
> > > > >>> median, telling us the distribution has a big tail.
> > > > >>> 99th lag: *20 seconds*!!
> > > > >>>
> > > > >>> I can't quite figure out the source of the lag. Here are some other
> > > > >>> things of note:
> > > > >>>
> > > > >>> 1) The brokers in general are at about 40-50% CPU, but I see
> > > > >>> occasional spikes to more than 80% - 95%. This could probably be
> > > > >>> causing issues. I am wondering if this is down to the high number of
> > > > >>> partitions I have and how each partition ends up receiving not too
> > > > >>> many messages: 22k messages/sec spread across 1024 partitions = only
> > > > >>> 21 odd messages/sec/partition. But each broker should be receiving
> > > > >>> about 7500 messages/sec. It could also be down to the nature of
> > > > >>> these messages being tiny; I imagine the kafka wire protocol
> > > > >>> overhead per message is probably close to the 25 byte payload
> > > > >>> itself, and the broker also has to do a few CRC32 calculations etc.
> > > > >>> But given that these are c3.2xlarge machines and the number of
> > > > >>> messages is so small, I'd expect them to do better. Reading this
> > > > >>> article
> > > > >>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
> > > > >>> about 2 million messages/sec across three brokers makes it seem
> > > > >>> like 22k 25 byte messages/sec should be very easy. So my biggest
> > > > >>> suspicion right now is that the large number of partitions is
> > > > >>> somehow responsible for this latency.
> > > > >>>
> > > > >>> 2) On the consumer, given that the 99th time between fetches == the
> > > > >>> 99th time of a fetch, I don't see how I could do any better. It's
> > > > >>> pretty close to just getting batches of data, iterating through
> > > > >>> them and dropping them on the floor. My requests on the consumer
> > > > >>> are built like this:
> > > > >>>
> > > > >>>         // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
> > > > >>>         FetchRequestBuilder builder = new FetchRequestBuilder()
> > > > >>>                 .clientId(clientName)
> > > > >>>                 .maxWait(100)
> > > > >>>                 .minBytes(1000);
> > > > >>>
> > > > >>>         // Add a fetch for every partition.
> > > > >>>         for (PartitionMetadata partition : partitions) {
> > > > >>>             int partitionId = partition.partitionId();
> > > > >>>             // This map maintains the next offset to be read per partition.
> > > > >>>             long offset = readOffsets.get(partition);
> > > > >>>             builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition.
> > > > >>>         }
> > > > >>> 3) On the producer side I use the kafka beta producer. I've played
> > > > >>> around with many settings but none of them seem to have made a
> > > > >>> difference. Here are my current settings (a sketch of wiring them
> > > > >>> into the producer follows the list):
> > > > >>>
> > > > >>> ProducerConfig values:
> > > > >>>
> > > > >>>         block.on.buffer.full = false
> > > > >>>         retry.backoff.ms = 100
> > > > >>>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
> > > > >>>         batch.size = 40500  // This is also kind of big - could it be adding lag?
> > > > >>>         metrics.sample.window.ms = 30000
> > > > >>>         metadata.max.age.ms = 300000
> > > > >>>         receive.buffer.bytes = 32768
> > > > >>>         timeout.ms = 30000
> > > > >>>         max.in.flight.requests.per.connection = 5
> > > > >>>         metric.reporters = []
> > > > >>>         bootstrap.servers = [broker1, broker2, broker3]
> > > > >>>         client.id =
> > > > >>>         compression.type = none
> > > > >>>         retries = 0
> > > > >>>         max.request.size = 1048576
> > > > >>>         send.buffer.bytes = 131072
> > > > >>>         acks = 1
> > > > >>>         reconnect.backoff.ms = 10
> > > > >>>         linger.ms = 250  // Based on this I'd imagine the lag should be bounded to about 250 ms overall.
> > > > >>>         metrics.num.samples = 2
> > > > >>>         metadata.fetch.timeout.ms = 60000
> > > > >>>
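> > > > >>> A minimal sketch of how these settings might be fed to the producer
> > > > >>> (host names and port are placeholders; with the 0.8.2 beta producer,
> > > > >>> keys and values are plain byte arrays so no serializers are set):
> > > > >>>
> > > > >>> import java.util.Properties;
> > > > >>> import org.apache.kafka.clients.producer.KafkaProducer;
> > > > >>>
> > > > >>> Properties props = new Properties();
> > > > >>> props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
> > > > >>> props.put("acks", "1");
> > > > >>> props.put("linger.ms", "250");
> > > > >>> props.put("batch.size", "40500");
> > > > >>> props.put("buffer.memory", "83886080");
> > > > >>> props.put("block.on.buffer.full", "false");
> > > > >>> props.put("retries", "0");
> > > > >>> props.put("compression.type", "none");
> > > > >>> KafkaProducer kafkaProducer = new KafkaProducer(props);
> > > > >>>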
> > > > >>> I know this is a lot of information. I just wanted to provide as
> > > > >>> much context as possible.
> > > > >>>
> > > > >>> Thanks in advance!
> > > > >>>
> > > > >>>
> > > >
> > >
> >
>
