In case the attachments don't come through, here is an imgur link -
http://imgur.com/NslGpT3,Uw6HFow#0

On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Never mind about (2). I see these stats are already being output by the
> Kafka producer. I've attached a couple of screenshots (couldn't copy-paste
> from jvisualvm). Do any of these numbers strike you as odd? The
> bufferpool-wait-ratio sadly shows up as NaN.
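>
> (A minimal sketch of dumping these metrics programmatically, so they can be
> logged instead of copied out of jvisualvm; it just walks the new producer's
> metrics() map:)
>
> // Sketch: periodically print every producer metric, e.g. bufferpool-wait-ratio,
> // record-queue-time-avg, request-latency-avg.
> void dumpProducerMetrics(org.apache.kafka.clients.producer.KafkaProducer producer) {
>     java.util.Map<org.apache.kafka.common.MetricName, ? extends org.apache.kafka.common.Metric> metrics =
>             producer.metrics();
>     for (org.apache.kafka.common.MetricName name : metrics.keySet()) {
>         System.out.println(name.name() + " = " + metrics.get(name).value());
>     }
> }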
>
> I still don't know how to figure out (1).
>
> Thanks!
>
>
> On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <ra...@signalfuse.com>
> wrote:
>
>> Hi Jay,
>>
>> Re (1) - I am not sure how to do this. Actually, I am not sure what it
>> means. Is this the time at which every write/fetch request is received on
>> the broker? Do I need to enable a specific log level for it to show up? It
>> doesn't appear in the usual log. Is this information also available via
>> JMX somehow?
>> Re (2) - Are you saying that I should instrument the "percentage of time
>> waiting for buffer space" stat myself? If so, how do I do that? Or are these
>> stats already output to JMX by the Kafka producer code? This seems like
>> it's in the internals of the Kafka producer client code.
>>
>> Thanks again!
>>
>>
>> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <ra...@signalfuse.com>
>> wrote:
>>
>>> Thanks Jay. Will check (1) and (2) and get back to you. The test is not
>>> stand-alone right now; it might take a bit of work (and time) to extract
>>> it into a stand-alone executable.
>>>
>>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>>> Hey Rajiv,
>>>>
>>>> This sounds like a bug. The more info you can help us get, the easier it
>>>> will be to fix. Things that would help:
>>>> 1. Can you check whether the request log on the servers shows latency
>>>> spikes (in which case it is a server-side problem)?
>>>> 2. It would also be worth getting the JMX stats on the producer, as they
>>>> will show things like the percentage of time it spends waiting for buffer
>>>> space, etc.
>>>>
>>>> If your test is reasonably stand-alone it would be great to file a JIRA
>>>> and
>>>> attach the test code and the findings you already have so someone can
>>>> dig
>>>> into what is going on.
>>>>
>>>> -Jay
>>>>
>>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <ra...@signalfuse.com>
>>>> wrote:
>>>>
>>>> > Hi all,
>>>> >
>>>> > Bumping this up in case someone has any ideas. I did yet another
>>>> > experiment where I created 4 producers and striped the send requests
>>>> > across them so that any one producer only sees 256 partitions instead
>>>> > of the entire 1024. This seems to have helped a bit: though I still see
>>>> > a crazy high 99th percentile (25-30 seconds), the median, mean, 75th
>>>> > and 95th percentiles have all gone down.
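>>>> >
>>>> > (A rough sketch of the striping; sendStriped and the block size of 256
>>>> > are illustrative names and choices, and it reuses the same producer
>>>> > classes and MY_TOPIC as the code quoted later in this thread:)
>>>> >
>>>> > // Illustrative: 4 producers, each one only ever sends to a fixed block
>>>> > // of 256 of the 1024 partitions.
>>>> > void sendStriped(org.apache.kafka.clients.producer.KafkaProducer[] producers,
>>>> >                  int partition, byte[] value,
>>>> >                  org.apache.kafka.clients.producer.Callback callback) {
>>>> >     int producerIndex = partition / 256;  // 0-255 -> producer 0, 256-511 -> producer 1, ...
>>>> >     producers[producerIndex].send(
>>>> >         new org.apache.kafka.clients.producer.ProducerRecord(MY_TOPIC, partition, null, value),
>>>> >         callback);
>>>> > }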
>>>> >
>>>> > Thanks!
>>>> >
>>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <
>>>> tstump...@ntent.com>
>>>> > wrote:
>>>> >
>>>> > > Ah I thought it was restarting the broker that made things better :)
>>>> > >
>>>> > > Yeah I have no experience with the Java client so can't really help
>>>> > there.
>>>> > >
>>>> > > Good luck!
>>>> > >
>>>> > > -----Original Message-----
>>>> > > From: Rajiv Kurian [ra...@signalfuse.com]
>>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
>>>> > > To: users@kafka.apache.org [users@kafka.apache.org]
>>>> > > Subject: Re: Trying to figure out kafka latency issues
>>>> > >
>>>> > > I'll take a look at the GC profile of the brokers. Right now I keep
>>>> > > tabs on the CPU, messages in, bytes in, bytes out, free memory (on
>>>> > > the machine, not the JVM heap) and free disk space on the brokers.
>>>> > > I'll need to take a look at the JVM metrics too. What seemed strange
>>>> > > is that going from 8 -> 512 partitions increases the latency, but
>>>> > > going from 512 -> 8 does not decrease it. I have to restart the
>>>> > > producer (but not the broker) for the end-to-end latency to go down.
>>>> > > That made it seem that the fault was probably with the producer and
>>>> > > not the broker. Only restarting the producer made things better. I'll
>>>> > > do more extensive measurement on the broker.
>>>> > >
>>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <
>>>> tstump...@ntent.com>
>>>> > > wrote:
>>>> > > >
>>>> > > > Did you see my response, and have you checked the server logs,
>>>> > > > especially the GC logs? It still sounds like you are running out of
>>>> > > > memory on the broker. What is your max heap size, and are you
>>>> > > > thrashing once you start writing to all those partitions?
>>>> > > >
>>>> > > > You have measured very thoroughly from an external point of view; I
>>>> > > > think now you'll have to start measuring the internal metrics.
>>>> > > > Maybe someone else will have ideas on what JMX values to watch.
>>>> > > >
>>>> > > > Best,
>>>> > > > Thunder
>>>> > > >
>>>> > > >
>>>> > > > -----Original Message-----
>>>> > > > From: Rajiv Kurian [ra...@signalfuse.com]
>>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
>>>> > > > To: users@kafka.apache.org [users@kafka.apache.org]
>>>> > > > Subject: Re: Trying to figure out kafka latency issues
>>>> > > >
>>>> > > > Some more work tells me that the end-to-end latency numbers vary
>>>> > > > with the number of partitions I am writing to. I did an experiment
>>>> > > > where, based on a run-time flag, I dynamically select how many of
>>>> > > > the *1024 partitions* I write to. So, say I decide I'll write to at
>>>> > > > most 256 partitions: I mod whatever partition I would actually have
>>>> > > > written to by 256. The number of partitions for this topic on the
>>>> > > > broker stays the same at *1024*, but the number of partitions my
>>>> > > > producers write to changes dynamically based on the run-time flag.
>>>> > > > So something like this:
>>>> > > >
>>>> > > > int partition = getPartitionForMessage(message);
>>>> > > > // This flag can be updated without bringing the application down -
>>>> > > > // just a volatile read of some number set externally.
>>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
>>>> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
>>>> > > > // Send a message to this Kafka partition.
>>>> > > >
>>>> > > > Here are some interesting things I've noticed:
>>>> > > >
>>>> > > > i) When I start my client and it *never writes* to more than *8
>>>> > > > partitions* (same data rate but fewer partitions), the end-to-end
>>>> > > > *99th percentile latency is 300-350 ms*. Quite a bit of this
>>>> > > > (numbers in my previous emails) is the latency from producer ->
>>>> > > > broker and from broker -> consumer. Still, it is nowhere near as
>>>> > > > bad as the *20 - 30* seconds I was seeing.
>>>> > > >
>>>> > > > ii) When I increase the maximum number of partitions, end-to-end
>>>> > > > latency increases dramatically. At *256 partitions* the end-to-end
>>>> > > > *99th percentile latency is still 390 - 418 ms* - worse than the
>>>> > > > figures for *8* partitions, but not by much. When I increase this
>>>> > > > number to *512 partitions* the end-to-end *99th percentile latency*
>>>> > > > becomes an intolerable *19 - 24 seconds*. At *1024* partitions the
>>>> > > > *99th percentile latency is 25 - 30 seconds*.
>>>> > > > A table of the numbers:
>>>> > > >
>>>> > > > Max partitions written to (out of 1024)    End-to-end latency (99th)
>>>> > > > 8                                          300 - 350 ms
>>>> > > > 256                                        390 - 418 ms
>>>> > > > 512                                        19 - 24 seconds
>>>> > > > 1024                                       25 - 30 seconds
>>>> > > >
>>>> > > >
>>>> > > > iii) Once I make the max number of partitions high enough, reducing
>>>> > > > it doesn't help. For example, if I go up from *8* to *512*
>>>> > > > partitions, the latency goes up; but if, while the producer is
>>>> > > > running, I go back down from *512* to *8* partitions, the latency
>>>> > > > numbers don't come back down. My guess is that the producer is
>>>> > > > lazily creating some per-partition state and this state is causing
>>>> > > > the latency. Once this state is created, writing to fewer
>>>> > > > partitions doesn't seem to help. Only a restart of the producer
>>>> > > > calms things down.
>>>> > > >
>>>> > > > So my current plan is to reduce the number of partitions on the
>>>> > > > topic, but there seems to be something deeper going on for the
>>>> > > > latency numbers to be so poor to begin with and then to degrade so
>>>> > > > much more (non-linearly) with additional partitions.
>>>> > > >
>>>> > > > Thanks!
>>>> > > >
>>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <
>>>> ra...@signalfuse.com>
>>>> > > > wrote:
>>>> > > > >
>>>> > > > > I've done some more measurements. I've also started measuring the
>>>> > > > > latency between when I ask my producer to send a message and when
>>>> > > > > I get an acknowledgement via the callback. Here is my code:
>>>> > > > >
>>>> > > > > // This function is called on every producer once every 30 seconds.
>>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
>>>> > > > >     final int numberOfPartitions = 1024;
>>>> > > > >     final long timeOfEnqueue = System.currentTimeMillis();
>>>> > > > >     final Callback callback = new Callback() {
>>>> > > > >         @Override
>>>> > > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
>>>> > > > >             if (metadata != null) {
>>>> > > > >                 // The difference between ack time from the broker and enqueue time.
>>>> > > > >                 final long timeOfAck = System.currentTimeMillis();
>>>> > > > >                 final long lag = timeOfAck - timeOfEnqueue;
>>>> > > > >                 enqueueLag.update(lag);
>>>> > > > >             }
>>>> > > > >         }
>>>> > > > >     };
>>>> > > > >     for (int i = 0; i < numberOfPartitions; i++) {
>>>> > > > >         try {
>>>> > > > >             // 10 bytes -> short version + long timestamp.
>>>> > > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
>>>> > > > >             // This message is later used by the consumers to measure lag.
>>>> > > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
>>>> > > > >             kafkaProducer.send(record, callback);
>>>> > > > >         } catch (Exception e) {
>>>> > > > >             // We just dropped a lag marker.
>>>> > > > >         }
>>>> > > > >     }
>>>> > > > > }
>>>> > > > >
>>>> > > > > The *99th percentile* on this lag is about *350 - 400 ms*. That's
>>>> > > > > not stellar, but it doesn't account for the *20 - 30 second 99th*
>>>> > > > > I see on the end-to-end lag. I am consuming in a tight loop on
>>>> > > > > the consumers (using the SimpleConsumer) with minimal processing
>>>> > > > > and a *99th percentile fetch time* of *130 - 140 ms*, so I don't
>>>> > > > > think that should be a problem either. Completely baffled.
>>>> > > > >
>>>> > > > >
>>>> > > > > Thanks!
>>>> > > > >
>>>> > > > >
>>>> > > > >
>>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <
>>>> ra...@signalfuse.com>
>>>> > > > > wrote:
>>>> > > > >>
>>>> > > > >>
>>>> > > > >>
>>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <
>>>> ra...@signalfuse.com
>>>> > >
>>>> > > > >> wrote:
>>>> > > > >>>
>>>> > > > >>> I am trying to replace a Thrift peer-to-peer API with Kafka for
>>>> > > > >>> a particular workflow, and I am finding the 99th percentile
>>>> > > > >>> latency to be unacceptable at this time. This entire workload
>>>> > > > >>> runs in an Amazon VPC. I'd greatly appreciate any insights into
>>>> > > > >>> why I am seeing such poor numbers. Here are some details and
>>>> > > > >>> measurements:
>>>> > > > >>>
>>>> > > > >>> i) I have a single topic with 1024 partitions that I am writing
>>>> > > > >>> to from six clients using the Kafka 0.8.2 beta producer.
>>>> > > > >>>
>>>> > > > >>> ii) I have 3 brokers, each on a c3.2x machine on EC2. Each of
>>>> > > > >>> those machines has 8 virtual CPUs, 15 GB of memory and 2 x 80
>>>> > > > >>> GB SSDs. The broker -> partition mapping was decided by Kafka
>>>> > > > >>> when I created the topic.
>>>> > > > >>>
>>>> > > > >>> iii) I write about 22 thousand messages per second across the
>>>> > > > >>> 6 clients. This number was calculated using distributed
>>>> > > > >>> counters: I increment a distributed counter in the callback
>>>> > > > >>> from my enqueue job if the metadata returned is not null, and I
>>>> > > > >>> increment a dropped-messages counter if the callback has a
>>>> > > > >>> non-null exception or if there was an exception in the
>>>> > > > >>> synchronous send call. The number of dropped messages is pretty
>>>> > > > >>> much always zero. Out of the 6 clients, 3 are responsible for
>>>> > > > >>> 95% of the traffic. Messages are very tiny and have null keys
>>>> > > > >>> and 27 byte values (2 byte version and 25 byte payload). Again,
>>>> > > > >>> these messages are written using the Kafka 0.8.2 client.
>>>> > > > >>>
>>>> > > > >>> iv) I have 3 consumer processes consuming only from this topic.
>>>> > > > >>> Each consumer process is assigned a disjoint set of the 1024
>>>> > > > >>> partitions by an eternal arbiter. Each consumer process then
>>>> > > > >>> creates a mapping from brokers -> partitions it has been
>>>> > > > >>> assigned, and starts one fetcher thread per broker. Each thread
>>>> > > > >>> queries the broker it has been assigned (using the
>>>> > > > >>> SimpleConsumer) for partitions such that partitions =
>>>> > > > >>> (partitions on broker) ∩ (partitions assigned to the process by
>>>> > > > >>> the arbiter). So in effect there are 9 consumer threads across
>>>> > > > >>> 3 consumer processes that query a disjoint set of partitions
>>>> > > > >>> for the single topic, and amongst themselves they manage to
>>>> > > > >>> consume from every partition. Each thread has a run loop where
>>>> > > > >>> it asks for messages, consumes them, then asks for more
>>>> > > > >>> messages. When a run loop starts, it begins by querying for the
>>>> > > > >>> latest message in a partition (i.e. it discards any previous
>>>> > > > >>> backlog) and then maintains a map of partition ->
>>>> > > > >>> nextOffsetToRequest in memory to make sure that it consumes
>>>> > > > >>> messages in order.
>>>> > > > >>>
>>>> > > > >> Edit: Meant external arbiter.
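>>>> > > > >>>
>>>> > > > >>> (A minimal sketch of the "query for the latest message" step
>>>> > > > >>> above, using the standard SimpleConsumer offset lookup from the
>>>> > > > >>> 0.8 Java API; clientName and the surrounding wiring are
>>>> > > > >>> placeholders:)
>>>> > > > >>>
>>>> > > > >>> // Sketch: look up the latest offset for one partition so the run loop can
>>>> > > > >>> // start from "now" and ignore any previous backlog.
>>>> > > > >>> long latestOffset(kafka.javaapi.consumer.SimpleConsumer consumer,
>>>> > > > >>>                   String topic, int partition, String clientName) {
>>>> > > > >>>     kafka.common.TopicAndPartition tp =
>>>> > > > >>>             new kafka.common.TopicAndPartition(topic, partition);
>>>> > > > >>>     java.util.Map<kafka.common.TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> info =
>>>> > > > >>>             new java.util.HashMap<kafka.common.TopicAndPartition, kafka.api.PartitionOffsetRequestInfo>();
>>>> > > > >>>     info.put(tp, new kafka.api.PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
>>>> > > > >>>     kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
>>>> > > > >>>             info, kafka.api.OffsetRequest.CurrentVersion(), clientName);
>>>> > > > >>>     kafka.javaapi.OffsetResponse response = consumer.getOffsetsBefore(request);
>>>> > > > >>>     return response.offsets(topic, partition)[0];  // seed nextOffsetToRequest with this
>>>> > > > >>> }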
>>>> > > > >>
>>>> > > > >>>
>>>> > > > >>> v) Consumption is really simple. Each message is put on an
>>>> > > > >>> efficient non-blocking ring buffer; if the ring buffer is full
>>>> > > > >>> the message is dropped. I measure the mean time between fetches
>>>> > > > >>> and it matches the fetch time itself to within a ms, meaning
>>>> > > > >>> that no matter how many messages are dequeued, it takes almost
>>>> > > > >>> no time to process them. At the end of processing a fetch
>>>> > > > >>> request I increment another distributed counter that counts the
>>>> > > > >>> number of messages processed. This counter tells me that on
>>>> > > > >>> average I am consuming the same number of messages/sec that I
>>>> > > > >>> enqueue on the producers, i.e. around 22 thousand messages/sec.
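>>>> > > > >>>
>>>> > > > >>> (For concreteness, a minimal stand-in for the drop-if-full
>>>> > > > >>> hand-off; the actual ring buffer isn't shown here, so
>>>> > > > >>> ArrayBlockingQueue is purely illustrative:)
>>>> > > > >>>
>>>> > > > >>> // Illustrative only: a bounded queue whose non-blocking offer() drops the
>>>> > > > >>> // message when the buffer is full, mirroring the behaviour described above.
>>>> > > > >>> final java.util.concurrent.ArrayBlockingQueue<byte[]> ringBuffer =
>>>> > > > >>>         new java.util.concurrent.ArrayBlockingQueue<byte[]>(65536);
>>>> > > > >>>
>>>> > > > >>> void enqueueOnNonBlockingRingBuffer(byte[] messagePayload) {
>>>> > > > >>>     if (!ringBuffer.offer(messagePayload)) {
>>>> > > > >>>         // Buffer full: the message is dropped (a drop counter could go here).
>>>> > > > >>>     }
>>>> > > > >>> }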
>>>> > > > >>>
>>>> > > > >>> vi) The 99th percentile of the number of messages fetched per
>>>> > > > >>> fetch request is about 500 messages. The 99th percentile of the
>>>> > > > >>> time it takes to fetch a batch is about 130 - 140 ms. I played
>>>> > > > >>> around with the buffer and maxWait settings on the
>>>> > > > >>> SimpleConsumer, and attempting to consume more messages per
>>>> > > > >>> fetch caused the 99th percentile of the fetch time to balloon,
>>>> > > > >>> so I am consuming in smaller batches right now.
>>>> > > > >>>
>>>> > > > >>> vii) Every 30 seconds or so, each of the producers inserts a
>>>> > > > >>> trace message into each partition (so 1024 messages per
>>>> > > > >>> producer every 30 seconds). Each message only contains the
>>>> > > > >>> System.currentTimeMillis() at the time of enqueueing. It is
>>>> > > > >>> about 9 bytes long. The consumer in step (v) always checks
>>>> > > > >>> whether a message it dequeued is of this trace type (instead of
>>>> > > > >>> the regular message type). This is a simple check on the 2 byte
>>>> > > > >>> version field at the start of each message buffer. If it is a
>>>> > > > >>> trace message, the consumer reads its payload as a long, takes
>>>> > > > >>> the difference between the system time on the consumer and this
>>>> > > > >>> payload, and updates a histogram with that difference. Ignoring
>>>> > > > >>> NTP skew (which I've measured to be on the order of
>>>> > > > >>> milliseconds), this is the lag between when a message was
>>>> > > > >>> enqueued on the producer and when it was read on the consumer.
>>>> > > > >>>
>>>> > > > >> Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte long timestamp.
>>>> > > > >>
>>>> > > > >>> So the pseudocode for every consumer thread (one per broker per
>>>> > > > >>> consumer process, 9 in total across 3 consumer processes) is:
>>>> > > > >>>
>>>> > > > >>> void run() {
>>>> > > > >>>   while (running) {
>>>> > > > >>>     // This assignment is done by an external arbiter.
>>>> > > > >>>     FetchRequest fetchRequest = buildFetchRequest(partitionsAssignedToThisProcessThatFallOnThisBroker);
>>>> > > > >>>     measureTimeBetweenFetchRequests();  // The 99th of this is 130-140 ms.
>>>> > > > >>>     // The 99th of this is 130-140 ms, the same as the time between fetches.
>>>> > > > >>>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
>>>> > > > >>>     processData(fetchResponse);  // The 99th on this is 2-3 ms.
>>>> > > > >>>   }
>>>> > > > >>> }
>>>> > > > >>>
>>>> > > > >>> void processData(FetchResponse response) {
>>>> > > > >>>   try (Timer.Context _ = processWorkerDataTimer.time()) {  // The 99th on this is 2-3 ms.
>>>> > > > >>>     for (Message message : response) {
>>>> > > > >>>       if (typeOf(message) == NORMAL_DATA) {
>>>> > > > >>>         // Never blocks; drops the message if the ring buffer is full.
>>>> > > > >>>         enqueueOnNonBlockingRingBuffer(message);
>>>> > > > >>>       } else if (typeOf(message) == TRACE_DATA) {
>>>> > > > >>>         long timeOfEnqueue = getEnqueueTime(message);
>>>> > > > >>>         long currentTime = System.currentTimeMillis();
>>>> > > > >>>         long difference = currentTime - timeOfEnqueue;
>>>> > > > >>>         lagHistogram.update(difference);
>>>> > > > >>>       }
>>>> > > > >>>     }
>>>> > > > >>>   }
>>>> > > > >>> }
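>>>> > > > >>>
>>>> > > > >>> (For reference, a minimal sketch of what the 10-byte
>>>> > > > >>> trace-marker encoding described above could look like; LagMarker
>>>> > > > >>> is the helper assumed in the producer code, and the version
>>>> > > > >>> constant is purely illustrative:)
>>>> > > > >>>
>>>> > > > >>> // Illustrative sketch of the 10-byte trace marker: 2 byte version + 8 byte timestamp.
>>>> > > > >>> class LagMarker {
>>>> > > > >>>     static final short TRACE_VERSION = 2;  // placeholder value
>>>> > > > >>>
>>>> > > > >>>     static byte[] serialize(long timeOfEnqueue) {
>>>> > > > >>>         return java.nio.ByteBuffer.allocate(10)
>>>> > > > >>>                 .putShort(TRACE_VERSION)
>>>> > > > >>>                 .putLong(timeOfEnqueue)
>>>> > > > >>>                 .array();
>>>> > > > >>>     }
>>>> > > > >>>
>>>> > > > >>>     static long getEnqueueTime(byte[] payload) {
>>>> > > > >>>         return java.nio.ByteBuffer.wrap(payload).getLong(2);  // skip the 2 byte version
>>>> > > > >>>     }
>>>> > > > >>> }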
>>>> > > > >>>
>>>> > > > >>> viii) So the kicker is that this lag is really, really high:
>>>> > > > >>> Median lag: *200 ms*. This already seems pretty high, and I
>>>> > > > >>> could even discount some of it as NTP skew.
>>>> > > > >>> Mean lag: *2 - 4 seconds*! Notice that the mean is much bigger
>>>> > > > >>> than the median, which tells us the distribution has a big tail.
>>>> > > > >>> 99th percentile lag: *20 seconds*!!
>>>> > > > >>>
>>>> > > > >>> I can't quite figure out the source of the lag. Here are some
>>>> > > > >>> other things of note:
>>>> > > > >>>
>>>> > > > >>> 1) The brokers in general are at about 40-50% CPU, but I see
>>>> > > > >>> occasional spikes to 80-95%. This could be causing issues. I am
>>>> > > > >>> wondering if this is down to the high number of partitions I
>>>> > > > >>> have, with each partition ending up receiving rather few
>>>> > > > >>> messages: 22k messages spread across 1024 partitions = only 21
>>>> > > > >>> odd messages/sec/partition. But each broker should only be
>>>> > > > >>> receiving about 7500 messages/sec. It could also be down to the
>>>> > > > >>> messages being tiny - I imagine the per-message overhead of the
>>>> > > > >>> Kafka wire protocol is probably close to the 25 byte payload
>>>> > > > >>> itself, and the broker also has to do a few CRC32 calculations
>>>> > > > >>> etc. Still, given that these are c3.2x machines and the message
>>>> > > > >>> rate is so modest, I'd expect them to do better. Reading this
>>>> > > > >>> article
>>>> > > > >>> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>>>> > > > >>> about 2 million messages/sec across three brokers makes it seem
>>>> > > > >>> like 22k 25-byte messages should be very easy. So my biggest
>>>> > > > >>> suspicion right now is that the large number of partitions is
>>>> > > > >>> somehow responsible for this latency.
>>>> > > > >>>
>>>> > > > >>> 2) On the consumer side, given that the 99th percentile of the
>>>> > > > >>> time between fetches == the 99th percentile of a fetch itself,
>>>> > > > >>> I don't see how I could do any better. It's pretty close to
>>>> > > > >>> just getting batches of data, iterating through them and
>>>> > > > >>> dropping them on the floor. My requests on the consumer are
>>>> > > > >>> built like this:
>>>> > > > >>>
>>>> > > > >>> // Max wait of 100 ms or at least 1000 bytes, whichever happens first.
>>>> > > > >>> FetchRequestBuilder builder = new FetchRequestBuilder()
>>>> > > > >>>     .clientId(clientName)
>>>> > > > >>>     .maxWait(100)
>>>> > > > >>>     .minBytes(1000);
>>>> > > > >>>
>>>> > > > >>> // Add a fetch for every partition.
>>>> > > > >>> for (PartitionMetadata partition : partitions) {
>>>> > > > >>>     int partitionId = partition.partitionId();
>>>> > > > >>>     // readOffsets maintains the next offset to be read for each partition.
>>>> > > > >>>     long offset = readOffsets.get(partition);
>>>> > > > >>>     builder.addFetch(MY_TOPIC, partitionId, offset, 3000);  // Up to 3000 bytes per partition.
>>>> > > > >>> }
>>>> > > > >>> 3) On the producer side I use the Kafka 0.8.2 beta producer.
>>>> > > > >>> I've played around with many settings but none of them seem to
>>>> > > > >>> have made a difference. Here are my current settings:
>>>> > > > >>>
>>>> > > > >>> ProducerConfig values:
>>>> > > > >>>
>>>> > > > >>>         block.on.buffer.full = false
>>>> > > > >>>
>>>> > > > >>>         retry.backoff.ms = 100
>>>> > > > >>>
>>>> > > > >>>         buffer.memory = 83886080  // Kind of big - could it be adding lag?
>>>> > > > >>>
>>>> > > > >>>         batch.size = 40500  // This is also kind of big - could it be adding lag?
>>>> > > > >>>
>>>> > > > >>>         metrics.sample.window.ms = 30000
>>>> > > > >>>
>>>> > > > >>>         metadata.max.age.ms = 300000
>>>> > > > >>>
>>>> > > > >>>         receive.buffer.bytes = 32768
>>>> > > > >>>
>>>> > > > >>>         timeout.ms = 30000
>>>> > > > >>>
>>>> > > > >>>         max.in.flight.requests.per.connection = 5
>>>> > > > >>>
>>>> > > > >>>         metric.reporters = []
>>>> > > > >>>
>>>> > > > >>>         bootstrap.servers = [broker1, broker2, broker3]
>>>> > > > >>>
>>>> > > > >>>         client.id =
>>>> > > > >>>
>>>> > > > >>>         compression.type = none
>>>> > > > >>>
>>>> > > > >>>         retries = 0
>>>> > > > >>>
>>>> > > > >>>         max.request.size = 1048576
>>>> > > > >>>
>>>> > > > >>>         send.buffer.bytes = 131072
>>>> > > > >>>
>>>> > > > >>>         acks = 1
>>>> > > > >>>
>>>> > > > >>>         reconnect.backoff.ms = 10
>>>> > > > >>>
>>>> > > > >>>         linger.ms = 250  // Based on this I'd imagine the lag should be bounded to about 250 ms overall.
>>>> > > > >>>
>>>> > > > >>>         metrics.num.samples = 2
>>>> > > > >>>
>>>> > > > >>>         metadata.fetch.timeout.ms = 60000
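>>>> > > > >>>
>>>> > > > >>> (A rough sketch of how these settings are wired up; the config
>>>> > > > >>> keys are the ones listed above, while the broker host:ports and
>>>> > > > >>> the construction itself are illustrative:)
>>>> > > > >>>
>>>> > > > >>> // Illustrative: constructing the 0.8.2 beta producer with the settings above.
>>>> > > > >>> java.util.Properties props = new java.util.Properties();
>>>> > > > >>> props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");  // placeholders
>>>> > > > >>> props.put("acks", "1");
>>>> > > > >>> props.put("linger.ms", "250");
>>>> > > > >>> props.put("batch.size", "40500");
>>>> > > > >>> props.put("buffer.memory", "83886080");
>>>> > > > >>> props.put("block.on.buffer.full", "false");
>>>> > > > >>> props.put("compression.type", "none");
>>>> > > > >>> KafkaProducer kafkaProducer = new KafkaProducer(props);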
>>>> > > > >>>
>>>> > > > >>> I know this is a lot of information. I just wanted to provide
>>>> > > > >>> as much context as possible.
>>>> > > > >>>
>>>> > > > >>> Thanks in advance!
>>>> > > > >>>
>>>> > > > >>>
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>
