Variation of producer latency in ProducerPerformance test

2015-08-11 Thread Yuheng Du
Hi,

I am running a test in which 92 producers each publish 53,000 records of size
254 bytes to 2 brokers.

The average latency reported by each producer varies widely. For some
producers, the average latency to send the 53,000 records is as low as 38 ms;
for others, it is as high as 13,067 ms.

How can this variation be explained? To get the lowest latencies, what batch
size and other important configs should I use?

Thanks!
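[Editor's note: for reference, these are the latency-relevant knobs of the old (0.8.x) Scala producer used by ProducerPerformance; the values below are illustrative assumptions, not recommendations, and the right settings depend on the workload:]

```properties
# Old (0.8.x) producer settings that most affect per-send latency.
# All values are illustrative; tune for your own workload.
producer.type=async            # batch in the background instead of blocking per send
queue.buffering.max.ms=5       # flush the async queue at most every 5 ms
batch.num.messages=200         # smaller batches mean lower latency, less throughput
request.required.acks=1        # waiting for all replicas (-1) adds latency
```

With 92 producers against only 2 brokers, contention on the broker side alone can plausibly explain a wide latency spread.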


Help with SocketTimeoutException while reading from Kafka cluster

2015-08-11 Thread venkatesh kavuluri
Hi All,


I am running a high-level Kafka consumer. While it works fine against a
smaller staging cluster, I am getting socket timeout exceptions when running
against a larger LIVE cluster (against a topic with 100 partitions).
I am using "zookeeper.connect" to connect to the cluster.


*Stage cluster (Consumer gets messages)*:

1751 [ConsumerFetcherThread-local-enrich_-1439334511597-5eb31426-0-1]
INFO  kafka.consumer.ConsumerFetcherThread  -
[ConsumerFetcherThread-local-enrich_-1439334511597-5eb31426-0-1],
Starting

2344 [local-enrich_-1439334511597-5eb31426-leader-finder-thread] INFO
 kafka.consumer.ConsumerFetcherManager  -
[ConsumerFetcherManager-1439334511817] Added fetcher for partitions
ArrayBuffer([[fpti_onboarding_events,2], initOffset -1 to broker
id:1,host:XXX,port:9092] , [[fpti_onboarding_events,1], initOffset -1 to
broker id:1,host:,port:9092] , [[fpti_onboarding_events,0], initOffset
-1 to broker id:1,host:,port:9092] )

[ 2 ]** LISTENING TO EVENTS **

[ 0 ]** LISTENING TO EVENTS **

[ 1 ]** LISTENING TO EVENTS **


*Live cluster (Socket Timeouts)*:

3645
[ConsumerFetcherThread-c3-onboard_-2-9571-1439334326956-cfa8b46a-0-5]
INFO  kafka.consumer.ConsumerFetcherThread  -
[ConsumerFetcherThread-c3-onboard_-2-9571-1439334326956-cfa8b46a-0-5],
Starting

83799 [c3-onboard_-2-9571-1439334326956-cfa8b46a-leader-finder-thread]
INFO  kafka.consumer.SimpleConsumer  - Reconnect due to socket error:
java.net.SocketTimeoutException

163931 [c3-onboard_-2-9571-1439334326956-cfa8b46a-leader-finder-thread]
INFO  kafka.consumer.SimpleConsumer  - Reconnect due to socket error:
java.net.SocketTimeoutException

163969
[ConsumerFetcherThread-c3-onboard_-2-9571-1439334326956-cfa8b46a-0-1]
INFO  kafka.consumer.ConsumerFetcherThread  -
[ConsumerFetcherThread-c3-onboard_-2-9571-1439334326956-cfa8b46a-0-1],
Starting

It looks like the leader-finder-thread is timing out in Live. Do you know why
this would happen?


I have the following timeouts defined in the configuration:

props.put("zookeeper.session.timeout.ms", "6400");

props.put("zookeeper.sync.time.ms", "600");

props.put("auto.offset.reset", "smallest");

props.put("socket.timeout.ms", "8");
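[Editor's note: the socket.timeout.ms value above looks like it may have been truncated in transit. For comparison, a commonly used baseline for the 0.8 high-level consumer looks roughly like this; the values are assumptions to adjust, but socket.timeout.ms should be at least as large as fetch.wait.max.ms:]

```properties
# Hypothetical baseline timeouts for the 0.8 high-level consumer.
# socket.timeout.ms must comfortably exceed fetch.wait.max.ms,
# otherwise nearly every fetch will hit a SocketTimeoutException.
zookeeper.session.timeout.ms=6000
zookeeper.sync.time.ms=2000
socket.timeout.ms=30000
fetch.wait.max.ms=100
```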


Low latency on Kafka

2015-08-11 Thread Alvaro Gareppe
I'm starting to use kafka for a low latency application.

I need a topic with an overall end-to-end latency of around 2 or 3 ms (latency
from producer to consumer).

I want to use an async producer, but I'm not getting it to work that fast.
What are the key properties to configure on the producer, consumer, and topic
to achieve the best possible latency?

I can send you what I have configured so far.

Thank you --
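[Editor's note: a hedged sketch of the 0.8 settings usually tuned first for end-to-end latency; all values below are illustrative assumptions, and whether 2-3 ms is reachable depends heavily on network and broker flush behaviour:]

```properties
# Producer (async): flush quickly, keep batches tiny.
producer.type=async
queue.buffering.max.ms=1    # flush the async buffer almost immediately
batch.num.messages=1        # do not wait to accumulate a batch
request.required.acks=1     # acks=-1 (all replicas) adds round trips

# Consumer: return fetches as soon as any data is available.
fetch.wait.max.ms=1
fetch.min.bytes=1
```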


Start consuming from latest message

2015-08-11 Thread Datta, Saurav
Hello,

I have a Spark source producing to Kafka, and a Java consumer. The consumer was 
shutdown for around a week while the producer was active.
Now that the consumer is active again, it is fetching messages from 7 days back.

I would like to skip the older messages and start from the latest one. How do
I do that?
This is for version 0.8.2.1.

Please share your suggestions.

Regards,
Saurav
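[Editor's note: one commonly suggested approach for the 0.8.2.1 high-level consumer, hedged since offset management details vary by setup: auto.offset.reset only applies when a group has no committed offset, so switching to a fresh group.id together with auto.offset.reset=largest makes the consumer start from the tail:]

```properties
# A consumer group with no committed offsets honours auto.offset.reset.
# "largest" (the 0.8.x spelling of "latest") means "start at the tail".
group.id=my-consumer-fresh-group   # hypothetical new group id
auto.offset.reset=largest
```

Keeping the old group.id would resume from the week-old committed offsets regardless of this setting.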



RE: 0.8.2.1 upgrade causes much more IO

2015-08-11 Thread Matthew Bruce
Hi Andrew,

I work with Todd and did our 0.8.2.1 testing with him.  I believe that the
Kafka 0.8.x brokers recompress the messages once they receive them, in order
to assign offsets to the messages (see the 'Compression in Kafka' section
of: http://nehanarkhede.com/2013/03/28/compression-in-kafka-gzip-or-snappy/). I
expect that you will see an improvement with Snappy 1.1.1.7  (FWIW, our load
generator's version of Snappy didn't change between our 0.8.1.1 and 0.8.2.1
testing, and we still saw the IO hit on the broker side, which seems to confirm
this).

Thanks,
Matt Bruce


From: Andrew Otto [mailto:ao...@wikimedia.org]
Sent: Tuesday, August 11, 2015 3:15 PM
To: users@kafka.apache.org
Cc: Dan Andreescu ; Joseph Allemandou 

Subject: Re: 0.8.2.1 upgrade causes much more IO

Hi Todd,

We are using snappy!  And we are using version 1.1.1.6 as of our upgrade to 
0.8.2.1 yesterday.  However, as far as I can tell, that is only relevant for 
Java producers, right?   Our main producers use librdkafka (the Kafka C lib) to 
produce, and in doing so use a built-in C version of snappy[1].

Even so, your issue sounds very similar to mine, and I don't have a full 
understanding of how brokers deal with compression, so I have updated the 
snappy java version to 1.1.1.7 on one of our brokers.  We'll have to wait a 
while to see if the log sizes are actually smaller for data written to this 
broker.

Thanks!




[1] https://github.com/edenhill/librdkafka/blob/0.8.5/src/snappy.c
On Aug 11, 2015, at 12:58, Todd Snyder <tsny...@blackberry.com> wrote:

Hi Andrew,

Are you using Snappy Compression by chance?  When we tested the 0.8.2.1 upgrade 
initially we saw similar results and tracked it down to a problem with Snappy 
version 1.1.1.6 (https://issues.apache.org/jira/browse/KAFKA-2189).  We're 
running with Snappy 1.1.1.7 now and the performance is back to where it used to 
be.



Sent from my BlackBerry 10 smartphone on the TELUS network.
From: Andrew Otto
Sent: Tuesday, August 11, 2015 12:26 PM
To: users@kafka.apache.org
Reply To: users@kafka.apache.org
Cc: Dan Andreescu; Joseph Allemandou
Subject: 0.8.2.1 upgrade causes much more IO


Hi all!

Yesterday I did a production upgrade of our 4 broker Kafka cluster from 0.8.1.1 
to 0.8.2.1.

When we did so, we were running our (varnishkafka) producers with 
request.required.acks = -1.  After switching to 0.8.2.1, producers saw produce 
response RTTs of >60 seconds.  I then switched to request.required.acks = 1, 
and producers settled down.  However, we then started seeing flapping ISRs 
about every 10 minutes.  We run Camus every 10 minutes.  If we disable Camus, 
then ISRs don't flap.

All of these issues seem to be a side effect of a larger problem.  The total 
amount of network and disk IO that the Kafka brokers are doing after the upgrade 
to 0.8.2.1 has tripled.  We were previously seeing about 20 MB/s incoming on 
broker interfaces; 0.8.2.1 knocks this up to around 60 MB/s.  Disk writes have 
tripled accordingly.  Disk reads have also increased by a huge amount, although 
I suspect this is a consequence of more data flying around somehow dirtying the 
disk cache.

You can see these changes in this dashboard: 
http://grafana.wikimedia.org/#/dashboard/db/kafka-0821-upgrade

The upgrade started at around 2015-08-10 14:30, and was completed on all 4 
brokers within a couple of hours.

Probably the most relevant is network rx_bytes on brokers.



We looked at Kafka .log file sizes and noticed that file sizes are indeed much 
larger than they were before this upgrade:

# 0.8.1.1
2015-08-10T04 38119109383
2015-08-10T05 46172089174
2015-08-10T06 46172182745
2015-08-10T07 53151490032
2015-08-10T08 53151892928
2015-08-10T09 55836248198
2015-08-10T10 57984054557
2015-08-10T11 63353197416
2015-08-10T12 68184938548
2015-08-10T13 69259218741
2015-08-10T14 79567698089
# Upgrade to 0.8.2.1 starts here
2015-08-10T15 133643184876
2015-08-10T16 168515916825
2015-08-10T17 181394338213
2015-08-10T18 177097927553
2015-08-10T19 183530782549
2015-08-10T20 178706680082
2015-08-10T21 178712665924
2015-08-10T22 171741495606
2015-08-10T23 169049665348
2015-08-11T00 163682183241
2015-08-11T01 165292426510


Aside from the request.required.acks change I mentioned above, we haven't made 
any config changes on brokers, producers, or consumers.  Our server.properties 
file is here: https://gist.github.com/ottomata/cdd270102287661c176a

Has anyone seen this before?  What could be the cause of more data here?  
Perhaps there is some compression config change that we missed that is causing 
this data to be sent or saved uncompressed?  (Sending it uncompressed is 
unlikely, since we would probably notice a larger network change on the 
producers than we do, unless I'm misreading that.)  Is there a quick way to 
tell if the data is compressed?


Thanks!
-Andrew Otto
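[Editor's note: one quick check for the compression question is to dump a broker log segment with the DumpLogSegments tool and inspect the compression codec it reports. This is a sketch; the segment path and topic name are placeholders:]

```shell
# Dump a few entries of a broker log segment; each message line shows
# its codec, e.g. "compresscodec: SnappyCompressionCodec" vs
# "compresscodec: NoCompressionCodec" for uncompressed data.
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/kafka/logs/mytopic-0/00000000000000000000.log \
  --print-data-log | head
```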


--

Re: 0.8.2.1 upgrade causes much more IO

2015-08-11 Thread Todd Snyder
Hi Andrew,



Are you using Snappy Compression by chance?  When we tested the 0.8.2.1 upgrade 
initially we saw similar results and tracked it down to a problem with Snappy 
version 1.1.1.6 (https://issues.apache.org/jira/browse/KAFKA-2189).  We’re 
running with Snappy 1.1.1.7 now and the performance is back to where it used to 
be.


Sent from my BlackBerry 10 smartphone on the TELUS network.


-
This transmission (including any attachments) may contain confidential 
information, privileged material (including material protected by the 
solicitor-client or other applicable privileges), or constitute non-public 
information. Any use of this information by anyone other than the intended 
recipient is prohibited. If you have received this transmission in error, 
please immediately reply to the sender and delete this information from your 
system. Use, dissemination, distribution, or reproduction of this transmission 
by unintended recipients is not authorized and may be unlawful.


Re: Topic auto deletion

2015-08-11 Thread Grant Henke
Hi Shahar,

This feature does not exist.

Thanks,
Grant


On Tue, Aug 11, 2015 at 2:28 AM, Shahar Danos 
wrote:

> Hi,
>
> I'm wondering whether Kafka 0.8.2 has a topic auto-deletion feature, but
> couldn't figure it out from the documentation
> (https://kafka.apache.org/082/ops.html).
> I'm aware of delete.topic.enable config parameter, but I'm asking about an
> expiration aspect; when a topic exists but no one used it (consume/produce)
> for the last X mins, it will be auto-removed. I wonder if I can configure
> topics to be expired after a time I choose.
>
> Many thanks in advance,
> Shahar.
>



-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


broker.id does not work: still showing 0,1 while it was set to 7,8

2015-08-11 Thread shahab
Hi,

I am using Kafka version 0.8.2.1 and I set up a cluster of two servers, one
with broker.id=7 and the other with broker.id=8. However, when I create a
topic and describe it, I only see 0 and 1 as the Leader, Replicas, and Isr
values; nothing shows 7 or 8 as broker ids. I restarted a couple of times but
it did not change.

Does anyone know how to resolve this?

best,
/Shahab


Re: how to get single record from kafka topic+partition @ specified offset

2015-08-11 Thread Joe Lawson
Great thanks!

On Mon, Aug 10, 2015 at 7:03 PM, Ewen Cheslack-Postava 
wrote:

> Right now I think the only place the new API is documented is in the
> javadocs. Here are the relevant sections for replacing the simple consumer.
>
> Subscribing to specific partitions:
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L204
> Seeking to specific partitions:
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L282
>
> With the new API you'll just need to do something like this:
>
> TopicPartition tp = new TopicPartition("topic", 1);
> long offset = 100;
>
> KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> consumer.subscribe(tp);
> consumer.seek(tp, offset);
> while (true) {
>    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>    if (!records.isEmpty()) {
>       // the first record returned will be the message you wanted
>       break;
>    }
> }
>
>
>
> On Mon, Aug 10, 2015 at 3:52 PM, Joe Lawson <
> jlaw...@opensourceconnections.com> wrote:
>
> > Ewen,
> >
> > Do you have an example or link for the changes/plans that will bring the
> > benefits you describe?
> >
> > Cheers,
> >
> > Joe Lawson
> > On Aug 10, 2015 3:27 PM, "Ewen Cheslack-Postava" 
> > wrote:
> >
> > > You can do this using the SimpleConsumer. See
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > > for details with some code.
> > >
> > > When the new consumer is released in 0.8.3, this will get a *lot*
> > simpler.
> > >
> > > -Ewen
> > >
> > > On Fri, Aug 7, 2015 at 9:26 AM, Padgett, Ben 
> > > wrote:
> > >
> > > > Does anyone have an example of how to get a single record from a
> > > > topic+partition given a specific offset?
> > > >
> > > > I am interested in this for some retry logic for failed messages.
> > > >
> > > > Thanks!
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>


Topic auto deletion

2015-08-11 Thread Shahar Danos
Hi,

I'm wondering whether Kafka 0.8.2 has a topic auto-deletion feature, but 
couldn't figure it out from the documentation.
I'm aware of delete.topic.enable config parameter, but I'm asking about an 
expiration aspect; when a topic exists but no one used it (consume/produce) for 
the last X mins, it will be auto-removed. I wonder if I can configure topics to 
be expired after a time I choose.

Many thanks in advance,
Shahar.


How to sync back a failed broker?

2015-08-11 Thread shahab
Hi,

I have a kafka cluster consisting of two servers. I created a topic
"ReplicatedTopic" with 3 partitions and replication factor of 2.

Next, I shut down one of the servers for 20 seconds and then brought it back.
Then I looked at the status of topic "ReplicatedTopic" and got the following:

Topic:ReplicatedTopic PartitionCount:3 ReplicationFactor:2 Configs:
Topic: ReplicatedTopic Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: ReplicatedTopic Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1
Topic: ReplicatedTopic Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1

Although broker 0 is back, it seems it is not in sync with the leader, and in
fact it never becomes in sync again.

Now the question is: how do I bring the first broker back in sync, so that it
appears in the "Isr" list and becomes leader for one of the partitions?

best,
/Shahab
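[Editor's note: in 0.8.x the replica fetcher normally re-syncs a restarted broker on its own (given time and that the broker rejoined the cluster), but leadership does not move back automatically unless auto.leader.rebalance.enable is set. Once the broker is back in the ISR, the usual manual step is the preferred-replica election tool; the ZooKeeper address below is a placeholder:]

```shell
# Ask the controller to move leadership back to each partition's
# preferred (first-listed) replica, once that replica is in the ISR again.
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181
```

If broker 0 never rejoins the ISR at all, its logs (e.g. fetcher errors, broker.id conflicts) are the place to look first.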


Re: How to read messages from Kafka by specific time?

2015-08-11 Thread shahab
Thanks Ewen for the clarification.  I will test this.

best,
/Shahab

On Mon, Aug 10, 2015 at 9:03 PM, Ewen Cheslack-Postava 
wrote:

> You can use SimpleConsumer.getOffsetsBefore to get a list of offsets before
> a Unix timestamp. However, this isn't per-message. The offests returned are
> for the log segments stored on the broker, so the granularity will depend
> on your log rolling settings.
>
> -Ewen
>
> On Wed, Aug 5, 2015 at 2:11 AM, shahab  wrote:
>
> > Hi,
> >
> > Probably this question has been already asked before, but I couldn't find
> > it,
> >
> > I would like to fetch data from Kafka by timestamp. According to the Kafka
> > FAQ (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest
> > ), "Kafka allows querying offsets of messages by time". I tried using a
> > Unix timestamp in the offset request, but every time I got an empty
> > array; it simply didn't work.
> >
> > Based on my google search this is not possible, but Kafka FAQ states that
> > this is possible!
> > Does any one know how to do this? I do appreciate it.
> >
> > best,
> > /Shahab
> >
>
>
>
> --
> Thanks,
> Ewen
>
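[Editor's note: Ewen's suggestion can be sketched with the 0.8 SimpleConsumer javaapi roughly as follows. The broker host/port, topic, and client id are placeholder assumptions, and the code needs the Kafka 0.8.x jars and a running broker:]

```java
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetsBeforeExample {
    public static void main(String[] args) {
        // host, port, soTimeout, bufferSize, clientId -- placeholders
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offset-lookup");
        TopicAndPartition tap = new TopicAndPartition("mytopic", 0);

        // Ask for at most one offset whose log segment started before this
        // Unix timestamp (in milliseconds).
        long timestampMs = 1439251200000L;
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(tap, new PartitionOffsetRequestInfo(timestampMs, 1));

        OffsetRequest request = new OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
        OffsetResponse response = consumer.getOffsetsBefore(request);

        // As Ewen notes, granularity is per log segment, not per message.
        long[] offsets = response.offsets("mytopic", 0);
        if (offsets.length > 0) {
            System.out.println("offset near timestamp: " + offsets[0]);
        }
        consumer.close();
    }
}
```

The special constants kafka.api.OffsetRequest.LatestTime() and EarliestTime() can be passed instead of a real timestamp to get the head or tail offset.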