In case the attachments don't come through, here is an imgur link: http://imgur.com/NslGpT3,Uw6HFow#0

On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <[email protected]> wrote:

> Never mind about (2). I see these stats are already being output by the
> kafka producer. I've attached a couple of screenshots (couldn't copy paste
> from jvisualvm). Do any of these things strike you as odd? The
> bufferpool-wait-ratio sadly shows up as a NaN.
>
> I still don't know how to figure out (1).
>
> Thanks!
>
> On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <[email protected]>
> wrote:
>
>> Hi Jay,
>>
>> Re (1) - I am not sure how to do this. Actually I am not sure what this
>> means. Is this the time every write/fetch request is received on the
>> broker? Do I need to enable some specific log level for this to show up? It
>> doesn't show up in the usual log. Is this information also available via
>> jmx somehow?
>> Re (2) - Are you saying that I should instrument the "percentage of time
>> waiting for buffer space" stat myself? If so, how do I do this? Or are these
>> stats already output to jmx by the kafka producer code? This seems like
>> it's in the internals of the kafka producer client code.
>>
>> Thanks again!
>>
>> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <[email protected]>
>> wrote:
>>
>>> Thanks Jay. Will check (1) and (2) and get back to you. The test is not
>>> stand-alone now. It might be a bit of work to extract it to a stand-alone
>>> executable. It might take me a bit of time to get that going.
>>>
>>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <[email protected]> wrote:
>>>
>>>> Hey Rajiv,
>>>>
>>>> This sounds like a bug. The more info you can help us get the easier to
>>>> fix. Things that would help:
>>>> 1. Can you check if the request log on the servers shows latency spikes
>>>> (in which case it is a server problem)?
>>>> 2. It would be worth also getting the jmx stats on the producer as they
>>>> will show things like what percentage of time it is waiting for buffer
>>>> space etc.
>>>>
>>>> If your test is reasonably stand-alone it would be great to file a JIRA
>>>> and attach the test code and the findings you already have so someone can
>>>> dig into what is going on.
>>>>
>>>> -Jay
>>>>
>>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <[email protected]>
>>>> wrote:
>>>>
>>>> > Hi all,
>>>> >
>>>> > Bumping this up, in case someone has any ideas. I did yet another
>>>> > experiment where I create 4 producers and stripe the send requests across
>>>> > them in a manner such that any one producer only sees 256 partitions
>>>> > instead of the entire 1024. This seems to have helped a bit, and though I
>>>> > still see a crazy high 99th (25-30 seconds), the median, mean, 75th and
>>>> > 95th percentile have all gone down.
>>>> >
>>>> > Thanks!
>>>> >
>>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <[email protected]>
>>>> > wrote:
>>>> >
>>>> > > Ah I thought it was restarting the broker that made things better :)
>>>> > >
>>>> > > Yeah I have no experience with the Java client so can't really help
>>>> > > there.
>>>> > >
>>>> > > Good luck!
>>>> > >
>>>> > > -----Original Message-----
>>>> > > From: Rajiv Kurian [[email protected]]
>>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
>>>> > > To: [email protected] [[email protected]]
>>>> > > Subject: Re: Trying to figure out kafka latency issues
>>>> > >
>>>> > > I'll take a look at the GC profile of the brokers. Right now I keep a tab
>>>> > > on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine,
>>>> > > not JVM heap) and free disk space on the broker. I'll need to take a look
>>>> > > at the JVM metrics too. What seemed strange is that going from 8 -> 512
>>>> > > partitions increases the latency, but going from 512 -> 8 does not
>>>> > > decrease it.
>>>> > > I have to restart the producer (but not the broker) for the end to end
>>>> > > latency to go down. That made it seem that the fault was probably with the
>>>> > > producer and not the broker. Only restarting the producer made things
>>>> > > better. I'll do more extensive measurement on the broker.
>>>> > >
>>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <[email protected]>
>>>> > > wrote:
>>>> > > >
>>>> > > > Did you see my response and have you checked the server logs, especially
>>>> > > > the GC logs? It still sounds like you are running out of memory on the
>>>> > > > broker. What is your max heap memory and are you thrashing once you start
>>>> > > > writing to all those partitions?
>>>> > > >
>>>> > > > You have measured very thoroughly from an external point of view, I think
>>>> > > > now you'll have to start measuring the internal metrics. Maybe someone
>>>> > > > else will have ideas on what jmx values to watch.
>>>> > > >
>>>> > > > Best,
>>>> > > > Thunder
>>>> > > >
>>>> > > > -----Original Message-----
>>>> > > > From: Rajiv Kurian [[email protected]]
>>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
>>>> > > > To: [email protected] [[email protected]]
>>>> > > > Subject: Re: Trying to figure out kafka latency issues
>>>> > > >
>>>> > > > Some more work tells me that the end to end latency numbers vary with the
>>>> > > > number of partitions I am writing to. I did an experiment, where based on
>>>> > > > a run time flag I would dynamically select how many of the *1024
>>>> > > > partitions* I write to. So say I decide I'll write to at most 256
>>>> > > > partitions, I mod whatever partition I would actually write to by 256.
>>>> > > > Basically the number of partitions for this topic on the broker remains
>>>> > > > the same at *1024* partitions but the number of partitions my producers
>>>> > > > write to changes dynamically based on a run time flag. So something like
>>>> > > > this:
>>>> > > >
>>>> > > > int partition = getPartitionForMessage(message);
>>>> > > > // This flag can be updated without bringing the application down - just
>>>> > > > // a volatile read of some number set externally.
>>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
>>>> > > > int moddedPartition = partition % maxPartitionsToWriteTo;
>>>> > > > // Send a message to this Kafka partition.
>>>> > > >
>>>> > > > Here are some interesting things I've noticed:
>>>> > > >
>>>> > > > i) When I start my client and it *never writes* to more than *8
>>>> > > > partitions* (same data rate but fewer partitions) - the end to end *99th
>>>> > > > latency is 300-350 ms*. Quite a bit of this (numbers in my previous
>>>> > > > emails) is the latency from producer -> broker and the latency from
>>>> > > > broker -> consumer. Still nowhere near as poor as the *20 - 30* seconds I
>>>> > > > was seeing.
>>>> > > >
>>>> > > > ii) When I increase the maximum number of partitions, end to end latency
>>>> > > > increases dramatically. At *256 partitions* the end to end *99th latency
>>>> > > > is still 390 - 418 ms*. Worse than the latency figures for *8*
>>>> > > > partitions, but not by much. When I increase this number to *512
>>>> > > > partitions* the end to end *99th latency* becomes an intolerable *19-24
>>>> > > > seconds*. At *1024* partitions the *99th latency is at 25 - 30 seconds*.
>>>> > > > A table of the numbers:
>>>> > > >
>>>> > > > Max number of partitions written to (out of 1024) | End to end latency
>>>> > > > 8                                                 | 300 - 350 ms
>>>> > > > 256                                               | 390 - 418 ms
>>>> > > > 512                                               | 19 - 24 seconds
>>>> > > > 1024                                              | 25 - 30 seconds
>>>> > > >
>>>> > > > iii) Once I make the max number of partitions high enough, reducing it
>>>> > > > doesn't help. For example, if I go up from *8* to *512* partitions, the
>>>> > > > latency goes up. But while the producer is running, if I go down from
>>>> > > > *512* to *8* partitions, it doesn't reduce the latency numbers. My guess
>>>> > > > is that the producer is creating some state lazily per partition and this
>>>> > > > state is causing the latency. Once this state is created, writing to
>>>> > > > fewer partitions doesn't seem to help. Only a restart of the producer
>>>> > > > calms things down.
>>>> > > >
>>>> > > > So my current plan is to reduce the number of partitions on the topic,
>>>> > > > but there seems to be something deeper going on for the latency numbers
>>>> > > > to be so poor to begin with and then suffer so much more (non linearly)
>>>> > > > with additional partitions.
>>>> > > >
>>>> > > > Thanks!
>>>> > > >
>>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <[email protected]>
>>>> > > > wrote:
>>>> > > > >
>>>> > > > > I've done some more measurements. I've also started measuring the
>>>> > > > > latency between when I ask my producer to send a message and when I get
>>>> > > > > an acknowledgement via the callback. Here is my code:
>>>> > > > >
>>>> > > > > // This function is called on every producer once every 30 seconds.
>>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
>>>> > > > >     final int numberOfPartitions = 1024;
>>>> > > > >     final long timeOfEnqueue = System.currentTimeMillis();
>>>> > > > >     final Callback callback = new Callback() {
>>>> > > > >         @Override
>>>> > > > >         public void onCompletion(RecordMetadata metadata, Exception ex) {
>>>> > > > >             if (metadata != null) {
>>>> > > > >                 // The difference between ack time from broker and
>>>> > > > >                 // enqueue time.
>>>> > > > >                 final long timeOfAck = System.currentTimeMillis();
>>>> > > > >                 final long lag = timeOfAck - timeOfEnqueue;
>>>> > > > >                 enqueueLag.update(lag);
>>>> > > > >             }
>>>> > > > >         }
>>>> > > > >     };
>>>> > > > >     for (int i = 0; i < numberOfPartitions; i++) {
>>>> > > > >         try {
>>>> > > > >             // 10 bytes -> short version + long timestamp.
>>>> > > > >             byte[] value = LagMarker.serialize(timeOfEnqueue);
>>>> > > > >             // This message is later used by the consumers to measure lag.
>>>> > > > >             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
>>>> > > > >             kafkaProducer.send(record, callback);
>>>> > > > >         } catch (Exception e) {
>>>> > > > >             // We just dropped a lag marker.
>>>> > > > >         }
>>>> > > > >     }
>>>> > > > > }
>>>> > > > >
>>>> > > > > The *99th* on this lag is about *350 - 400* ms. It's not stellar, but
>>>> > > > > doesn't account for the *20-30 second 99th* I see on the end to end lag.
>>>> > > > > I am consuming in a tight loop on the Consumers (using the
>>>> > > > > SimpleConsumer) with minimal processing with a *99th fetch time* of
>>>> > > > > *130-140* ms, so I don't think that should be a problem either.
>>>> > > > > Completely baffled.
>>>> > > > >
>>>> > > > > Thanks!
>>>> > > > >
>>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <[email protected]>
>>>> > > > > wrote:
>>>> > > > >>
>>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <[email protected]>
>>>> > > > >> wrote:
>>>> > > > >>>
>>>> > > > >>> I am trying to replace a Thrift peer to peer API with kafka for a
>>>> > > > >>> particular work flow. I am finding the 99th percentile latency to be
>>>> > > > >>> unacceptable at this time. This entire work load runs in an Amazon
>>>> > > > >>> VPC. I'd greatly appreciate it if someone has any insights on why I am
>>>> > > > >>> seeing such poor numbers. Here are some details and measurements taken:
>>>> > > > >>>
>>>> > > > >>> i) I have a single topic with 1024 partitions that I am writing to
>>>> > > > >>> from six clients using the kafka 0.8.2 beta kafka producer.
>>>> > > > >>>
>>>> > > > >>> ii) I have 3 brokers, each on a c3 2x machine on ec2. Each of those
>>>> > > > >>> machines has 8 virtual cpus, 15 GB memory and 2 * 80 GB SSDs. The
>>>> > > > >>> broker -> partitions mapping was decided by Kafka when I created the
>>>> > > > >>> topic.
>>>> > > > >>>
>>>> > > > >>> iii) I write about 22 thousand messages per second from across the 6
>>>> > > > >>> clients. This number was calculated using distributed counters. I just
>>>> > > > >>> increment a distributed counter in the callback from my enqueue job if
>>>> > > > >>> the metadata returned is not null.
>>>> > > > >>> I also increment a number of dropped messages counter if the callback
>>>> > > > >>> has a non-null exception or if there was an exception in the
>>>> > > > >>> synchronous send call. The number of dropped messages is pretty much
>>>> > > > >>> always zero. Out of the 6 clients 3 are responsible for 95% of the
>>>> > > > >>> traffic. Messages are very tiny and have null keys and 27 byte values
>>>> > > > >>> (2 byte version and 25 byte payload). Again these messages are written
>>>> > > > >>> using the kafka 0.8.2 client.
>>>> > > > >>>
>>>> > > > >>> iv) I have 3 consumer processes consuming only from this topic. Each
>>>> > > > >>> consumer process is assigned a disjoint set of the 1024 partitions by
>>>> > > > >>> an eternal arbiter. Each consumer process then creates a mapping from
>>>> > > > >>> brokers -> partitions it has been assigned. It then starts one fetcher
>>>> > > > >>> thread per broker. Each thread queries the broker (using the
>>>> > > > >>> SimpleConsumer) it has been assigned for partitions such that
>>>> > > > >>> partitions = (partitions on broker) ∩ (partitions assigned to the
>>>> > > > >>> process by the arbiter). So in effect there are 9 consumer threads
>>>> > > > >>> across 3 consumer processes that query a disjoint set of partitions
>>>> > > > >>> for the single topic and amongst themselves they manage to consume
>>>> > > > >>> from every partition. So each thread has a run loop where it asks for
>>>> > > > >>> messages, consumes them then asks for more messages. When a run loop
>>>> > > > >>> starts, it starts by querying for the latest message in a partition
>>>> > > > >>> (i.e.
>>>> > > > >>> discards any previous backup) and then maintains a map of partition
>>>> > > > >>> -> nextOffsetToRequest in memory to make sure that it consumes
>>>> > > > >>> messages in order.
>>>> > > > >>>
>>>> > > > >> Edit: Meant external arbiter.
>>>> > > > >>
>>>> > > > >>>
>>>> > > > >>> v) Consumption is really simple. Each message is put on a non
>>>> > > > >>> blocking efficient ring buffer. If the ring buffer is full the message
>>>> > > > >>> is dropped. I measure the mean time between fetches and it is the same
>>>> > > > >>> as the time for fetches up to a ms, meaning no matter how many
>>>> > > > >>> messages are dequeued, it takes almost no time to process them. At the
>>>> > > > >>> end of processing a fetch request I increment another distributed
>>>> > > > >>> counter that counts the number of messages processed. This counter
>>>> > > > >>> tells me that on average I am consuming the same number of
>>>> > > > >>> messages/sec that I enqueue on the producers i.e. around 22 thousand
>>>> > > > >>> messages/sec.
>>>> > > > >>>
>>>> > > > >>> vi) The 99th percentile of the number of messages fetched per fetch
>>>> > > > >>> request is about 500 messages. The 99th percentile of the time it
>>>> > > > >>> takes to fetch a batch is about 130 - 140 ms. I played around with the
>>>> > > > >>> buffer and maxWait settings on the SimpleConsumer and attempting to
>>>> > > > >>> consume more messages was leading the 99th percentile of the fetch
>>>> > > > >>> time to balloon up, so I am consuming in smaller batches right now.
>>>> > > > >>>
>>>> > > > >>> vii) Every 30 seconds or so each of the producers inserts a trace
>>>> > > > >>> message into each partition (so 1024 messages per producer every 30
>>>> > > > >>> seconds).
>>>> > > > >>> Each message only contains the System.currentTimeMillis() at the time
>>>> > > > >>> of enqueueing. It is about 9 bytes long. The consumer in step (v)
>>>> > > > >>> always checks to see if a message it dequeued is of this trace type
>>>> > > > >>> (instead of the regular message type). This is a simple check on the
>>>> > > > >>> first 2 byte version that each message buffer contains. If it is a
>>>> > > > >>> trace message it reads its payload as a long, takes the difference
>>>> > > > >>> between the system time on the consumer and this payload, and updates
>>>> > > > >>> a histogram with this difference value. Ignoring NTP skew (which I've
>>>> > > > >>> measured to be in the order of milliseconds) this is the lag between
>>>> > > > >>> when a message was enqueued on the producer and when it was read on
>>>> > > > >>> the consumer.
>>>> > > > >>>
>>>> > > > >> Edit: Trace messages are 10 bytes long - 2 byte version + 8 byte long
>>>> > > > >> for timestamp.
>>>> > > > >>
>>>> > > > >>> So pseudocode for every consumer thread (one per broker per consumer
>>>> > > > >>> process, 9 in total across 3 consumers) is:
>>>> > > > >>>
>>>> > > > >>> void run() {
>>>> > > > >>>   while (running) {
>>>> > > > >>>     // This assignment is done by an external arbiter.
>>>> > > > >>>     FetchRequest fetchRequest = buildFetchRequest(
>>>> > > > >>>         partitionsAssignedToThisProcessThatFallOnThisBroker);
>>>> > > > >>>     measureTimeBetweenFetchRequests(); // The 99th of this is 130-140ms
>>>> > > > >>>     // The 99th of this is 130-140ms, which is the same as the time
>>>> > > > >>>     // between fetches.
>>>> > > > >>>     FetchResponse fetchResponse = fetchResponse(fetchRequest);
>>>> > > > >>>     processData(fetchResponse); // The 99th on this is 2-3 ms.
>>>> > > > >>>   }
>>>> > > > >>> }
>>>> > > > >>>
>>>> > > > >>> void processData(FetchResponse response) {
>>>> > > > >>>   // The 99th on this is 2-3 ms.
>>>> > > > >>>   try (Timer.Context ignored = processWorkerDataTimer.time()) {
>>>> > > > >>>     for (Message message : response) {
>>>> > > > >>>       if (typeOf(message) == NORMAL_DATA) {
>>>> > > > >>>         // Never blocks, drops message if ring buffer is full.
>>>> > > > >>>         enqueueOnNonBlockingRingBuffer(message);
>>>> > > > >>>       } else if (typeOf(message) == TRACE_DATA) {
>>>> > > > >>>         long timeOfEnqueue = getEnqueueTime(message);
>>>> > > > >>>         long currentTime = System.currentTimeMillis();
>>>> > > > >>>         long difference = currentTime - timeOfEnqueue;
>>>> > > > >>>         lagHistogram.update(difference);
>>>> > > > >>>       }
>>>> > > > >>>     }
>>>> > > > >>>   }
>>>> > > > >>> }
>>>> > > > >>>
>>>> > > > >>> viii) So the kicker is that this lag is really really high:
>>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high and I could even
>>>> > > > >>> discount some of it through NTP skew.
>>>> > > > >>> Mean Lag: *2 - 4 seconds*! Also notice how the mean is bigger than the
>>>> > > > >>> median, kind of telling us how the distribution has a big tail.
>>>> > > > >>> 99th Lag: *20 seconds*!!
>>>> > > > >>>
>>>> > > > >>> I can't quite figure out the source of the lag. Here are some other
>>>> > > > >>> things of note:
>>>> > > > >>>
>>>> > > > >>> 1) The brokers in general are at about 40-50% CPU but I see
>>>> > > > >>> occasional spikes to more than 80% - 95%. This could probably be
>>>> > > > >>> causing issues. I am wondering if this is down to the high number of
>>>> > > > >>> partitions I have and how each partition ends up receiving not too
>>>> > > > >>> many messages.
>>>> > > > >>> 22k messages spread across 1024 partitions = only 21 odd
>>>> > > > >>> messages/sec/partition. But each broker should be receiving about
>>>> > > > >>> 7500 messages/sec. It could also be down to the nature of these
>>>> > > > >>> messages being tiny. I imagine that the kafka wire protocol overhead
>>>> > > > >>> is probably close to the 25 byte message payload. It also has to do a
>>>> > > > >>> few CRC32 calculations etc. But again given that these are C3.2x
>>>> > > > >>> machines and the number of messages is so tiny I'd expect it to do
>>>> > > > >>> better. But again reading this article
>>>> > > > >>> <https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines>
>>>> > > > >>> about 2 million messages/sec across three brokers makes it seem like
>>>> > > > >>> 22k 25 byte messages should be very easy. So my biggest suspicion
>>>> > > > >>> right now is that the large number of partitions is somehow
>>>> > > > >>> responsible for this latency.
>>>> > > > >>>
>>>> > > > >>> 2) On the consumer given that the 99th time between fetches == 99th
>>>> > > > >>> time of a fetch, I don't see how I could do any better. It's pretty
>>>> > > > >>> close to just getting batches of data, iterating through them and
>>>> > > > >>> dropping them on the floor. My requests on the consumer are built
>>>> > > > >>> like this:
>>>> > > > >>>
>>>> > > > >>> FetchRequestBuilder builder = new FetchRequestBuilder().clientId(clientName)
>>>> > > > >>>     // max wait of 100 ms or at least 1000 bytes whichever happens first.
>>>> > > > >>>     .maxWait(100).minBytes(1000);
>>>> > > > >>>
>>>> > > > >>> // Add a fetch request for every partition.
>>>> > > > >>> for (PartitionMetadata partition : partitions) {
>>>> > > > >>>     int partitionId = partition.partitionId();
>>>> > > > >>>     // This map maintains the next offset to be read for each partition.
>>>> > > > >>>     long offset = readOffsets.get(partition);
>>>> > > > >>>     // Up to 3000 bytes per fetch request.
>>>> > > > >>>     builder.addFetch(MY_TOPIC, partitionId, offset, 3000);
>>>> > > > >>> }
>>>> > > > >>>
>>>> > > > >>> 3) On the producer side I use the kafka beta producer. I've played
>>>> > > > >>> around with many settings but none of them seem to have made a
>>>> > > > >>> difference. Here are my current settings:
>>>> > > > >>>
>>>> > > > >>> ProducerConfig values:
>>>> > > > >>>     block.on.buffer.full = false
>>>> > > > >>>     retry.backoff.ms = 100
>>>> > > > >>>     buffer.memory = 83886080 // Kind of big - could it be adding lag?
>>>> > > > >>>     batch.size = 40500 // This is also kind of big - could it be adding lag?
>>>> > > > >>>     metrics.sample.window.ms = 30000
>>>> > > > >>>     metadata.max.age.ms = 300000
>>>> > > > >>>     receive.buffer.bytes = 32768
>>>> > > > >>>     timeout.ms = 30000
>>>> > > > >>>     max.in.flight.requests.per.connection = 5
>>>> > > > >>>     metric.reporters = []
>>>> > > > >>>     bootstrap.servers = [broker1, broker2, broker3]
>>>> > > > >>>     client.id =
>>>> > > > >>>     compression.type = none
>>>> > > > >>>     retries = 0
>>>> > > > >>>     max.request.size = 1048576
>>>> > > > >>>     send.buffer.bytes = 131072
>>>> > > > >>>     acks = 1
>>>> > > > >>>     reconnect.backoff.ms = 10
>>>> > > > >>>     linger.ms = 250 // Based on this config I'd imagine that the lag
>>>> > > > >>>                     // should be bounded to about 250 ms over all.
>>>> > > > >>>     metrics.num.samples = 2
>>>> > > > >>>     metadata.fetch.timeout.ms = 60000
>>>> > > > >>>
>>>> > > > >>> I know this is a lot of information. I just wanted to provide as much
>>>> > > > >>> context as possible.
>>>> > > > >>>
>>>> > > > >>> Thanks in advance!
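
For anyone trying to reproduce the striping experiment from the Dec 28 message (4 producers, each seeing only 256 of the 1024 partitions), here is a minimal sketch of how a partition could be mapped to a producer. The thread does not say how the striping was actually done, so the contiguous-range scheme and the names ProducerStriping and producerIndexFor are assumptions made for illustration; the Kafka send itself is left out so the snippet runs on its own.

```java
public final class ProducerStriping {
    private ProducerStriping() {}

    /**
     * Hypothetical helper: picks which producer should handle a partition,
     * using contiguous ranges (partitions 0-255 -> producer 0, and so on).
     * A modulo scheme (partition % numProducers) would also give each
     * producer an equal share; the thread does not say which was used.
     */
    public static int producerIndexFor(int partition, int totalPartitions, int numProducers) {
        if (numProducers <= 0 || totalPartitions <= 0 || totalPartitions % numProducers != 0) {
            throw new IllegalArgumentException(
                "totalPartitions must be a positive multiple of numProducers");
        }
        if (partition < 0 || partition >= totalPartitions) {
            throw new IllegalArgumentException("partition out of range: " + partition);
        }
        int partitionsPerProducer = totalPartitions / numProducers;
        return partition / partitionsPerProducer;
    }

    public static void main(String[] args) {
        int totalPartitions = 1024; // topic size from the thread
        int numProducers = 4;       // number of producers in the experiment
        int[] partitionsSeen = new int[numProducers];
        for (int p = 0; p < totalPartitions; p++) {
            partitionsSeen[producerIndexFor(p, totalPartitions, numProducers)]++;
        }
        for (int i = 0; i < numProducers; i++) {
            System.out.println("producer " + i + " sees " + partitionsSeen[i] + " partitions");
        }
    }
}
```

In the real application the returned index would select one of the four KafkaProducer instances before calling send, so that any per-partition state a producer builds up lazily is limited to its own 256 partitions.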
