When a message is exposed to the consumer

2015-09-04 Thread Yuheng Du
According to section 3.1 of the paper "Kafka: a Distributed Messaging
System for Log Processing":

"a message is only exposed to the consumers after it is flushed"?

Is this still true in the current Kafka? That is, is a message only
available to consumers after it has been flushed to disk?

Thanks.


Re: latency test

2015-09-04 Thread Yuheng Du
When I use 32 partitions, the 4-broker latency becomes larger than the
8-broker latency.

So is it always true that using more brokers gives less latency when the
number of partitions is at least the number of brokers?

Thanks.

On Thu, Sep 3, 2015 at 10:45 PM, Yuheng Du  wrote:

> I am running a producer latency test. When using 92 producers on 92
> physical nodes publishing to 4 brokers, the latency is slightly lower than
> when using 8 brokers. I am using 8 partitions for the topic.
>
> I have rerun the test and it gives me the same result: the 4-broker
> scenario still has lower latency than the 8-broker scenario.
>
> It is weird because I tested 1 broker, 2 brokers, 4 brokers, 8 brokers, 16
> brokers and 32 brokers. For all the other cases the latency decreases as
> the number of brokers increases.
>
> 4 brokers/8 brokers is the only pair that doesn't follow this rule. What
> could be the cause?
>
> I am using 200-byte messages, and the test has each producer publish 500k
> messages to a given topic. For every test run where I change the number of
> brokers, I use a new topic.
>
> Thanks for any advice.
>


Re: latency test

2015-09-04 Thread Helleren, Erik
Well… not to be contrarian, but latency depends much more on the latency
between the producer and the broker that is the leader for the partition
you are publishing to.  At least when your brokers are not saturated with
messages, and acks is set to 1.  If acks is set to ALL, latency on a
non-saturated Kafka cluster will be: round-trip latency from producer to
the leader for the partition + the slowest round-trip latency to a replica
of that partition.  If a cluster is saturated with messages, we have to
assume that all partitions receive an equal distribution of messages to
avoid linear algebra and queueing theory models.  I don't like linear
algebra :P
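
To make that model concrete, here is a tiny sketch of the calculation (a
hypothetical helper for illustration, not a Kafka API; the round-trip times
are made-up numbers):

public class AckLatencyModel {
    // acks=1: latency is roughly the round trip to the partition leader.
    // acks=all: leader round trip plus the slowest replica's round trip.
    static long expectedAckLatencyMs(long leaderRttMs, long[] replicaRttMs, boolean acksAll) {
        if (!acksAll) return leaderRttMs;
        long slowestReplica = 0;
        for (long rtt : replicaRttMs) slowestReplica = Math.max(slowestReplica, rtt);
        return leaderRttMs + slowestReplica;
    }

    public static void main(String[] args) {
        long[] replicaRtts = {2, 3, 7};  // hypothetical replica round trips, in ms
        System.out.println("acks=1:   " + expectedAckLatencyMs(1, replicaRtts, false) + " ms");
        System.out.println("acks=all: " + expectedAckLatencyMs(1, replicaRtts, true) + " ms");
    }
}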

Since you are probably putting all your latencies into a single histogram
per producer, or worse, just an average, this pattern would have been
obscured.  Obligatory lecture about measuring latency by Gil Tene
(https://www.youtube.com/watch?v=9MKY4KypBzg).  To verify this hypothesis,
you should re-write the benchmark to plot the latencies for each write to
a partition for each producer into a histogram. (HdrHistogram is pretty
good for that.)  This would give you producers*partitions histograms,
which might be unwieldy for that many producers. But wait, there is hope!
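
A sketch of that kind of per-partition recording, assuming the HdrHistogram
library and the Java producer (topic name, broker address, payload size, and
message count are placeholders):

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PerPartitionLatency {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // One histogram per partition: values up to one hour in microseconds, 3 significant digits.
        ConcurrentMap<Integer, Histogram> byPartition = new ConcurrentHashMap<>();
        byte[] payload = new byte[200];

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 500_000; i++) {
                final long start = System.nanoTime();
                producer.send(new ProducerRecord<>("latency-test", payload), (metadata, exception) -> {
                    if (exception != null) return;
                    // Callbacks are invoked from the producer's single I/O thread,
                    // so recording into a plain Histogram here is safe.
                    long micros = (System.nanoTime() - start) / 1_000;
                    byPartition
                        .computeIfAbsent(metadata.partition(), p -> new Histogram(3_600_000_000L, 3))
                        .recordValue(micros);
                });
            }
        }  // close() waits for all in-flight sends to complete

        byPartition.forEach((p, h) ->
            System.out.printf("partition %d: p50=%dus p99.9=%dus max=%dus%n",
                p, h.getValueAtPercentile(50.0), h.getValueAtPercentile(99.9), h.getMaxValue()));
    }
}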

To verify that this hypothesis holds, you just have to see that there is a
significant difference between different partitions on a SINGLE producing
client. So, pick one producing client at random and use the data from
that. The easy way to do that is just plot all the partition latency
histograms on top of each other in the same plot, that way you have a
pretty plot to show people.  If you don't want to set up plotting, you can
just compare the medians (50th percentile) of the partitions' histograms.
 If there is a lot of variance, your latency anomaly is explained by
brokers 4-7 being slower than nodes 0-3!  If there isn't a lot of variance
at 50%, look at higher percentiles.  And if higher percentiles for all the
partitions look the same, this hypothesis is disproved.

If you want to make a general statement about latency of writing to Kafka,
you can merge all the histograms into a single histogram and plot that.

To Yuheng's credit, more brokers always result in more throughput. But
throughput and latency are two different creatures.  It's worth noting that
Kafka is designed to be high throughput first and low latency second.  And
it does a really good job at both.

Disclaimer: I might not like linear algebra, but I do like statistics.
Let me know if there are topics that need more explanation above that
aren't covered by Gil's lecture.
-Erik




Re: Slow ISR catch-up

2015-09-04 Thread Prabhjot Bharaj
Hi,

I am experiencing super slow throughput when using acks=-1.
Here is some further progress, in continuation of the test in my previous email:

*Topic details -*

Topic:temp PartitionCount:1 ReplicationFactor:3 Configs:

Topic: temp Partition: 0 Leader: 5 Replicas: 5,1,2 Isr: 5,2,1
*This is the command I'm running - *

kafka-producer-perf-test.sh --broker-list 96.7.250.122:9092,
96.17.183.53:9092,96.17.183.54:9092,96.7.250.117:9092,96.7.250.118:9092
--messages 10 --message-size 500 --topics temp --show-detailed-stats
--threads 30 --request-num-acks -1 --batch-size 1000 --request-timeout-ms
1

*Server.properties:-*

broker.id=0

port=9092

*num.network.threads=6*

num.io.threads=8

*socket.send.buffer.bytes=10485760*

*socket.receive.buffer.bytes=10485760*

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=30

log.cleaner.enable=false

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

*num.replica.fetchers=6*
*Observation:-*

I have also noticed that if I use acks=1 (without --sync) and immediately
use acks=-1 (without --sync), the test completes very quickly. Also, after
running this, if I describe the topic, it is still not in sync, which
means acks=-1 is being treated as 1 only.
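
A minimal sketch with the newer Java producer, for comparison: blocking on
the future returned by send() does not return until the broker has
acknowledged the write according to the acks setting (the broker address and
topic below are placeholders, and this is not the perf-test tool itself):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncAcksAllCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("acks", "all");  // new-producer equivalent of --request-num-acks -1
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges per the acks setting.
            RecordMetadata md = producer.send(new ProducerRecord<>("temp", "test-message")).get();
            System.out.println("acked: partition=" + md.partition() + " offset=" + md.offset());
        }
    }
}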

Also, when running on a freshly created topic with just acks=-1, it takes 8
minutes to complete with --sync

time kafka-producer-perf-test.sh --broker-list 96.7.250.122:9092,
96.17.183.53:9092,96.17.183.54:9092,96.7.250.117:9092,96.7.250.118:9092
--messages 10 --message-size 500 --topics temp --show-detailed-stats
--threads 30 --request-num-acks -1 --batch-size 1000 --request-timeout-ms
1 --compression-codec 2 --sync

start.time, end.time, compression, message.size, batch.size,
total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec

2015-09-04 12:19:36:223, 2015-09-04 12:27:41:775, 2, 500, 1000, 47.68,
0.0982, 0, 205.9306

real 8m6.563s

user 0m19.787s

sys 0m5.601s

If I use --sync, it is taking way longer.

Where am I going wrong?

Thanks,
Prabhjot

On Fri, Sep 4, 2015 at 1:45 AM, Gwen Shapira  wrote:

> Yes, this should work. Expect lower throughput though.
>
> On Thu, Sep 3, 2015 at 12:52 PM, Prabhjot Bharaj 
> wrote:
>
> > Hi,
> >
> > Can I use sync for acks = -1?
> >
> > Regards,
> > Prabhjot
> > On Sep 3, 2015 11:49 PM, "Gwen Shapira"  wrote:
> >
> > > The test uses the old producer (we should fix that), and since you
> don't
> > > specify --sync, it runs async.
> > > The old async producer simply sends data and doesn't wait for acks, so
> it
> > > is possible that the messages were never acked...
> > >
> > > On Thu, Sep 3, 2015 at 7:56 AM, Prabhjot Bharaj  >
> > > wrote:
> > >
> > > > Hi Folks,
> > > >
> > > > Request your expertise on my doubt here.
> > > >
> > > > *My setup:-*
> > > >
> > > > 5 node kafka cluster (4 cores, 8GB RAM) on RAID-6 (500 GB)
> > > > Using Kafka 0.8.2.1 with modified ProducerPerformance.scala
> > > > I've modified ProducerPerformance.scala to send custom ASCII data,
> > > instead
> > > > of Byte Array of Zeroes
> > > >
> > > > *server.properties:-*
> > > >
> > > > broker.id=0
> > > >
> > > > log.cleaner.enable=false
> > > >
> > > > log.dirs=/tmp/kafka-logs
> > > >
> > > > log.retention.check.interval.ms=30
> > > >
> > > > log.retention.hours=168
> > > >
> > > > log.segment.bytes=1073741824
> > > >
> > > > num.io.threads=8
> > > >
> > > > num.network.threads=3
> > > >
> > > > num.partitions=1
> > > >
> > > > num.recovery.threads.per.data.dir=1
> > > >
> > > > *num.replica.fetchers=4*
> > > >
> > > > port=9092
> > > >
> > > > socket.receive.buffer.bytes=1048576
> > > >
> > > > socket.request.max.bytes=104857600
> > > >
> > > > socket.send.buffer.bytes=1048576
> > > >
> > > > zookeeper.connect=localhost:2181
> > > >
> > > > zookeeper.connection.timeout.ms=6000
> > > >
> > > >
> > > > *This is how I run the producer perf test:-*
> > > >
> > > > kafka-producer-perf-test.sh --broker-list
> > > > a.a.a.a:9092,b.b.b.b:9092,c.c.c.c:9092,d.d.d.d:9092,e.e.e.e:9092
> > > --messages
> > > > 10 --message-size 500 --topics temp --show-detailed-stats
> > --threads
> > > 5
> > > > --request-num-acks -1 --batch-size 200 --request-timeout-ms 1
> > > > --compression-codec 0
> > > >
> > > > *Problem:-*
> > > >
> > > > This test completes in under 15 seconds for me
> > > >
> > > > But, after this test, if I try writing to another topic which has 2
> > > > partitions and 3 replicas, it is dead slow and the same script seems
> > > never
> > > > to finish because the slow ISR catch-up is still going on.
> > > >
> > > > *My inference:-*
> > > > I have noticed that for a topic with 1 partition and 3 replicas, the
> > ISR
> > > > shows only 1 broker id.
> > > >
> > > > Topic:temp PartitionCount:1 ReplicationFactor:3 Configs:
> > > 

Amount of partitions

2015-09-04 Thread Jörg Wagner

Hello!

Regarding the recommended number of partitions I am a bit confused.
Basically I got the impression that it's better to have lots of
partitions (see information from LinkedIn etc.). On the other hand, a lot
of performance benchmarks floating around show only a few partitions
being used.


Especially when considering the difference between HDDs and SSDs, and also
how many of them you have, what is the way to go?


In my case, I seem to get the best stability and performance with few
partitions *per HDD*, and only one I/O thread per disk.


What are your experiences and recommendations?

Cheers
Jörg


Re: latency test

2015-09-04 Thread Yuheng Du
Thank you Erik! That is helpful!

But I also see jitter in the maximum latencies when running the
experiment.

The average end-to-acknowledgement latency from producer to broker is
around 5 ms when using 92 producers and 4 brokers, and the 99.9th percentile
latency is 58 ms, but the maximum latency goes up to 1359 ms. How can I locate
the source of this jitter?

Thanks.



Re: When a message is exposed to the consumer

2015-09-04 Thread Yuheng Du
Can't read it. Sorry

On Fri, Sep 4, 2015 at 12:08 PM, Roman Shramkov 
wrote:

> Её ай н Анны уйг
>
> sent from a mobile device, please excuse brevity and typos
>
>
> Yuheng Du wrote:
>
> According to section 3.1 of the paper "Kafka: a Distributed Messaging
> System for Log Processing":
>
> "a message is only exposed to the consumers after it is flushed"?
>
> Is this still true in the current Kafka? That is, is a message only
> available to consumers after it has been flushed to disk?
>
> Thanks.
>


Re: Amount of partitions

2015-09-04 Thread tao xiao
Here is a good doc to describe how to choose the right number of partitions

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/

On Fri, Sep 4, 2015 at 10:08 PM, Jörg Wagner  wrote:

> Hello!
>
> Regarding the recommended amount of partitions I am a bit confused.
> Basically I got the impression that it's better to have lots of partitions
> (see information from linkedin etc). On the other hand, a lot of
> performance benchmarks floating around show only a few partitions are being
> used.
>
> Especially when considering the difference between hdd and ssds and also
> the amount thereof, what is the way to go?
>
> In my case, I seem to have the best stability and performance issues with
> few partitions *per hdd*, and only one io thread per disk.
>
> What are your experiences and recommendations?
>
> Cheers
> Jörg
>



-- 
Regards,
Tao


Re: latency test

2015-09-04 Thread Yuheng Du
Thanks for your reply, Erik. I am running some more tests according to your
suggestions now and I will share my results here. Is it necessary to
use a fixed number of partitions (32 partitions, maybe) for my test?

I am testing 2-, 4-, 8-, 16- and 32-broker scenarios, all of them running
on individual physical nodes. So I think using at least 32 partitions will
make more sense? I have seen latencies increase as the number of partitions
goes up in my experiments.

To get the latency of each event recorded, are you suggesting that I
write my own test program (in Java perhaps), or can I just modify the
standard test program provided by Kafka (
https://gist.github.com/jkreps/c7ddb4041ef62a900e6c )? I guess I need to
rebuild the source if I modify the standard Java test program
ProducerPerformance provided in Kafka, right? Currently this standard program
only has average and percentile latencies but no per-event
latencies.

Thanks.


Re: latency test

2015-09-04 Thread Yuheng Du
No problem. Thanks for your advice. I think it would be fun to explore. I
only know how to program in Java though. Hope it will work.


Re: Amount of partitions

2015-09-04 Thread Todd Palino
Jun's post is a good start, but I find it's easier to talk in terms of more
concrete reasons and guidance for having fewer or more partitions per topic.

Start with the number of brokers in the cluster. This is a good baseline
for the minimum number of partitions in a topic, as it will assure balance
over the cluster. Of course, if you have lots of topics, you can
potentially skip past this as you'll end up with balanced load in the
aggregate, but I think it's a good practice regardless. As with all other
advice here, there are always exceptions. If you really, really, really
need to assure ordering of messages, you might be stuck with a single
partition for some use cases.

In general, you should pick more partitions if a) the topic is very busy,
or b) you have more consumers. Looking at the second case first, you always
want to have at least as many partitions in a topic as you have individual
consumers in a consumer group. So if you have 16 consumers in a single
group, you will want the topic they consume to have at least 16 partitions.
In fact, you may also want to always have a multiple of the number of
consumers so that you have even distribution. How many consumers you have
in a group is going to be driven more by what you do with the messages once
they are consumed, so here you'll be looking from the bottom of your stack
up, until you get to Kafka.

How busy the topic is means looking from the top down, through the producer,
to Kafka. It's also a little more difficult to provide guidance on. We have
a policy of expanding partitions for a topic whenever the size of the
partition on disk (full retention over 4 days) is larger than 50 GB. We
find that this gives us a few benefits. One is that it takes a reasonable
amount of time when we need to move a partition from one broker to another.
Another is that when we have partitions that are larger than this, the rate
tends to cause problems with consumers. For example, we see mirror maker
perform much better, and have less spiky lag problems, when we stay under
this limit. We're even considering revising the limit down a little, as
we've had some reports from other wildcard consumers that they've had
problems keeping up with topics that have partitions larger than about 30
GB.

The last thing to look at is whether or not you are producing keyed
messages to the topic. If you're working with unkeyed messages, there is no
problem. You can usually add partitions whenever you want to down the road
with little coordination with producers and consumers. If you are producing
keyed messages, there is a good chance you do not want to change the
distribution of keys to partitions at various points in the future when you
need to size up. This means that when you first create the topic, you
probably want to create it with enough partitions to deal with growth over
time, both on the produce and consume side, even if that is too many
partitions right now by other measures. For example, we have one client who
requested 720 partitions for a particular set of topics. The reasoning was
that they are producing keyed messages, they wanted to account for growth,
and they wanted even distribution of the partitions to consumers as they
grow. 720 happens to have a lot of factors, so it was a good number for
them to pick.

As a note, we have up to 5000 partitions per broker right now on current
hardware, and we're moving to new hardware (more disk, 256 GB of memory,
10gig interfaces) where we're going to have up to 12,000. Our default
partition count for most clusters is 8, and we've got topics up to 512
partitions in some places just taking into account the produce rate alone
(not counting those 720-partition topics that aren't that busy). Many of
our brokers run with over 10k open file handles for regular files alone,
and over 50k open when you include network.

-Todd





new consumer API & release 0.8.3

2015-09-04 Thread Shashank Singh
Hi

I am eager to use the enhanced consumer API, which provides better
control in terms of offset management, etc. From reading through the forums,
I believe it is coming as part of the 0.8.3 release. However, there is no
tentative date for it.

Can you please give any hint on that? Also, which is the best forum to ask
questions about how these new APIs are shaping up and their details?

-- 

*Warm Regards,*

*Shashank  *

*Mobile: +91 9910478553 *

*Linkedin: in.linkedin.com/pub/shashank-singh/13/763/906/
*


Re: latency test

2015-09-04 Thread Helleren, Erik
I think the suggestion is to have partitions/brokers >= 1, so 32 should be
enough.

As for latency tests, there isn't a lot of code needed to do a latency test.  If
you just want to measure ack time, it's around 100 lines.  I will try to
push out some good latency testing code to GitHub, but my company is
scared of open sourcing code… so it might be a while…
-Erik
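
For illustration, a bare-bones ack-time measurement in that spirit might look
like the following sketch (this is not Erik's code; the broker address, topic
name, and message count are placeholders, and percentiles are taken by simply
sorting the raw samples):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckTimeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        int n = 10_000;
        long[] micros = new long[n];
        byte[] payload = new byte[200];
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < n; i++) {
                long start = System.nanoTime();
                producer.send(new ProducerRecord<>("latency-test", payload)).get();  // block until acked
                micros[i] = (System.nanoTime() - start) / 1_000;
            }
        }
        Arrays.sort(micros);
        System.out.printf("p50=%dus p99.9=%dus max=%dus%n",
            micros[n / 2], micros[(int) (n * 0.999)], micros[n - 1]);
    }
}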



Re: latency test

2015-09-04 Thread Helleren, Erik
That is an excellent question!  There are a bunch of ways to monitor
jitter and see when that is happening.  Here are a few:

- You could slice the histogram every few seconds, save it out with a
timestamp, and then look at how they compare.  This would be mostly
manual, or you can graph line charts of the percentiles over time in Excel
where each percentile would be a series.  If you are using HDR Histogram,
you should look at how to use the Recorder class to do this coupled with a
ScheduledExecutorService (see the sketch after this list).

- You can just save the starting timestamp of the event and the latency of
each event.  If you put it into a CSV, you can just load it up into Excel
and graph it as an XY chart.  That way you can see every point during the
running of your program and you can see trends.  You want to be careful
about this one, especially about writing to a file in the callback that Kafka
provides.
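
A sketch of the Recorder-plus-ScheduledExecutorService idea from the first
item above, assuming the HdrHistogram library (the interval length and the
dummy latency values are placeholders):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;

public class IntervalLatencySlices {
    public static void main(String[] args) throws Exception {
        Recorder recorder = new Recorder(3);  // 3 significant digits
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Every 5 seconds, swap out the interval histogram and print a timestamped slice.
        scheduler.scheduleAtFixedRate(() -> {
            Histogram slice = recorder.getIntervalHistogram();
            System.out.printf("%d p50=%dus p99.9=%dus max=%dus%n",
                System.currentTimeMillis(),
                slice.getValueAtPercentile(50.0),
                slice.getValueAtPercentile(99.9),
                slice.getMaxValue());
        }, 5, 5, TimeUnit.SECONDS);

        // In a real test you would call recorder.recordValue(latencyMicros) from the
        // producer callback; here we feed dummy values so the example runs on its own.
        for (int i = 0; i < 20_000; i++) {
            recorder.recordValue(1_000 + (i % 500));  // pretend latencies, in microseconds
            Thread.sleep(1);
        }
        scheduler.shutdown();
    }
}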

Also, I have noticed that most of the very slow observations are at
startup.  But don’t trust me, trust the data and share your findings.
Also, having a 99.9 percentile provides a pretty good standard for typical
poor case performance.  Average is borderline useless, 50%’ile is a better
typical case because that’s the number that says “half of events will be
this slow or faster”, or for values that are high like 99.9%’ile, “0.1% of
all events will be slower than this”.
-Erik 


RE: When a message is exposed to the consumer

2015-09-04 Thread Roman Shramkov
Её ай н Анны уйг

sent from a mobile device, please excuse brevity and typos


Yuheng Du wrote:

According to section 3.1 of the paper "Kafka: a Distributed Messaging
System for Log Processing":

"a message is only exposed to the consumers after it is flushed"?

Is this still true in the current Kafka? That is, is a message only
available to consumers after it has been flushed to disk?

Thanks.


Re: Slow ISR catch-up

2015-09-04 Thread Prabhjot Bharaj
Hello friends,

Request your expertise on this problem I'm facing

Thanks

Zombie Replica Fetcher Threads

2015-09-04 Thread John Watson
It would seem (if the metrics registry is accurate) that replica
fetcher threads can persist after a leadership election, even when the
broker itself is elected leader.
This also seems to occur after a reassignment (as evidenced by the 5
different thread entries for the same partition in the registry).

This leads to inaccurate MaxLag and ConsumerLag metrics being
reported, since they keep whatever the last reported value was.

Restarting the offending broker clears these at least.