Edit: I had to set kafka.request.logger=TRACE to see the request timings.

On Tue, Dec 30, 2014 at 4:37 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Got it. I had to enable to see the logs. Here are two files with the
> timings I got from the period I had logging enabled:
> Producer Requests :
> https://docs.google.com/spreadsheets/d/1spVFWsf7T8ZmwM0JMYnkBoyYufRhHav50sueFA9ztP8/edit?usp=sharing
>
> Fetch Requests:
> https://docs.google.com/spreadsheets/d/1PMGslvlttNQOd1uFQDkgPOY31nSK9TZYFy7oa75QaWA/edit?usp=sharing
>
> Things I noted:
> 1) The producer request totalTime barely goes above 100 ms and when it
> does, it is always due to a high remoteTime. Again given this is barely
> ever above 100ms this doesn't look that bad. Maybe I need to keep it on for
> longer to get more samples. The overall time on the producer does seem a
> bit on the high side though even though it definitely doesn't account for
> the 20-30 seconds end to end time I saw.
>
> 2) The fetch request totalTime does frequently go to a 100 ms odd. It is
> also always due to a high remoteTime. The remoteTime is in the ballpark of
> 100ms always. I specify a maxWait of 100ms on the client when making my
> fetch requests, so it might be this 100ms that I am seeing. My guess is
> that the buffer sizes I specified don't get filled all the time, so the
> maxWait timeout is hit and we end up waiting a 100 ms on the broker. Again
> I don't see how the end to end time could be so poor.
>
> From these timings (maybe I don't have enough samples) it seems like the
> problem is not in the brokers. Does that seem like a legitimate conclusion?
> I might have to measure for a longer period, to get the tail. I am planning
> on extracting the producer and consumer code to get a self-contained load
> test going. I'll do the same end to end lag measurements and see if it's my
> environment that is adding this lag somehow.
>
> Thanks!
>
>
> On Tue, Dec 30, 2014 at 11:58 AM, Jay Kreps <j...@confluent.io> wrote:
>
>> The produce to the local log is the time taken to write to the local
>> filesystem. If you see that spike up then that is the culprit.
>>
>> What I was referring to is the line in kafka-request.log that looks
>> something like:
>> "Completed request:%s from client
>>
>> %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
>> This breaks down the total request time and how much was spent on local
>> I/O
>> etc. If totalTime is always small the server is not to blame. If it
>> spikes,
>> then the question is where is that time going and this will answer that.
>>
>> You actually raise a good point about that other log message. It says "4
>> bytes written to log xyz" but what it is actually logging is the number of
>> messages not the number of bytes, so that is quite misleading and a bug.
>>
>> -Jay
>>
>> On Mon, Dec 29, 2014 at 6:37 PM, Rajiv Kurian <ra...@signalfuse.com>
>> wrote:
>>
>> > Thanks Jay.
>> >
>> > I just used JMX to change the log level on the broker and checked the
>> logs.
>> > I am still not sure what line is exactly telling me how much time a
>> > producer took to process a request. I see log lines of this format:
>> >
>> > 2014-12-30T02:13:44.507Z DEBUG [kafka-request-handler-0            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] Produce to local
>> log
>> > in 5 ms
>> >
>> > Is this what I should be looking out for? I see producer requests of the
>> > form "2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0
>>   ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] Handling request:
>> > Name: ProducerRequest; Version: 0; CorrelationId: 36308;  more stuff"
>> but I
>> > can't quite tell when this request was done with. So right now I see
>> that
>> > requests are logged frequently on the brokers (multiple times per
>> second)
>> > but since I don't know when they are finished with I can't tell the
>> total
>> > time.
>> >
>> > Some other things that I found kind of odd are:
>> >
>> > 1) The producer requests seem to have an ack timeout of 30000 ms. I
>> don't
>> > think I set this on the producer. I don't know if this could have
>> anything
>> > to do with the latency problem.
>> >
>> > 2014-12-30T02:13:44.502Z TRACE [kafka-request-handler-0            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] Handling request:
>> > Name: ProducerRequest; Version: 0; CorrelationId: 36308; ClientId: ;
>> > RequiredAcks: 1; *AckTimeoutMs: 30000 ms*; rest of the stuff.
>> >
>> > 2) I see a bunch of small writes written to my [topic, partition] logs.
>> My
>> > messages are at a minimum 27 bytes, so maybe this is something else.
>> Again
>> > don't know if this is a problem:
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written to
>> > log MY.TOPIC-400 beginning at offset 19221923 and ending at offset
>> 19221923
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-4            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] 4 bytes written to
>> > log MY.TOPIC-208 beginning at offset 29438019 and ending at offset
>> 29438022
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written to
>> > log MY.TOPIC-163 beginning at offset 14821977 and ending at offset
>> 14821977
>> >
>> > 2014-12-30T02:13:44.500Z TRACE [kafka-request-handler-1            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] 1 bytes written to
>> > log MY.TOPIC118 beginning at offset 19321510 and ending at offset
>> 19321510
>> >
>> > 2014-12-30T02:13:44.501Z TRACE [kafka-request-handler-3            ]
>> > [kafka.server.KafkaApis              ]: [KafkaApi-11] 3 bytes written to
>> > log MY.TOPIC-463 beginning at offset 28777627 and ending at offset
>> 28777629
>> >
>> > On Mon, Dec 29, 2014 at 5:43 PM, Jay Kreps <j...@confluent.io> wrote:
>> >
>> > > Hey Rajiv,
>> > >
>> > > Yes, if you uncomment the line
>> > >   #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
>> > > in the example log4j.properties file that will enable logging of each
>> > > request including the time taken processing the request. This is the
>> > first
>> > > step for diagnosing latency spikes since this will rule out the
>> server.
>> > You
>> > > may also want to enable DEBUG or TRACE on the producer and see what is
>> > > happening when those spikes occur.
>> > >
>> > > The JMX stats show that at least the producer is not exhausting the
>> I/O
>> > > thread (it is 92% idle) and doesn't seem to be waiting on memory
>> which is
>> > > somewhat surprising to me (the NaN is an artifact of how we compute
>> that
>> > > stat--no allocations took place in the time period measured so it is
>> kind
>> > > of 0/0).
>> > >
>> > > -Jay
>> > >
>> > > On Mon, Dec 29, 2014 at 4:54 PM, Rajiv Kurian <ra...@signalfuse.com>
>> > > wrote:
>> > >
>> > > > In case the attachments don't work out here is an imgur link -
>> > > > http://imgur.com/NslGpT3,Uw6HFow#0
>> > > >
>> > > > On Mon, Dec 29, 2014 at 3:13 PM, Rajiv Kurian <ra...@signalfuse.com
>> >
>> > > > wrote:
>> > > >
>> > > > > Never mind about (2). I see these stats are already being output
>> by
>> > the
>> > > > > kafka producer. I've attached a couple of screenshots (couldn't
>> copy
>> > > > paste
>> > > > > from jvisualvm ). Do any of these things strike as odd? The
>> > > > > bufferpool-wait-ratio sadly shows up as a NaN.
>> > > > >
>> > > > > I still don't know how to figure out (1).
>> > > > >
>> > > > > Thanks!
>> > > > >
>> > > > >
>> > > > > On Mon, Dec 29, 2014 at 3:02 PM, Rajiv Kurian <
>> ra...@signalfuse.com>
>> > > > > wrote:
>> > > > >
>> > > > >> Hi Jay,
>> > > > >>
>> > > > >> Re (1) - I am not sure how to do this? Actually I am not sure
>> what
>> > > this
>> > > > >> means. Is this the time every write/fetch request is received on
>> the
>> > > > >> broker? Do I need to enable some specific log level for this to
>> show
>> > > > up? It
>> > > > >> doesn't show up in the usual log. Is this information also
>> available
>> > > via
>> > > > >> jmx somehow?
>> > > > >> Re (2) - Are you saying that I should instrument the "percentage
>> of
>> > > time
>> > > > >> waiting for buffer space" stat myself? If so how do I do this. Or
>> > are
>> > > > these
>> > > > >> stats already output to jmx by the kafka producer code. This
>> seems
>> > > like
>> > > > >> it's in the internals of the kafka producer client code.
>> > > > >>
>> > > > >> Thanks again!
>> > > > >>
>> > > > >>
>> > > > >> On Mon, Dec 29, 2014 at 10:22 AM, Rajiv Kurian <
>> > ra...@signalfuse.com>
>> > > > >> wrote:
>> > > > >>
>> > > > >>> Thanks Jay. Will check (1) and (2) and get back to you. The
>> test is
>> > > not
>> > > > >>> stand-alone now. It might be a bit of work to extract it to a
>> > > > stand-alone
>> > > > >>> executable. It might take me a bit of time to get that going.
>> > > > >>>
>> > > > >>> On Mon, Dec 29, 2014 at 9:45 AM, Jay Kreps <j...@confluent.io>
>> > wrote:
>> > > > >>>
>> > > > >>>> Hey Rajiv,
>> > > > >>>>
>> > > > >>>> This sounds like a bug. The more info you can help us get the
>> > easier
>> > > > to
>> > > > >>>> fix. Things that would help:
>> > > > >>>> 1. Can you check if the the request log on the servers shows
>> > latency
>> > > > >>>> spikes
>> > > > >>>> (in which case it is a server problem)?
>> > > > >>>> 2. It would be worth also getting the jmx stats on the
>> producer as
>> > > > they
>> > > > >>>> will show things like what percentage of time it is waiting for
>> > > buffer
>> > > > >>>> space etc.
>> > > > >>>>
>> > > > >>>> If your test is reasonably stand-alone it would be great to
>> file a
>> > > > JIRA
>> > > > >>>> and
>> > > > >>>> attach the test code and the findings you already have so
>> someone
>> > > can
>> > > > >>>> dig
>> > > > >>>> into what is going on.
>> > > > >>>>
>> > > > >>>> -Jay
>> > > > >>>>
>> > > > >>>> On Sun, Dec 28, 2014 at 7:15 PM, Rajiv Kurian <
>> > ra...@signalfuse.com
>> > > >
>> > > > >>>> wrote:
>> > > > >>>>
>> > > > >>>> > Hi all,
>> > > > >>>> >
>> > > > >>>> > Bumping this up, in case some one has any ideas. I did yet
>> > another
>> > > > >>>> > experiment where I create 4 producers and stripe the send
>> > requests
>> > > > >>>> across
>> > > > >>>> > them in a manner such that any one producer only sees 256
>> > > partitions
>> > > > >>>> > instead of the entire 1024. This seems to have helped a bit,
>> and
>> > > > >>>> though I
>> > > > >>>> > still see crazy high 99th (25-30 seconds), the median, mean,
>> > 75th
>> > > > and
>> > > > >>>> 95th
>> > > > >>>> > percentile have all gone down.
>> > > > >>>> >
>> > > > >>>> > Thanks!
>> > > > >>>> >
>> > > > >>>> > On Sun, Dec 21, 2014 at 12:27 PM, Thunder Stumpges <
>> > > > >>>> tstump...@ntent.com>
>> > > > >>>> > wrote:
>> > > > >>>> >
>> > > > >>>> > > Ah I thought it was restarting the broker that made things
>> > > better
>> > > > :)
>> > > > >>>> > >
>> > > > >>>> > > Yeah I have no experience with the Java client so can't
>> really
>> > > > help
>> > > > >>>> > there.
>> > > > >>>> > >
>> > > > >>>> > > Good luck!
>> > > > >>>> > >
>> > > > >>>> > > -----Original Message-----
>> > > > >>>> > > From: Rajiv Kurian [ra...@signalfuse.com]
>> > > > >>>> > > Received: Sunday, 21 Dec 2014, 12:25PM
>> > > > >>>> > > To: users@kafka.apache.org [users@kafka.apache.org]
>> > > > >>>> > > Subject: Re: Trying to figure out kafka latency issues
>> > > > >>>> > >
>> > > > >>>> > > I'll take a look at the GC profile of the brokers Right
>> now I
>> > > keep
>> > > > >>>> a tab
>> > > > >>>> > on
>> > > > >>>> > > the CPU, Messages in, Bytes in, Bytes out, free memory (on
>> the
>> > > > >>>> machine
>> > > > >>>> > not
>> > > > >>>> > > JVM heap) free disk space on the broker. I'll need to take
>> a
>> > > look
>> > > > >>>> at the
>> > > > >>>> > > JVM metrics too. What seemed strange is that going from 8
>> ->
>> > 512
>> > > > >>>> > partitions
>> > > > >>>> > > increases the latency, but going fro 512-> 8 does not
>> decrease
>> > > it.
>> > > > >>>> I have
>> > > > >>>> > > to restart the producer (but not the broker) for the end to
>> > end
>> > > > >>>> latency
>> > > > >>>> > to
>> > > > >>>> > > go down That made it seem  that the fault was probably with
>> > the
>> > > > >>>> producer
>> > > > >>>> > > and not the broker. Only restarting the producer made
>> things
>> > > > >>>> better. I'll
>> > > > >>>> > > do more extensive measurement on the broker.
>> > > > >>>> > >
>> > > > >>>> > > On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges <
>> > > > >>>> tstump...@ntent.com>
>> > > > >>>> > > wrote:
>> > > > >>>> > > >
>> > > > >>>> > > > Did you see my response and have you checked the server
>> logs
>> > > > >>>> especially
>> > > > >>>> > > > the GC logs? It still sounds like you are running out of
>> > > memory
>> > > > >>>> on the
>> > > > >>>> > > > broker. What is your max heap memory and are you
>> thrashing
>> > > once
>> > > > >>>> you
>> > > > >>>> > start
>> > > > >>>> > > > writing to all those partitions?
>> > > > >>>> > > >
>> > > > >>>> > > > You have measured very thoroughly from an external point
>> of
>> > > > view,
>> > > > >>>> i
>> > > > >>>> > think
>> > > > >>>> > > > now you'll have to start measuring the internal metrics.
>> > Maybe
>> > > > >>>> someone
>> > > > >>>> > > else
>> > > > >>>> > > > will have ideas on what jmx values to watch.
>> > > > >>>> > > >
>> > > > >>>> > > > Best,
>> > > > >>>> > > > Thunder
>> > > > >>>> > > >
>> > > > >>>> > > >
>> > > > >>>> > > > -----Original Message-----
>> > > > >>>> > > > From: Rajiv Kurian [ra...@signalfuse.com]
>> > > > >>>> > > > Received: Saturday, 20 Dec 2014, 10:24PM
>> > > > >>>> > > > To: users@kafka.apache.org [users@kafka.apache.org]
>> > > > >>>> > > > Subject: Re: Trying to figure out kafka latency issues
>> > > > >>>> > > >
>> > > > >>>> > > > Some more work tells me that the end to end latency
>> numbers
>> > > vary
>> > > > >>>> with
>> > > > >>>> > the
>> > > > >>>> > > > number of partitions I am writing to. I did an
>> experiment,
>> > > where
>> > > > >>>> based
>> > > > >>>> > > on a
>> > > > >>>> > > > run time flag I would dynamically select how many of the
>> > *1024
>> > > > >>>> > > partitions*
>> > > > >>>> > > > I write to. So say I decide I'll write to at most 256
>> > > partitions
>> > > > >>>> I mod
>> > > > >>>> > > > whatever partition I would actually write to by 256.
>> > Basically
>> > > > the
>> > > > >>>> > number
>> > > > >>>> > > > of partitions for this topic on the broker remains the
>> same
>> > at
>> > > > >>>> *1024*
>> > > > >>>> > > > partitions but the number of partitions my producers
>> write
>> > to
>> > > > >>>> changes
>> > > > >>>> > > > dynamically based on a run time flag. So something like
>> > this:
>> > > > >>>> > > >
>> > > > >>>> > > > int partition = getPartitionForMessage(message);
>> > > > >>>> > > > int maxPartitionsToWriteTo = maxPartitionsFlag.get();
>>  //
>> > > This
>> > > > >>>> flag
>> > > > >>>> > can
>> > > > >>>> > > be
>> > > > >>>> > > > updated without bringing the application down - just a
>> > > volatile
>> > > > >>>> read of
>> > > > >>>> > > > some number set externally.
>> > > > >>>> > > > int moddedPartition = partition % maxPartitionsToWrite.
>> > > > >>>> > > > // Send a message to this Kafka partition.
>> > > > >>>> > > >
>> > > > >>>> > > > Here are some interesting things I've noticed:
>> > > > >>>> > > >
>> > > > >>>> > > > i) When I start my client and it *never writes* to more
>> than
>> > > *8
>> > > > >>>> > > > partitions *(same
>> > > > >>>> > > > data rate but fewer partitions) - the end to end *99th
>> > latency
>> > > > is
>> > > > >>>> > 300-350
>> > > > >>>> > > > ms*. Quite a bit of this (numbers in my previous emails)
>> is
>> > > the
>> > > > >>>> latency
>> > > > >>>> > > > from producer -> broker and the latency from broker ->
>> > > consumer.
>> > > > >>>> Still
>> > > > >>>> > > > nowhere as poor as the *20 - 30* seconds I was seeing.
>> > > > >>>> > > >
>> > > > >>>> > > > ii) When I increase the maximum number of partitions,
>> end to
>> > > end
>> > > > >>>> > latency
>> > > > >>>> > > > increases dramatically. At *256 partitions* the end to
>> end
>> > > *99th
>> > > > >>>> > latency
>> > > > >>>> > > is
>> > > > >>>> > > > still 390 - 418 ms.* Worse than the latency figures for
>> *8
>> > > > >>>> *partitions,
>> > > > >>>> > > but
>> > > > >>>> > > > not by much. When I increase this number to *512
>> partitions
>> > > *the
>> > > > >>>> end
>> > > > >>>> > > > to end *99th
>> > > > >>>> > > > latency *becomes an intolerable *19-24 seconds*. At
>> *1024*
>> > > > >>>> partitions
>> > > > >>>> > the
>> > > > >>>> > > > *99th
>> > > > >>>> > > > latency is at 25 - 30 seconds*.
>> > > > >>>> > > > A table of the numbers:
>> > > > >>>> > > >
>> > > > >>>> > > > Max number of partitions written to (out of 1024)
>> > > > >>>> > > >
>> > > > >>>> > > > End to end latency
>> > > > >>>> > > >
>> > > > >>>> > > > 8
>> > > > >>>> > > >
>> > > > >>>> > > > 300 - 350 ms
>> > > > >>>> > > >
>> > > > >>>> > > > 256
>> > > > >>>> > > >
>> > > > >>>> > > > 390 - 418 ms
>> > > > >>>> > > >
>> > > > >>>> > > > 512
>> > > > >>>> > > >
>> > > > >>>> > > > 19 - 24 seconds
>> > > > >>>> > > >
>> > > > >>>> > > > 1024
>> > > > >>>> > > >
>> > > > >>>> > > > 25 - 30 seconds
>> > > > >>>> > > >
>> > > > >>>> > > >
>> > > > >>>> > > > iii) Once I make the max number of partitions high
>> enough,
>> > > > >>>> reducing it
>> > > > >>>> > > > doesn't help. For eg: If I go up from *8* to *512
>> > *partitions,
>> > > > the
>> > > > >>>> > > latency
>> > > > >>>> > > > goes up. But while the producer is running if I go down
>> from
>> > > > >>>> *512* to
>> > > > >>>> > > > *8 *partitions,
>> > > > >>>> > > > it doesn't reduce the latency numbers. My guess is that
>> the
>> > > > >>>> producer is
>> > > > >>>> > > > creating some state lazily per partition and this state
>> is
>> > > > >>>> causing the
>> > > > >>>> > > > latency. Once this state is created, writing to fewer
>> > > partitions
>> > > > >>>> > doesn't
>> > > > >>>> > > > seem to help. Only a restart of the producer calms things
>> > > down.
>> > > > >>>> > > >
>> > > > >>>> > > > So my current plan is to reduce the number of partitions
>> on
>> > > the
>> > > > >>>> topic,
>> > > > >>>> > > but
>> > > > >>>> > > > there seems to be something deeper going on for the
>> latency
>> > > > >>>> numbers to
>> > > > >>>> > be
>> > > > >>>> > > > so poor to begin with and then suffer so much more (non
>> > > > linearly)
>> > > > >>>> with
>> > > > >>>> > > > additional partitions.
>> > > > >>>> > > >
>> > > > >>>> > > > Thanks!
>> > > > >>>> > > >
>> > > > >>>> > > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian <
>> > > > >>>> ra...@signalfuse.com>
>> > > > >>>> > > > wrote:
>> > > > >>>> > > > >
>> > > > >>>> > > > > I've done some more measurements. I've also started
>> > > measuring
>> > > > >>>> the
>> > > > >>>> > > latency
>> > > > >>>> > > > > between when I ask my producer to send a message and
>> when
>> > I
>> > > > get
>> > > > >>>> an
>> > > > >>>> > > > > acknowledgement via the callback. Here is my code:
>> > > > >>>> > > > >
>> > > > >>>> > > > > // This function is called on every producer once
>> every 30
>> > > > >>>> seconds.
>> > > > >>>> > > > >
>> > > > >>>> > > > > public void addLagMarkers(final Histogram enqueueLag) {
>> > > > >>>> > > > >
>> > > > >>>> > > > >         final int numberOfPartitions = 1024;
>> > > > >>>> > > > >
>> > > > >>>> > > > >         final long timeOfEnqueue =
>> > > System.currentTimeMillis();
>> > > > >>>> > > > >
>> > > > >>>> > > > >         final Callback callback = new Callback() {
>> > > > >>>> > > > >
>> > > > >>>> > > > >             @Override
>> > > > >>>> > > > >
>> > > > >>>> > > > >             public void onCompletion(RecordMetadata
>> > > metadata,
>> > > > >>>> > Exception
>> > > > >>>> > > > ex)
>> > > > >>>> > > > > {
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 if (metadata != null) {
>> > > > >>>> > > > >
>> > > > >>>> > > > >                     // The difference between ack time
>> > from
>> > > > >>>> broker
>> > > > >>>> > and
>> > > > >>>> > > > > enqueue time.
>> > > > >>>> > > > >
>> > > > >>>> > > > >                     final long timeOfAck =
>> > > > >>>> > System.currentTimeMillis();
>> > > > >>>> > > > >
>> > > > >>>> > > > >                     final long lag = timeOfAck -
>> > > > timeOfEnqueue;
>> > > > >>>> > > > >
>> > > > >>>> > > > >                     enqueueLag.update(lag);
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 }
>> > > > >>>> > > > >
>> > > > >>>> > > > >             }
>> > > > >>>> > > > >
>> > > > >>>> > > > >         };
>> > > > >>>> > > > >
>> > > > >>>> > > > >         for (int i = 0; i < numberOfPartitions; i++) {
>> > > > >>>> > > > >
>> > > > >>>> > > > >             try {
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 byte[] value =
>> > > > >>>> LagMarker.serialize(timeOfEnqueue);
>> > > > >>>> > //
>> > > > >>>> > > 10
>> > > > >>>> > > > > bytes -> short version + long timestamp.
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 // This message is later used by the
>> > > consumers
>> > > > >>>> to
>> > > > >>>> > > measure
>> > > > >>>> > > > > lag.
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 ProducerRecord record = new
>> > > > >>>> ProducerRecord(MY_TOPIC,
>> > > > >>>> > i,
>> > > > >>>> > > > > null, value);
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 kafkaProducer.send(record, callback);
>> > > > >>>> > > > >
>> > > > >>>> > > > >             } catch (Exception e) {
>> > > > >>>> > > > >
>> > > > >>>> > > > >                 // We just dropped a lag marker.
>> > > > >>>> > > > >
>> > > > >>>> > > > >             }
>> > > > >>>> > > > >
>> > > > >>>> > > > >         }
>> > > > >>>> > > > >
>> > > > >>>> > > > >     }
>> > > > >>>> > > > >
>> > > > >>>> > > > > The* 99th* on this lag is about* 350 - 400* ms. It's
>> not
>> > > > >>>> stellar, but
>> > > > >>>> > > > > doesn't account for the *20-30 second 99th* I see on
>> the
>> > end
>> > > > to
>> > > > >>>> end
>> > > > >>>> > > lag.
>> > > > >>>> > > > > I am consuming in a tight loop on the Consumers (using
>> the
>> > > > >>>> > > > SimpleConsumer)
>> > > > >>>> > > > > with minimal processing with a *99th fetch time *of
>> > > *130-140*
>> > > > >>>> ms, so
>> > > > >>>> > I
>> > > > >>>> > > > > don't think that should be a problem either. Completely
>> > > > baffled.
>> > > > >>>> > > > >
>> > > > >>>> > > > >
>> > > > >>>> > > > > Thanks!
>> > > > >>>> > > > >
>> > > > >>>> > > > >
>> > > > >>>> > > > >
>> > > > >>>> > > > > On Sat, Dec 20, 2014 at 5:51 PM, Rajiv Kurian <
>> > > > >>>> ra...@signalfuse.com>
>> > > > >>>> > > > > wrote:
>> > > > >>>> > > > >>
>> > > > >>>> > > > >>
>> > > > >>>> > > > >>
>> > > > >>>> > > > >> On Sat, Dec 20, 2014 at 3:49 PM, Rajiv Kurian <
>> > > > >>>> ra...@signalfuse.com
>> > > > >>>> > >
>> > > > >>>> > > > >> wrote:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> I am trying to replace a Thrift peer to peer API with
>> > > kafka
>> > > > >>>> for a
>> > > > >>>> > > > >>> particular work flow. I am finding the 99th
>> percentile
>> > > > >>>> latency to
>> > > > >>>> > be
>> > > > >>>> > > > >>> unacceptable at this time. This entire work load
>> runs in
>> > > an
>> > > > >>>> Amazon
>> > > > >>>> > > > VPC. I'd
>> > > > >>>> > > > >>> greatly appreciate it if some one has any insights on
>> > why
>> > > I
>> > > > am
>> > > > >>>> > seeing
>> > > > >>>> > > > such
>> > > > >>>> > > > >>> poor numbers. Here are some details and measurements
>> > > taken:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> i) I have a single topic with 1024 partitions that I
>> am
>> > > > >>>> writing to
>> > > > >>>> > > from
>> > > > >>>> > > > >>> six clients using the kafka 0.8.2 beta kafka
>> producer.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> ii) I have 3 brokers, each on  a c3 2x machine on
>> ec2.
>> > > Each
>> > > > of
>> > > > >>>> > those
>> > > > >>>> > > > >>> machines has 8 virtual cpus, 15 GB memory and 2 * 80
>> GB
>> > > > SSDs.
>> > > > >>>> The
>> > > > >>>> > > > broker ->
>> > > > >>>> > > > >>> partitions mapping was decided by Kafka when I
>> created
>> > the
>> > > > >>>> topic.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> iii) I write about 22 thousand messages per second
>> from
>> > > > >>>> across the
>> > > > >>>> > 6
>> > > > >>>> > > > >>> clients. This number was calculated using distributed
>> > > > >>>> counters. I
>> > > > >>>> > > just
>> > > > >>>> > > > >>> increment a distributed counter in the callback from
>> my
>> > > > >>>> enqueue job
>> > > > >>>> > > if
>> > > > >>>> > > > the
>> > > > >>>> > > > >>> metadata returned is not null. I also increment a
>> > number o
>> > > > >>>> dropped
>> > > > >>>> > > > messages
>> > > > >>>> > > > >>> counter if the callback has a non-null exception or
>> if
>> > > there
>> > > > >>>> was an
>> > > > >>>> > > > >>> exception in the synchronous send call. The number of
>> > > > dropped
>> > > > >>>> > > messages
>> > > > >>>> > > > is
>> > > > >>>> > > > >>> pretty much always zero. Out of the 6 clients 3 are
>> > > > >>>> responsible for
>> > > > >>>> > > > 95% of
>> > > > >>>> > > > >>> the traffic. Messages are very tiny and have null
>> keys
>> > > and
>> > > > >>>> 27 byte
>> > > > >>>> > > > values
>> > > > >>>> > > > >>> (2 byte version and 25 byte payload). Again these
>> > messages
>> > > > are
>> > > > >>>> > > written
>> > > > >>>> > > > >>> using the kafka 0.8.2 client.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> iv) I have 3 consumer processes consuming only from
>> this
>> > > > >>>> topic.
>> > > > >>>> > Each
>> > > > >>>> > > > >>> consumer process is assigned a disjoint set of the
>> 1024
>> > > > >>>> partitions
>> > > > >>>> > by
>> > > > >>>> > > > an
>> > > > >>>> > > > >>> eternal arbiter. Each consumer process then creates a
>> > > > mapping
>> > > > >>>> from
>> > > > >>>> > > > brokers
>> > > > >>>> > > > >>> -> partitions it has been assigned. It then starts
>> one
>> > > > fetcher
>> > > > >>>> > thread
>> > > > >>>> > > > per
>> > > > >>>> > > > >>> broker. Each thread queries the broker (using the
>> > > > >>>> SimpleConsumer)
>> > > > >>>> > it
>> > > > >>>> > > > has
>> > > > >>>> > > > >>> been assigned for partitions such that partitions =
>> > > > >>>> (partitions on
>> > > > >>>> > > > broker)
>> > > > >>>> > > > >>> *∩ *(partitions assigned to the process by the
>> arbiter).
>> > > So
>> > > > in
>> > > > >>>> > effect
>> > > > >>>> > > > >>> there are 9 consumer threads across 3 consumer
>> processes
>> > > > that
>> > > > >>>> > query a
>> > > > >>>> > > > >>> disjoint set of partitions for the single topic and
>> > > amongst
>> > > > >>>> > > themselves
>> > > > >>>> > > > they
>> > > > >>>> > > > >>> manage to consume from every topic. So each thread
>> has a
>> > > run
>> > > > >>>> loop
>> > > > >>>> > > > where it
>> > > > >>>> > > > >>> asks for messages, consumes them then asks for more
>> > > > messages.
>> > > > >>>> When
>> > > > >>>> > a
>> > > > >>>> > > > run
>> > > > >>>> > > > >>> loop starts, it starts by querying for the latest
>> > message
>> > > > in a
>> > > > >>>> > > > partition
>> > > > >>>> > > > >>> (i.e. discards any previous backup) and then
>> maintains a
>> > > map
>> > > > >>>> of
>> > > > >>>> > > > partition
>> > > > >>>> > > > >>> -> nextOffsetToRequest in memory to make sure that it
>> > > > consumes
>> > > > >>>> > > > messages in
>> > > > >>>> > > > >>> order.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >> Edit: Mean't external arbiter.
>> > > > >>>> > > > >>
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> v) Consumption is really simple. Each message is put
>> on
>> > a
>> > > > non
>> > > > >>>> > > blocking
>> > > > >>>> > > > >>> efficient ring buffer. If the ring buffer is full the
>> > > > message
>> > > > >>>> is
>> > > > >>>> > > > dropped. I
>> > > > >>>> > > > >>> measure the mean time between fetches and it is the
>> same
>> > > as
>> > > > >>>> the
>> > > > >>>> > time
>> > > > >>>> > > > for
>> > > > >>>> > > > >>> fetches up to a ms, meaning no matter how many
>> messages
>> > > are
>> > > > >>>> > dequeued,
>> > > > >>>> > > > it
>> > > > >>>> > > > >>> takes almost no time to process them. At the end of
>> > > > >>>> processing a
>> > > > >>>> > > fetch
>> > > > >>>> > > > >>> request I increment another distributed counter that
>> > > counts
>> > > > >>>> the
>> > > > >>>> > > number
>> > > > >>>> > > > of
>> > > > >>>> > > > >>> messages processed. This counter tells me that on
>> > average
>> > > I
>> > > > am
>> > > > >>>> > > > consuming
>> > > > >>>> > > > >>> the same number of messages/sec that I enqueue on the
>> > > > >>>> producers
>> > > > >>>> > i.e.
>> > > > >>>> > > > around
>> > > > >>>> > > > >>> 22 thousand messages/sec.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> vi) The 99th percentile of the number of messages
>> > fetched
>> > > > per
>> > > > >>>> fetch
>> > > > >>>> > > > >>> request is about 500 messages.  The 99th percentile
>> of
>> > the
>> > > > >>>> time it
>> > > > >>>> > > > takes to
>> > > > >>>> > > > >>> fetch a batch is abut 130 - 140 ms. I played around
>> with
>> > > the
>> > > > >>>> buffer
>> > > > >>>> > > and
>> > > > >>>> > > > >>> maxWait settings on the SimpleConsumer and
>> attempting to
>> > > > >>>> consume
>> > > > >>>> > more
>> > > > >>>> > > > >>> messages was leading the 99th percentile of the fetch
>> > time
>> > > > to
>> > > > >>>> > balloon
>> > > > >>>> > > > up,
>> > > > >>>> > > > >>> so I am consuming in smaller batches right now.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> vii) Every 30 seconds odd each of the producers
>> inserts
>> > a
>> > > > >>>> trace
>> > > > >>>> > > message
>> > > > >>>> > > > >>> into each partition (so 1024 messages per producer
>> every
>> > > 30
>> > > > >>>> > seconds).
>> > > > >>>> > > > Each
>> > > > >>>> > > > >>> message only contains the System.currentTimeMillis()
>> at
>> > > the
>> > > > >>>> time of
>> > > > >>>> > > > >>> enqueueing. It is about 9 bytes long. The consumer in
>> > step
>> > > > (v)
>> > > > >>>> > always
>> > > > >>>> > > > >>> checks to see if a message it dequeued is of this
>> trace
>> > > type
>> > > > >>>> > (instead
>> > > > >>>> > > > of
>> > > > >>>> > > > >>> the regular message type). This is a simple check on
>> the
>> > > > >>>> first 2
>> > > > >>>> > byte
>> > > > >>>> > > > >>> version that each message buffer contains. If it is a
>> > > trace
>> > > > >>>> message
>> > > > >>>> > > it
>> > > > >>>> > > > >>> reads it's payload as a long and uses the difference
>> > > between
>> > > > >>>> the
>> > > > >>>> > > system
>> > > > >>>> > > > >>> time on the consumer and this payload and updates a
>> > > > histogram
>> > > > >>>> with
>> > > > >>>> > > this
>> > > > >>>> > > > >>> difference value. Ignoring NTP skew (which I've
>> measured
>> > > to
>> > > > >>>> be in
>> > > > >>>> > the
>> > > > >>>> > > > order
>> > > > >>>> > > > >>> of milliseconds) this is the lag between when a
>> message
>> > > was
>> > > > >>>> > enqueued
>> > > > >>>> > > > on the
>> > > > >>>> > > > >>> producer and when it was read on the consumer.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >> Edit: Trace messages are 10 bytes long - 2byte
>> version +
>> > 8
>> > > > >>>> byte long
>> > > > >>>> > > for
>> > > > >>>> > > > >> timestamp.
>> > > > >>>> > > > >>
>> > > > >>>> > > > >> So pseudocode for every consumer thread (one per
>> broker
>> > per
>> > > > >>>> consumer
>> > > > >>>> > > > >>> process (9 in total across 3 consumers) is:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> void run() {
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> while (running) {
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>     FetchRequest fetchRequest = buildFetchRequest(
>> > > > >>>> > > > >>> partitionsAssignedToThisProcessThatFallOnThisBroker);
>> > //
>> > > > This
>> > > > >>>> > > > >>> assignment is done by an external arbiter.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>     measureTimeBetweenFetchRequests();   // The 99th
>> of
>> > > this
>> > > > >>>> is
>> > > > >>>> > > > 130-140ms
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>     FetchResponse fetchResponse =
>> > > > >>>> fetchResponse(fetchRequest);  //
>> > > > >>>> > > The
>> > > > >>>> > > > >>> 99th of this is 130-140ms, which is the same as the
>> time
>> > > > >>>> between
>> > > > >>>> > > > fetches.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>     processData(fetchResponse);  // The 99th on this
>> is
>> > > 2-3
>> > > > >>>> ms.
>> > > > >>>> > > > >>>   }
>> > > > >>>> > > > >>> }
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> void processData(FetchResponse response) {
>> > > > >>>> > > > >>>   try (Timer.Context _ =
>> processWorkerDataTimer.time() {
>> > > //
>> > > > >>>> The
>> > > > >>>> > 99th
>> > > > >>>> > > > on
>> > > > >>>> > > > >>> this is 2-3 ms.
>> > > > >>>> > > > >>>     for (Message message : response) {
>> > > > >>>> > > > >>>        if (typeOf(message) == NORMAL_DATA) {
>> > > > >>>> > > > >>>          enqueueOnNonBlockingRingBuffer(message);  //
>> > > Never
>> > > > >>>> blocks,
>> > > > >>>> > > > >>> drops message if ring buffer is full.
>> > > > >>>> > > > >>>        } else if (typeOf(message) == TRACE_DATA) {
>> > > > >>>> > > > >>>          long timeOfEnqueue = geEnqueueTime(message);
>> > > > >>>> > > > >>>          long currentTime =
>> System.currentTimeMillis();
>> > > > >>>> > > > >>>          long difference = currentTime -
>> timeOfEnqueue;
>> > > > >>>> > > > >>>          lagHistogram.update(difference);
>> > > > >>>> > > > >>>        }
>> > > > >>>> > > > >>>      }
>> > > > >>>> > > > >>>    }
>> > > > >>>> > > > >>> }
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> viii) So the kicker is that this lag is really really
>> > > high:
>> > > > >>>> > > > >>> Median Lag: *200 ms*. This already seems pretty high
>> > and I
>> > > > >>>> could
>> > > > >>>> > even
>> > > > >>>> > > > >>> discount some of it through NTP skew.
>> > > > >>>> > > > >>> Mean Lag: *2 - 4 seconds*! Also notice how mean is
>> > bigger
>> > > > than
>> > > > >>>> > median
>> > > > >>>> > > > >>> kind of telling us how the distribution has a big
>> tail.
>> > > > >>>> > > > >>> 99th Lag: *20 seconds*!!
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> I can't quite figure out the source of the lag. Here
>> are
>> > > > some
>> > > > >>>> other
>> > > > >>>> > > > >>> things of note:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> 1) The brokers in general are at about 40-50% CPU
>> but I
>> > > see
>> > > > >>>> > > occasional
>> > > > >>>> > > > >>> spikes to more than 80% - 95%. This could probably be
>> > > > causing
>> > > > >>>> > issues.
>> > > > >>>> > > > I am
>> > > > >>>> > > > >>> wondering if this is down to the high number of
>> > > partitions I
>> > > > >>>> have
>> > > > >>>> > and
>> > > > >>>> > > > how
>> > > > >>>> > > > >>> each partition ends up receiving not too many
>> messages.
>> > > 22k
>> > > > >>>> > messages
>> > > > >>>> > > > spread
>> > > > >>>> > > > >>> across 1024 partitions = only 21 odd
>> > > messages/sec/partition.
>> > > > >>>> But
>> > > > >>>> > each
>> > > > >>>> > > > >>> broker should be receiving about  7500 messages/sec.
>> It
>> > > > could
>> > > > >>>> also
>> > > > >>>> > be
>> > > > >>>> > > > down
>> > > > >>>> > > > >>> to the nature of these messages being tiny. I imagine
>> > that
>> > > > the
>> > > > >>>> > kafka
>> > > > >>>> > > > wire
>> > > > >>>> > > > >>> protocol is probably close to the 25 byte message
>> > payload.
>> > > > It
>> > > > >>>> also
>> > > > >>>> > > has
>> > > > >>>> > > > to
>> > > > >>>> > > > >>> do a few  CRC32 calculations etc. But again given
>> that
>> > > these
>> > > > >>>> are
>> > > > >>>> > > C3.2x
>> > > > >>>> > > > >>> machines and the number of messages is so tiny I'd
>> > expect
>> > > it
>> > > > >>>> to do
>> > > > >>>> > > > better.
>> > > > >>>> > > > >>> But again reading this article
>> > > > >>>> > > > >>> <
>> > > > >>>> > > >
>> > > > >>>> > >
>> > > > >>>> >
>> > > > >>>>
>> > > >
>> > >
>> >
>> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>> > > > >>>> > > >
>> > > > >>>> > > > about
>> > > > >>>> > > > >>> 2 million messages/sec across three brokers makes it
>> > seem
>> > > > >>>> like 22k
>> > > > >>>> > 25
>> > > > >>>> > > > byte
>> > > > >>>> > > > >>> messages should be very easy. So my biggest suspicion
>> > > right
>> > > > >>>> now is
>> > > > >>>> > > > that the
>> > > > >>>> > > > >>> large number of partitions is some how responsible
>> for
>> > > this
>> > > > >>>> > latency.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> 2) On the consumer given that the 99th time between
>> > > fetches
>> > > > >>>> == 99th
>> > > > >>>> > > > time
>> > > > >>>> > > > >>> of a fetch, I don't see how I could do any better.
>> It's
>> > > > pretty
>> > > > >>>> > close
>> > > > >>>> > > to
>> > > > >>>> > > > >>> just getting batches of data, iterating through them
>> and
>> > > > >>>> dropping
>> > > > >>>> > > them
>> > > > >>>> > > > on
>> > > > >>>> > > > >>> the floor. My requests on the consumer are built like
>> > > this:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         FetchRequestBuilder builder = new
>> > > > >>>> > > > >>> FetchRequestBuilder().clientId(clientName)
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>                 .maxWait(100).minBytes(1000);  // max
>> > wait
>> > > > of
>> > > > >>>> 100
>> > > > >>>> > ms
>> > > > >>>> > > or
>> > > > >>>> > > > >>> at least 1000 bytes whichever happens first.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         // Add a fetch request for every partition.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         for (PartitionMetadata partition :
>> partitions) {
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>             int partitionId =
>> partition.partitionId();
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>             long offset = readOffsets.get(partition);
>> > //
>> > > > >>>> This map
>> > > > >>>> > > > >>> maintains the next partition to be read.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>             builder.addFetch(MY_TOPIC, partitionId,
>> > > offset,
>> > > > >>>> 3000);
>> > > > >>>> > > //
>> > > > >>>> > > > >>> Up to 3000 bytes per fetch request.}
>> > > > >>>> > > > >>>         }
>> > > > >>>> > > > >>> 3) On the producer side I use the kafka beta
>> producer.
>> > > I've
>> > > > >>>> played
>> > > > >>>> > > > >>> around with many settings but none of them seem to
>> have
>> > > > made a
>> > > > >>>> > > > difference.
>> > > > >>>> > > > >>> Here are my current settings:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> ProducerConfig values:
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>        block.on.buffer.full = false
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         retry.backoff.ms = 100
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         buffer.memory = 83886080  // Kind of big -
>> could
>> > > it
>> > > > be
>> > > > >>>> > adding
>> > > > >>>> > > > >>> lag?
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         batch.size = 40500  // This is also kind of
>> big
>> > -
>> > > > >>>> could it
>> > > > >>>> > be
>> > > > >>>> > > > >>> adding lag?
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         metrics.sample.window.ms = 30000
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         metadata.max.age.ms = 300000
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         receive.buffer.bytes = 32768
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         timeout.ms = 30000
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         max.in.flight.requests.per.connection = 5
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         metric.reporters = []
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         bootstrap.servers = [broker1, broker2,
>> broker3]
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         client.id =
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         compression.type = none
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         retries = 0
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         max.request.size = 1048576
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         send.buffer.bytes = 131072
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         acks = 1
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         reconnect.backoff.ms = 10
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         linger.ms = 250  // Based on this config I'd
>> > > > imagine
>> > > > >>>> that
>> > > > >>>> > > the
>> > > > >>>> > > > >>> lag should be bounded to about 250 ms over all.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         metrics.num.samples = 2
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>         metadata.fetch.timeout.ms = 60000
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> I know this is a lot of information. I just wanted to
>> > > > provide
>> > > > >>>> as
>> > > > >>>> > much
>> > > > >>>> > > > >>> context as possible.
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>> Thanks in advance!
>> > > > >>>> > > > >>>
>> > > > >>>> > > > >>>
>> > > > >>>> > > >
>> > > > >>>> > >
>> > > > >>>> >
>> > > > >>>>
>> > > > >>>
>> > > > >>>
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to