Re: [ANNOUNCE] New committer: Matthias J. Sax

2018-01-12 Thread Neha Narkhede
Congratulations Matthias. Very well deserved!
On Fri, Jan 12, 2018 at 7:11 PM Manikumar  wrote:

> Congrats.  well deserved.
>
> On Sat, Jan 13, 2018 at 8:37 AM, Martin Gainty 
> wrote:
>
> > Willkommen Matthias!
> > Martin-
> > 
> > From: Damian Guy 
> > Sent: Friday, January 12, 2018 7:43 PM
> > To: users@kafka.apache.org
> > Cc: dev
> > Subject: Re: [ANNOUNCE] New committer: Matthias J. Sax
> >
> > Can't think of anyone more deserving! Congratulations Matthias!
> > On Sat, 13 Jan 2018 at 00:17, Ismael Juma  wrote:
> >
> > > Congratulations Matthias!
> > >
> > > On 12 Jan 2018 10:59 pm, "Guozhang Wang"  wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > The PMC of Apache Kafka is pleased to announce Matthias J. Sax as our
> > > > newest Kafka committer.
> > > >
> > > > Matthias has made tremendous contributions to the Kafka Streams API since
> > > > early 2016. His footprint has been all over the place in Streams: in the
> > > > past two years he has been the main driver on improving the join semantics
> > > > inside the Streams DSL, summarizing all their shortcomings and bridging the
> > > > gaps; he has also done much of the work on the exactly-once semantics of
> > > > Streams by leveraging the transactional messaging feature in 0.11.0. In
> > > > addition, Matthias has been very active in community work that goes beyond
> > > > the mailing list: he has earned close to 1,000 upvotes and 100 helpful
> > > > flags on Stack Overflow for answering almost all questions about Kafka
> > > > Streams.
> > > >
> > > > Thank you for your contribution and welcome to Apache Kafka,
> Matthias!
> > > >
> > > >
> > > >
> > > > Guozhang, on behalf of the Apache Kafka PMC
> > > >
> > >
> >
>
-- 
Thanks,
Neha


Re: [ANNOUNCE] New committer: Damian Guy

2017-06-09 Thread Neha Narkhede
Well deserved. Congratulations Damian!

On Fri, Jun 9, 2017 at 1:34 PM Guozhang Wang  wrote:

> Hello all,
>
>
> The PMC of Apache Kafka is pleased to announce that we have invited Damian
> Guy as a committer to the project.
>
> Damian has made tremendous contributions to Kafka. He has not only
> contributed a lot to the Streams API, but has also been involved in many
> other areas like the producer and consumer clients and the broker-side
> coordinators (the group coordinator and the ongoing transaction coordinator).
> He has contributed more than 100 patches so far, and has been driving 6
> KIP contributions.
>
> More importantly, Damian has been a very prolific reviewer on open PRs and
> has been actively participating in community activities such as the mailing
> lists and Stack Overflow questions. Through his code contributions and
> reviews, Damian has demonstrated good judgment on system design and code
> quality, especially thorough unit test coverage. We believe he will make a
> great addition to the committers of the community.
>
>
> Thank you for your contributions, Damian!
>
>
> -- Guozhang, on behalf of the Apache Kafka PMC
>
-- 
Thanks,
Neha


Re: [VOTE] KIP-156 Add option "dry run" to Streams application reset tool

2017-05-10 Thread Neha Narkhede
+1

On Wed, May 10, 2017 at 12:32 PM Gwen Shapira  wrote:

> +1. Also not sure that adding a parameter to a CLI requires a KIP. It seems
> excessive.
>
>
> On Tue, May 9, 2017 at 7:57 PM Jay Kreps  wrote:
>
> > +1
> > On Tue, May 9, 2017 at 3:41 PM BigData dev 
> > wrote:
> >
> > > Hi, Everyone,
> > >
> > > Since this is a relatively simple change, I would like to start the
> > voting
> > > process for KIP-156: Add option "dry run" to Streams application reset
> > tool
> > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69410150
> > >
> > >
> > > The vote will run for a minimum of 72 hours.
> > >
> > >
> > > Thanks,
> > >
> > > Bharat
> > >
> >
>
-- 
Thanks,
Neha


Re: [VOTE] 0.10.2.1 RC1

2017-04-14 Thread Neha Narkhede
+1 (binding). Verified signatures, ran quickstart and tests.

On Fri, Apr 14, 2017 at 3:45 PM Gwen Shapira  wrote:

> Verified my own signatures, ran quickstart and created a few Connectors.
>
> +1 (binding)
>
>
> On Wed, Apr 12, 2017 at 5:25 PM, Gwen Shapira  wrote:
> > Hello Kafka users, developers, client-developers, friends, romans,
> > citizens, etc,
> >
> > This is the second candidate for release of Apache Kafka 0.10.2.1.
> >
> > This is a bug fix release and it includes fixes and improvements from 24
> JIRAs
> > (including a few critical bugs).
> >
> > Release notes for the 0.10.2.1 release:
> > http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Monday, April 17, 5:30 pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > Your help in validating this bugfix release is super valuable, so
> > please take the time to test and vote!
> >
> > Suggested tests:
> >  * Grab the source archive and make sure it compiles
> >  * Grab one of the binary distros and run the quickstarts against them
> >  * Extract and verify one of the site docs jars
> >  * Build a sample against jars in the staging repo
> >  * Validate GPG signatures on at least one file
> >  * Validate the javadocs look ok
> >  * The 0.10.2 documentation was updated for this bugfix release
> > (especially upgrade, streams and connect portions) - please make sure
> > it looks ok: http://kafka.apache.org/documentation.html
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/javadoc/
> >
> > * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.1 tag:
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e133f2ca57670e77f8114cc72dbc2f91a48e3a3b
> >
> > * Documentation:
> > http://kafka.apache.org/0102/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/0102/protocol.html
> >
> > /**
> >
> > Thanks,
> >
> > Gwen Shapira
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>
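
For the "Build a sample against jars in the staging repo" suggestion above, a
minimal consumer along the following lines works as a smoke test of the staged
0.10.2.1 client. This is only a sketch: it assumes the quickstart broker on
localhost:9092 and the quickstart topic named "test"; the group id is arbitrary.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: read a few records from the quickstart topic using the staged client jars.
public class StagingSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // quickstart broker (assumption)
        props.put("group.id", "rc-smoke-test");             // arbitrary group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));  // quickstart topic (assumption)
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

Compiling it with only the staging Maven repository on the build path also
doubles as a check that the staged artifacts resolve.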
-- 
Thanks,
Neha


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-12 Thread Neha Narkhede
Congratulations, Grant. Well deserved!

On Thu, Jan 12, 2017 at 7:51 AM Grant Henke  wrote:

> Thanks everyone!
>
> On Thu, Jan 12, 2017 at 2:58 AM, Damian Guy  wrote:
>
> > Congratulations!
> >
> > On Thu, 12 Jan 2017 at 03:35 Jun Rao  wrote:
> >
> > > Grant,
> > >
> > > Thanks for all your contribution! Congratulations!
> > >
> > > Jun
> > >
> > > On Wed, Jan 11, 2017 at 2:51 PM, Gwen Shapira 
> wrote:
> > >
> > > > The PMC for Apache Kafka has invited Grant Henke to join as a
> > > > committer and we are pleased to announce that he has accepted!
> > > >
> > > > Grant contributed 88 patches, 90 code reviews, countless great
> > > > comments on discussions, a much-needed cleanup to our protocol and
> the
> > > > on-going and critical work on the Admin protocol. Throughout this, he
> > > > displayed great technical judgment, high-quality work and willingness
> > > > to contribute where needed to make Apache Kafka awesome.
> > > >
> > > > Thank you for your contributions, Grant :)
> > > >
> > > > --
> > > > Gwen Shapira
> > > > Product Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > > >
> > >
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>
-- 
Thanks,
Neha


Re: [kafka-clients] [VOTE] 0.10.1.0 RC3

2016-10-19 Thread Neha Narkhede
+1 (binding)

Verified quick start and artifacts.
On Mon, Oct 17, 2016 at 10:39 PM Dana Powers  wrote:

> +1 -- passes kafka-python integration tests
>
> On Mon, Oct 17, 2016 at 1:28 PM, Jun Rao  wrote:
> > Thanks for preparing the release. Verified quick start on scala 2.11
> binary.
> > +1
> >
> > Jun
> >
> > On Fri, Oct 14, 2016 at 4:29 PM, Jason Gustafson 
> wrote:
> >>
> >> Hello Kafka users, developers and client-developers,
> >>
> >> One more RC for 0.10.1.0. We're hoping this is the final one so that we
> >> can meet the release target date of Oct. 17 (Monday). Please let me
> know as
> >> soon as possible if you find any major problems.
> >>
> >> Release plan:
> >> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
> >>
> >> Release notes for the 0.10.1.0 release:
> >> http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/RELEASE_NOTES.html
> >>
> >> *** Please download, test and vote by Monday, Oct 17, 5pm PT
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >> http://kafka.apache.org/KEYS
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >> http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/
> >>
> >> * Maven artifacts to be voted upon:
> >> https://repository.apache.org/content/groups/staging/
> >>
> >> * Javadoc:
> >> http://home.apache.org/~jgus/kafka-0.10.1.0-rc3/javadoc/
> >>
> >> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc3 tag:
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=50f30a44f31fca1bd9189d2814388d51bd56b06b
> >>
> >> * Documentation:
> >> http://kafka.apache.org/0101/documentation.html
> >>
> >> * Protocol:
> >> http://kafka.apache.org/0101/protocol.html
> >>
> >> * Tests:
> >> Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/71/
> >> System tests:
> >>
> http://testing.confluent.io/confluent-kafka-0-10-1-system-test-results/?prefix=2016-10-13--001.1476369986--apache--0.10.1--ee212d1/
> >>
> >> (Note that these tests do not include a couple patches merged today. I
> >> will send links to updated test builds as soon as they are available)
> >>
> >> Thanks,
> >>
> >> Jason
> >>
> >> --
> >> You received this message because you are subscribed to the Google
> Groups
> >> "kafka-clients" group.
> >> To unsubscribe from this group and stop receiving emails from it, send
> an
> >> email to kafka-clients+unsubscr...@googlegroups.com.
> >> To post to this group, send email to kafka-clie...@googlegroups.com.
> >> Visit this group at https://groups.google.com/group/kafka-clients.
> >> To view this discussion on the web visit
> >>
> https://groups.google.com/d/msgid/kafka-clients/CAJDuW%3DBm0HCOjiHiwnW3WJ_i5u_0e4J2G_mZ_KBkB_WEmo7pNg%40mail.gmail.com
> .
> >> For more options, visit https://groups.google.com/d/optout.
> >
> >
> > --
> > You received this message because you are subscribed to the Google Groups
> > "kafka-clients" group.
> > To unsubscribe from this group and stop receiving emails from it, send an
> > email to kafka-clients+unsubscr...@googlegroups.com.
> > To post to this group, send email to kafka-clie...@googlegroups.com.
> > Visit this group at https://groups.google.com/group/kafka-clients.
> > To view this discussion on the web visit
> >
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G_Atqyc7-O13EGnRNibng5UPo-a_2h00N2%3D%3DMtWktm%3D1g%40mail.gmail.com
> .
> >
> > For more options, visit https://groups.google.com/d/optout.
>
-- 
Thanks,
Neha


[ANNOUNCE] New committer: Jason Gustafson

2016-09-06 Thread Neha Narkhede
The PMC for Apache Kafka has invited Jason Gustafson to join as a committer and
we are pleased to announce that he has accepted!

Jason has contributed numerous patches to a wide range of areas, notably
within the new consumer and the Kafka Connect layers. He has displayed
great taste and judgement, which has been apparent through his involvement
across the board, from mailing lists, JIRA, and code reviews to contributing
features, bug fixes, and code and documentation improvements.

Thank you for your contribution and welcome to Apache Kafka, Jason!
-- 
Thanks,
Neha


Re: [VOTE] 0.10.0.1 RC2

2016-08-05 Thread Neha Narkhede
+1 (binding)

On Fri, Aug 5, 2016 at 12:29 PM, Grant Henke  wrote:

> +1 (non-binding)
>
> On Fri, Aug 5, 2016 at 2:04 PM, Dana Powers  wrote:
>
> > passed kafka-python integration tests, +1
> >
> > -Dana
> >
> >
> > On Fri, Aug 5, 2016 at 9:35 AM, Tom Crayford 
> wrote:
> > > Heroku has tested this using the same performance testing setup we used
> > to
> > > evaluate the impact of 0.9 -> 0.10 (see
> > > https://engineering.heroku.com/blogs/2016-05-27-apache-kafka-010-evaluating-performance-in-distributed-systems/).
> > >
> > > We see no issues at all with them, so +1 (non-binding) from here.
> > >
> > > On Fri, Aug 5, 2016 at 12:58 PM, Jim Jagielski 
> wrote:
> > >
> > >> Looks good here: +1
> > >>
> > >> > On Aug 4, 2016, at 9:54 AM, Ismael Juma  wrote:
> > >> >
> > >> > Hello Kafka users, developers and client-developers,
> > >> >
> > >> > This is the third candidate for the release of Apache Kafka
> 0.10.0.1.
> > >> This
> > >> > is a bug fix release and it includes fixes and improvements from 53
> > JIRAs
> > >> > (including a few critical bugs). See the release notes for more
> > details:
> > >> >
> > >> > http://home.apache.org/~ijuma/kafka-0.10.0.1-rc2/RELEASE_NOTES.html
> > >> >
> > >> > When compared to RC1, RC2 contains a fix for a regression where an
> > older
> > >> > version of slf4j-log4j12 was also being included in the libs folder
> of
> > >> the
> > >> > binary tarball (KAFKA-4008). Thanks to Manikumar Reddy for reporting
> > the
> > >> > issue.
> > >> >
> > >> > *** Please download, test and vote by Monday, 8 August, 8am PT ***
> > >> >
> > >> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > >> > http://kafka.apache.org/KEYS
> > >> >
> > >> > * Release artifacts to be voted upon (source and binary):
> > >> > http://home.apache.org/~ijuma/kafka-0.10.0.1-rc2/
> > >> >
> > >> > * Maven artifacts to be voted upon:
> > >> > https://repository.apache.org/content/groups/staging
> > >> >
> > >> > * Javadoc:
> > >> > http://home.apache.org/~ijuma/kafka-0.10.0.1-rc2/javadoc/
> > >> >
> > >> > * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc2 tag:
> > >> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> > >> f8f56751744ba8e55f90f5c4f3aed8c3459447b2
> > >> >
> > >> > * Documentation:
> > >> > http://kafka.apache.org/0100/documentation.html
> > >> >
> > >> > * Protocol:
> > >> > http://kafka.apache.org/0100/protocol.html
> > >> >
> > >> > * Successful Jenkins builds for the 0.10.0 branch:
> > >> > Unit/integration tests: https://builds.apache.org/job/kafka-0.10.0-jdk7/182/
> > >> > System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.0/138/
> > >> >
> > >> > Thanks,
> > >> > Ismael
> > >>
> > >>
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>



-- 
Thanks,
Neha


[ANNOUNCE] New committer: Ismael Juma

2016-04-25 Thread Neha Narkhede
The PMC for Apache Kafka has invited Ismael Juma to join as a committer and
we are pleased to announce that he has accepted!

Ismael has contributed 121 commits to a wide range of
areas, notably within the security and the network layer. His involvement
has been phenomenal across the board from mailing lists, JIRA, code reviews
and helping us move to GitHub pull requests to contributing features, bug
fixes and code and documentation improvements.

Thank you for your contribution and welcome to Apache Kafka, Ismael!

-- 
Thanks,
Neha


Re: 0.9.0.1 RC1

2016-02-15 Thread Neha Narkhede
+1 (binding).

Verified source and binary artifacts, ran ./gradlew testAll, quick start on
source artifact and Scala 2.11 binary artifact.

On Mon, Feb 15, 2016 at 7:43 PM, Ewen Cheslack-Postava 
wrote:

> Yeah, I saw
>
> kafka.network.SocketServerTest > tooBigRequestIsRejected FAILED
> java.net.SocketException: Broken pipe
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:138)
> at java.io.DataOutputStream.writeShort(DataOutputStream.java:168)
> at
> kafka.network.SocketServerTest.sendRequest(SocketServerTest.scala:62)
> at
>
> kafka.network.SocketServerTest.tooBigRequestIsRejected(SocketServerTest.scala:132)
>
> from the source artifacts one time.
>
> For reference, quick check of Confluent's kafka-trunk build
> http://jenkins.confluent.io/job/kafka-trunk/ doesn't show these specific
> transient errors (but it's also, obviously, a different branch). However,
> it does show other unrelated transient test errors.
>
> I've run the tests on trunk probably a dozen times in the past week for
> various PRs and not seen these test failures. The fact that I saw one the
> first time I ran tests on 0.9.0 has me a bit worried, though a couple of
> more test runs didn't have the same result.
>
> Also, for this specific test, I reopened
> https://issues.apache.org/jira/browse/KAFKA-2398 in August and then
> haven't
> seen it much since and we released the last version with that bug open...
>
> I guess I'm a wary +1 since we have a system test run passing, I've only
> seen this once, and it seems to be an existing transient test issue that
> had no impact in practice.
>
> -Ewen
>
>
> On Mon, Feb 15, 2016 at 8:39 PM, Ismael Juma  wrote:
>
> > +1 (non-binding).
> >
> > Verified source and binary artifacts, ran ./gradlew testAll with JDK
> 7u80,
> > quick start on source artifact and Scala 2.11 binary artifact.
> >
> > Ismael
> >
> > On Fri, Feb 12, 2016 at 2:55 AM, Jun Rao  wrote:
> >
> > > This is the first candidate for release of Apache Kafka 0.9.0.1. This is
> > > a bug fix release that fixes 70 issues.
> > >
> > > Release Notes for the 0.9.0.1 release
> > >
> >
> https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Tuesday, Feb. 16, 7pm PT
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > and sha2 (SHA256) checksum.
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/
> > >
> > > * Maven artifacts to be voted upon prior to release:
> > > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> > >
> > > * scala-doc
> > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/scaladoc/
> > >
> > > * java-doc
> > > https://home.apache.org/~junrao/kafka-0.9.0.1-candidate1/javadoc/
> > >
> > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.1 tag
> > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2c17685a45efe665bf5f24c0296cb8f9e1157e89
> > >
> > > * Documentation
> > > http://kafka.apache.org/090/documentation.html
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>



-- 
Thanks,
Neha


Re: 0.9.0.0 RC4

2015-11-23 Thread Neha Narkhede
+1 (binding).

Verified source and binary artifacts, ran unit tests.

On Mon, Nov 23, 2015 at 9:32 AM, Jun Rao  wrote:

> I updated the release notes. Since this doesn't affect the release
> artifacts to be voted upon, we don't have to do another RC.
>
> Please vote by 6pm PT today.
>
> Thanks,
>
> Jun
>
> On Mon, Nov 23, 2015 at 8:43 AM, Guozhang Wang  wrote:
>
> > I think we should update the release notes to remove the Kafka Streams
> > tickets, I have marked them as 0.9.1.0.
> >
> > Guozhang
> >
> > On Fri, Nov 20, 2015 at 5:21 PM, Jun Rao  wrote:
> >
> > > This is the fourth candidate for release of Apache Kafka 0.9.0.0. This is a
> > > major release that includes (1) authentication (through SSL and SASL) and
> > > authorization, (2) a new java consumer, (3) a Kafka connect framework
> for
> > > data ingestion and egression, and (4) quotas. Since this is a major
> > > release, we will give people a bit more time for trying this out.
> > >
> > > Release Notes for the 0.9.0.0 release
> > >
> > >
> >
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/RELEASE_NOTES.html
> > >
> > > *** Please download, test and vote by Monday, Nov. 23, 6pm PT
> > >
> > > Kafka's KEYS file containing PGP keys we use to sign the release:
> > > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > > and sha2 (SHA256) checksum.
> > >
> > > * Release artifacts to be voted upon (source and binary):
> > > https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/
> > >
> > > * Maven artifacts to be voted upon prior to release:
> > > https://repository.apache.org/content/groups/staging/
> > >
> > > * scala-doc
> > > https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/scaladoc/
> > >
> > > * java-doc
> > > https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/javadoc/
> > >
> > > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.0 tag
> > >
> > >
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=132943b0f83831132cd46ac961cf6f1c00132565
> > >
> > > * Documentation
> > > http://kafka.apache.org/090/documentation.html
> > >
> > > /***
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
Thanks,
Neha


Re: [DISCUSSION] Kafka 0.8.2.2 release?

2015-08-18 Thread Neha Narkhede
How about looking at the scope for the 0.8.3 release first, before we cut
yet another point release (0.8.2.2) off of 0.8.2? Each release includes some
overhead, and if there is a larger release in the works, it might be worth
focusing on getting that out. My take is that the 2 things the community has
been waiting for are SSL support and the new consumer, and we have been
promising to get 0.8.3 with both those features out for several months now.

Looking at the progress on both, it seems we are very close to getting both
of those checked in, and it looks like we should get there in another 5-6
weeks. Furthermore, both of these features are large and I anticipate us
receiving feedback and bugs that will require a couple of point releases on
top of 0.8.3 anyway. One possibility is to work on 0.8.3 together now, get
the community to use the newly released features, gather feedback, do point
releases incorporating that feedback, and iterate on it.

We could absolutely do both 0.8.2.2 and 0.8.3. What I'd ask is for us to
look at the 0.8.3 timeline too and make a call on whether 0.8.2.2 still
makes sense.

Thanks,
Neha

On Tue, Aug 18, 2015 at 10:24 AM, Gwen Shapira g...@confluent.io wrote:

 Thanks Jun.

 I updated the list with your suggestions.
 If anyone feels we are missing a critical patch for 0.8.2.2, please speak
 up.

 Gwen

 On Mon, Aug 17, 2015 at 5:40 PM, Jun Rao j...@confluent.io wrote:

  Hi, Grant,
 
  I took a look at that list. None of those is really critical as you said.
  So, I'd suggest that we not include those to minimize the scope of the
  release.
 
  Thanks,
 
  Jun
 
  On Mon, Aug 17, 2015 at 5:16 PM, Grant Henke ghe...@cloudera.com
 wrote:
 
   Thanks Gwen.
  
   I updated a few small things on the wiki page.
  
   Below is a list of jiras I think could also be marked as included. All
 of
   these, though not super critical, seem like fairly small and low risk
   changes that help avoid potentially confusing issues or errors for
 users.
  
   KAFKA-2012
   KAFKA-972
   KAFKA-2337  KAFKA-2393
   KAFKA-1867
   KAFKA-2407
   KAFKA-2234
   KAFKA-1866
   KAFKA-2345  KAFKA-2355
  
   thoughts?
  
   Thank you,
   Grant
  
   On Mon, Aug 17, 2015 at 4:56 PM, Gwen Shapira g...@confluent.io
 wrote:
  
Thanks for creating a list, Grant!
   
I placed it on the wiki with a quick evaluation of the content and
   whether
it should be in 0.8.2.2:
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Proposed+patches+for+0.8.2.2
   
I'm attempting to only cherrypick fixes that are both important for
  large
number of users (or very critical to some users) and very safe
 (mostly
judged by the size of the change, but not only)
   
If your favorite bugfix is missing from the list, or is there but
  marked
No, please let us know (in this thread) what we are missing and why
  it
   is
both important and safe.
Also, if I accidentally included something you consider unsafe, speak
  up!
   
Gwen
   
On Mon, Aug 17, 2015 at 8:17 AM, Grant Henke ghe...@cloudera.com
   wrote:
   
 +dev

 Adding dev list back in. Somehow it got dropped.


 On Mon, Aug 17, 2015 at 10:16 AM, Grant Henke ghe...@cloudera.com
 
wrote:

  Below is a list of candidate bug fix jiras marked fixed for
 0.8.3.
  I
 don't
  suspect all of these will (or should) make it into the release
 but
   this
  should be a relatively complete list to work from:
 
 - KAFKA-2114 (https://issues.apache.org/jira/browse/KAFKA-2114): Unable
   to change min.insync.replicas default
 - KAFKA-1702 (https://issues.apache.org/jira/browse/KAFKA-1702):
   Messages silently Lost by producer
 - KAFKA-2012 (https://issues.apache.org/jira/browse/KAFKA-2012):
   Broker should automatically handle corrupt index files
 - KAFKA-2406 (https://issues.apache.org/jira/browse/KAFKA-2406): ISR
   propagation should be throttled to avoid overwhelming controller.
 - KAFKA-2336 (https://issues.apache.org/jira/browse/KAFKA-2336):
   Changing offsets.topic.num.partitions after the offset topic is created
   breaks consumer group partition assignment
 - KAFKA-2337 (https://issues.apache.org/jira/browse/KAFKA-2337): Verify
   that metric names will not collide when creating new topics
 - KAFKA-2393 (https://issues.apache.org/jira/browse/KAFKA-2393):
   Correctly Handle InvalidTopicException in KafkaApis.getTopicMetadata()
 - KAFKA-2189 (https://issues.apache.org/jira/browse/KAFKA-2189): Snappy
   compression of message batches less efficient in 0.8.2.1
 - KAFKA-2308 (https://issues.apache.org/jira/browse/KAFKA-2308): New
   producer + Snappy face un-compression errors after broker restart
 - KAFKA-2042 (https://issues.apache.org/jira/browse/KAFKA-2042): New
   producer metadata 

Help Us Nominate Apache Kafka for a 2015 Bossie (Best of OSS) Award - Due June 30th

2015-06-26 Thread Neha Narkhede
Hello Kafka community members,

We appreciate your use and support of Kafka and all the feedback you’ve
provided to us along the way.  If all is still going well with Kafka and
you’re realizing great value from it, we’d like your support in nominating
Kafka for a 2015 InfoWorld Bossie award, which is an annual award where
InfoWorld honors the best of open source software.


As a reminder, Kafka
(http://www.infoworld.com/article/2688074/big-data/big-data-164727-bossie-awards-2014-the-best-open-source-big-data-tools.html#slide17)
was selected as one of InfoWorld's top picks in distributed data processing,
data analytics, machine learning, NoSQL databases, and the Hadoop
ecosystem. A technology can win consecutive years, so there's nothing
stopping Kafka from making the list again.

Nominations for this award are very simple and require you to simply
deliver an email to InfoWorld's executive editor Doug Dineley (
doug_dine...@infoworld.com) with the following information:

   - The name of your software, or your use case
   - A link to Kafka's website: http://kafka.apache.org/
   - A few sentences on how you or your customers are using the software and
     why it is important and award-worthy.

Submissions must be sent to Doug doug_dine...@infoworld.com by June 30,
2015. Please let us know if you have any questions or if we can help in any
way.


Thank you for being part of the Kafka community!


-- 
Best,
Neha


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2.1 Released

2015-03-12 Thread Neha Narkhede
Thanks for driving this Jun and everyone for the contributions!

On Wed, Mar 11, 2015 at 12:01 PM, Jun Rao jun...@apache.org wrote:

 The Apache Kafka community is pleased to announce the release for Apache
 Kafka 0.8.2.1.

 The 0.8.2.1 release fixes 4 critical issues in 0.8.2.0.

 All of the changes in this release can be found:
 https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html

 Apache Kafka is a high-throughput, publish-subscribe messaging system
 rethought of as a distributed commit log.

 ** Fast = A single Kafka broker can handle hundreds of megabytes of reads
 and
 writes per second from thousands of clients.

 ** Scalable = Kafka is designed to allow a single cluster to serve as the
 central data backbone
 for a large organization. It can be elastically and transparently expanded
 without downtime.
 Data streams are partitioned and spread over a cluster of machines to
 allow data streams
 larger than the capability of any single machine and to allow clusters of
 co-ordinated consumers.

 ** Durable = Messages are persisted on disk and replicated within the
 cluster to prevent
 data loss. Each broker can handle terabytes of messages without
 performance impact.

 ** Distributed by Design = Kafka has a modern cluster-centric design that
 offers
 strong durability and fault-tolerance guarantees.

 You can download the release from: http://kafka.apache.org/downloads.html

 We welcome your help and feedback. For more information on how to
 report problems, and to get involved, visit the project website at
 http://kafka.apache.org/

 Thanks,

 Jun




-- 
Thanks,
Neha


Re: [kafka-clients] Re: [VOTE] 0.8.2.1 Candidate 2

2015-03-04 Thread Neha Narkhede
+1. Verified quick start, unit tests.

On Tue, Mar 3, 2015 at 12:09 PM, Joe Stein joe.st...@stealth.ly wrote:

 Ok, let's fix the transient test failure on trunk; agreed, not a blocker.

 +1 quick start passed, verified artifacts, updates in scala
 https://github.com/stealthly/scala-kafka/tree/0.8.2.1 and go
 https://github.com/stealthly/go_kafka_client/tree/0.8.2.1 look good

 ~ Joe Stein
 - - - - - - - - - - - - - - - - -

   http://www.stealth.ly
 - - - - - - - - - - - - - - - - -

 On Tue, Mar 3, 2015 at 12:30 PM, Jun Rao j...@confluent.io wrote:

  Hi, Joe,
 
  Yes, that unit test does have transient failures from time to time. The
  issue seems to be with the unit test itself and not the actual code. So,
  this is not a blocker for 0.8.2.1 release. I think we can just fix it in
  trunk.
 
  Thanks,
 
  Jun
 
  On Tue, Mar 3, 2015 at 9:08 AM, Joe Stein joe.st...@stealth.ly wrote:
 
  Jun, I have most everything looking good, except I keep getting test
  failures from:
  wget https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/kafka-0.8.2.1-src.tgz &&
  tar -xvf kafka-0.8.2.1-src.tgz && cd kafka-0.8.2.1-src && gradle && ./gradlew test
 
  kafka.api.ProducerFailureHandlingTest 
  testNotEnoughReplicasAfterBrokerShutdown FAILED
  org.scalatest.junit.JUnitTestFailedError: Expected
  NotEnoughReplicasException when producing to topic with fewer brokers
 than
  min.insync.replicas
  at
 
 org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
  at
 
 org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
  at org.scalatest.Assertions$class.fail(Assertions.scala:711)
  at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
  at
 
 kafka.api.ProducerFailureHandlingTest.testNotEnoughReplicasAfterBrokerShutdown(ProducerFailureHandlingTest.scala:355)
 
  This happens to me all the time on a few different machines.
 
  ~ Joe Stein
  - - - - - - - - - - - - - - - - -
 
http://www.stealth.ly
  - - - - - - - - - - - - - - - - -
 
  On Mon, Mar 2, 2015 at 7:36 PM, Jun Rao j...@confluent.io wrote:
 
  +1 from me. Verified quickstart and unit tests.
 
  Thanks,
 
  Jun
 
  On Thu, Feb 26, 2015 at 2:59 PM, Jun Rao j...@confluent.io wrote:
 
  This is the second candidate for release of Apache Kafka 0.8.2.1. This
  fixes 4 critical issues in 0.8.2.0.
 
  Release Notes for the 0.8.2.1 release
 
 
 https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/RELEASE_NOTES.html
 
  *** Please download, test and vote by Monday, Mar 2, 3pm PT
 
  Kafka's KEYS file containing PGP keys we use to sign the release:
  http://kafka.apache.org/KEYS in addition to the md5, sha1
  and sha2 (SHA256) checksum.
 
  * Release artifacts to be voted upon (source and binary):
  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/
 
  * Maven artifacts to be voted upon prior to release:
  https://repository.apache.org/content/groups/staging/
 
  * scala-doc
  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/scaladoc/
 
  * java-doc
  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/javadoc/
 
  * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag
 
 
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=bd1bfb63ec73c10d08432ac893a23f28281ea021
  (git commit ee1267b127f3081db491fa1bf9a287084c324e36)
 
  /***
 
  Thanks,
 
  Jun
 
 
   --
  You received this message because you are subscribed to the Google
  Groups kafka-clients group.
  To unsubscribe from this group and stop receiving emails from it, send
  an email to kafka-clients+unsubscr...@googlegroups.com.
  To post to this group, send email to kafka-clie...@googlegroups.com.
  Visit this group at http://groups.google.com/group/kafka-clients.
  To view this discussion on the web visit
 
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G_5FuLKx3_kM4PCgqQL8d%2B4sqE0o-%2BXfu3FJicAgn5KPw%40mail.gmail.com
  
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G_5FuLKx3_kM4PCgqQL8d%2B4sqE0o-%2BXfu3FJicAgn5KPw%40mail.gmail.com?utm_medium=emailutm_source=footer
 
  .
 
  For more options, visit https://groups.google.com/d/optout.
 
 
 
 




-- 
Thanks,
Neha


Re: Kafka Poll: Version You Use?

2015-03-04 Thread Neha Narkhede
Thanks for running the poll and sharing the results!

On Wed, Mar 4, 2015 at 8:34 PM, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

 Hi,

 You can see the number of voters in the poll itself (view poll results link
 in the poll widget).
 Audience details unknown, but the poll was posted on:
 * twitter - https://twitter.com/sematext/status/57050147435776
 * LinkedIn - a few groups - Kafka, DevOps, and I think another larger one
 * this mailing list

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/


 On Wed, Mar 4, 2015 at 11:24 PM, Christian Csar christ...@csar.us wrote:

  Do you have anything on the number of voters, or an audience breakdown?
 
  Christian
 
  On Wed, Mar 4, 2015 at 8:08 PM, Otis Gospodnetic 
  otis.gospodne...@gmail.com
   wrote:
 
   Hello hello,
  
   Results of the poll are here!
   Any guesses before looking?
   What % of Kafka users are on 0.8.2.x already?
   What % of people are still on 0.7.x?
  
  
  
 
 http://blog.sematext.com/2015/03/04/poll-results-kafka-version-distribution/
  
   Otis
   --
   Monitoring * Alerting * Anomaly Detection * Centralized Log Management
   Solr  Elasticsearch Support * http://sematext.com/
  
  
   On Thu, Feb 26, 2015 at 3:32 PM, Otis Gospodnetic 
   otis.gospodne...@gmail.com wrote:
  
Hi,
   
With 0.8.2 out I thought it might be useful for everyone to see which
version(s) of Kafka people are using.
   
Here's a quick poll:
http://blog.sematext.com/2015/02/23/kafka-poll-version-you-use/
   
We'll publish the results next week.
   
Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log
 Management
Solr  Elasticsearch Support * http://sematext.com/
   
   
  
 




-- 
Thanks,
Neha


Re: Trying to get kafka data to Hadoop

2015-03-04 Thread Neha Narkhede
Thanks Jagat for the callout!

Confluent Platform 1.0 http://confluent.io/product/ includes Camus, and we
would be happy to address any questions on our community mailing list,
confluent-platf...@googlegroups.com.



On Wed, Mar 4, 2015 at 8:41 PM, max square max2subscr...@gmail.com wrote:

 Thunder,

 thanks for your reply. The hadoop job is now correctly configured (the
 client was not getting the correct jars); however, I am getting Avro
 formatting exceptions due to the format the schema-repo server follows. I
 think I will do something similar and create our own branch that uses the
 schema repo. Any gotchas you can advise on?

 Thanks!

 Max

 On Wed, Mar 4, 2015 at 9:24 PM, Thunder Stumpges tstump...@ntent.com
 wrote:

  What branch of camus are you using? We have our own fork in which we updated
  the camus dependency from the Avro snapshot of the REST Schema Repository
  to the new official one you mention at github.com/schema-repo. I was
  not aware of a branch on the main LinkedIn camus repo that has this.

  That being said, we are doing essentially the same thing; however, we are
  using a single shaded uber-jar. I believe the maven project builds this
  automatically, doesn't it?

  I'll take a look at the details of how we are invoking this on our site
  and get back to you.
 
  Cheers,
  Thunder
 
 
  -Original Message-
  From: max square [max2subscr...@gmail.com]
  Received: Wednesday, 04 Mar 2015, 5:38PM
  To: users@kafka.apache.org [users@kafka.apache.org]
  Subject: Trying to get kafka data to Hadoop
 
  Hi all,
 
  I have browsed through different conversations around Camus, and bring this
  up as a kinda Kafka question. I know it's not the most orthodox, but if
  someone has some thoughts I'd appreciate it.
 
  That said, I am trying to set up Camus, using a 3 node Kafka cluster
  0.8.2.1, using a project that is trying to build Avro Schema-Repo
  https://github.com/schema-repo/schema-repo. All of the Avro schemas
 for
  the topics are published correctly. I am building Camus and using:
 
  hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -libjars $CAMUS_LIBJARS -D mapreduce.job.user.classpath.first=true -P config.properties
 
  As the command to start the job, where I have set up an environment
  variable that holds all the libjars that the mvn package command
 generates.
 
  I have also set the following properties to configure the job:
  camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
  kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
  etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
 
  When I execute the job I get an Exception indicating the
  AvroRestSchemaRegistry class can't be found (I've double checked it's
 part
  of the libjars). I wanted to ask if this is the correct way to set up
 this
  integration, and if anyone has pointers on why the job is not finding the
  class AvroRestSchemaRegistry
 
  Thanks in advance for the help!
 
  Max
 
  Follows the complete stack trace:
 
  [CamusJob] - failed to create decoder
 
  com.linkedin.camus.coders.MessageDecoderException:
  com.linkedin.camus.coders
  .MessageDecoderException:java.lang.ClassNotFoundException: com.linkedin.
  camus.schemaregistry.AvroRestSchemaRegistry
 at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
  createMessageDecoder(MessageDecoderFactory.java:29)
 
at
  com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
  (EtlInputFormat.java:391)
 
 at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
  EtlInputFormat.java:256)
 
 at
 org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
  1107)
 
 at
  org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
  )
 
   at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
 
 at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
 
 at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
 
 at java.security.AccessController.doPrivileged(Native Method)
 
 at javax.security.auth.Subject.doAs(Subject.java:415)
 
 at org.apache.hadoop.security.UserGroupInformation.doAs(
  UserGroupInformation.java:1642)
 
 at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
  java:976)
 
 at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
 
 at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
 
 at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
 
 at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
 
 at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
 
 at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
 
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
 at
  

Announcing the Confluent Platform built on Apache Kafka

2015-02-25 Thread Neha Narkhede
Folks,

We, at Confluent http://confluent.io, are excited to announce the release
of Confluent Platform 1.0 built around Apache Kafka -
http://blog.confluent.io/2015/02/25/announcing-the-confluent-platform-1-0/

We also published a detailed two-part guide on how you can put Kafka to use
in your organization -
http://blog.confluent.io/2015/02/25/stream-data-platform-1/

And, there is a public mailing list where we would love to hear your
feedback: confluent-platf...@googlegroups.com

Thanks,
Neha


Re: About Symantec's encryption-thru-Kafka proof of concept

2015-02-20 Thread Neha Narkhede
Great. Thanks for sharing!

On Thu, Feb 19, 2015 at 8:51 PM, Jim Hoagland jim_hoagl...@symantec.com
wrote:

 Hi Folks,

 At the recent Kafka Meetup in Mountain View there was interest expressed
 about the encryption through Kafka proof of concept that Symantec did a
 few months ago, so I have created a blog post with some details about it.
 You can find that here:
   http://goo.gl/sjYGWN

 Let me know if you have any thoughts or questions.

 Thanks,

   Jim

 --
 Jim Hoagland, Ph.D.
 Sr. Principal Software Engineer
 Big Data Analytics Team
 Cloud Platform Engineering
 Symantec Corporation
 http://cpe.symantec.com




-- 
Thanks,
Neha


Re: Kafka Leader Rebalance

2015-02-17 Thread Neha Narkhede
Are you on 0.8.1.1 or higher, and are you using the kafka-topics tool to
see which leaders are on which brokers?
Did you also wait for more than 15 seconds to see if the leader election
takes place?
Are there any errors in the controller log?

On Sat, Feb 14, 2015 at 9:35 AM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 Hi,

 Anyone who knows how to solve my problem, please respond..

 Regards,
 Nitin Kumar Sharma.


 On Thu, Feb 12, 2015 at 3:50 PM, nitin sharma kumarsharma.ni...@gmail.com
 
 wrote:

  Hi Team,
 
  i need some help in solving my current issue related to
  kafka-leadership-rebalance
 
  I have 2 brokers..  i have deployed 2 topics with 2 partition and 2
  replica each in following format.. I made use of kafka-reassignment.sh
 for
  same
 
  Topic     Partition   Leader     Follower
  Topic#1   P#0         Broker#1   Broker#2
  Topic#1   P#1         Broker#2   Broker#1
  Topic#2   P#0         Broker#1   Broker#2
  Topic#2   P#1         Broker#2   Broker#1
 
 
  My problem is that when a particular broker server restarts, the complete
  arrangement goes for a toss. All the partitions move to one server.
 
  I have tried putting following properties but nothing has worked:
  auto.leader.rebalance.enable=true
  leader.imbalance.check.interval.seconds=15
  leader.imbalance.per.broker.percenatage=1
 
 
  Regards,
  Nitin Kumar Sharma.
 
 




-- 
Thanks,
Neha


Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-02 Thread Neha Narkhede
Great! Thanks Jun for helping with the release and everyone involved for
your contributions.

On Mon, Feb 2, 2015 at 1:32 PM, Joe Stein joe.st...@stealth.ly wrote:

 Huzzah!

 Thanks Jun for preparing the release candidates and getting this out to the
 community.

 - Joe Stein

 On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao j...@confluent.io wrote:

  The following are the results of the votes.
 
  +1 binding = 3 votes
  +1 non-binding = 1 votes
  -1 = 0 votes
  0 = 0 votes
 
  The vote passes.
 
  I will release artifacts to maven central, update the dist svn and
 download
  site. Will send out an announce after that.
 
  Thanks everyone that contributed to the work in 0.8.2.0!
 
  Jun
 
  On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao j...@confluent.io wrote:
 
  This is the third candidate for release of Apache Kafka 0.8.2.0.
 
  Release Notes for the 0.8.2.0 release
 
 
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
 
  *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
 
  Kafka's KEYS file containing PGP keys we use to sign the release:
  http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
  (SHA256) checksum.
 
  * Release artifacts to be voted upon (source and binary):
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
 
  * Maven artifacts to be voted upon prior to release:
  https://repository.apache.org/content/groups/staging/
 
  * scala-doc
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
 
  * java-doc
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
 
  * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
 
 
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
  (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
 
  /***
 
  Thanks,
 
  Jun
 
 
   --
  You received this message because you are subscribed to the Google Groups
  kafka-clients group.
  To unsubscribe from this group and stop receiving emails from it, send an
  email to kafka-clients+unsubscr...@googlegroups.com.
  To post to this group, send email to kafka-clie...@googlegroups.com.
  Visit this group at http://groups.google.com/group/kafka-clients.
  To view this discussion on the web visit
 
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com
  
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com?utm_medium=emailutm_source=footer
 
  .
 
  For more options, visit https://groups.google.com/d/optout.
 




-- 
Thanks,
Neha


Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Neha Narkhede
+1 (binding). Verified quickstart and unit tests ran ok.

On Sun, Feb 1, 2015 at 9:11 AM, Steven Wu stevenz...@gmail.com wrote:

 At Netflix, we have been using a Route 53 DNS name as the bootstrap servers in
 our AWS env. Basically, when a Kafka broker starts, we add it to the Route 53
 DNS name for the cluster. This is like the VIP that Jay suggested.

 But we are also moving toward using the Eureka service registry for
 bootstrapping. We are worried that if the DNS name happens to resolve to a bad
 broker, it might impact the bootstrap process/resiliency. We want to get a
 list of brokers from Eureka to pass in as bootstrap.servers.



 On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps jay.kr...@gmail.com wrote:

  You may already know this but the producer doesn't require a complete
 list
  of brokers in its config, it just requires the connection info for one
  active broker which it uses to discover the rest of the brokers. We allow
  you to specify multiple urls here for failover in cases where you aren't
  using a vip. So if you can put three brokers into the VIP for metadata
  bootstrapping you can still scale up and down the rest of the cluster.
 
  -Jay
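
A minimal sketch of the pattern Jay describes, using the new
(org.apache.kafka.clients) producer. The broker host names and topic below are
placeholders; the point is that only a few seed brokers need to be listed, and
the client discovers the rest of the cluster from whichever of them it reaches
first.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: bootstrap.servers is only used for the initial metadata fetch, so a
// short, stable list of seed brokers works even if the cluster itself is
// scaled up and down behind it.
public class BootstrapSeedExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("logs", "host-1", "hello"));
        producer.close();
    }
}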
 
  On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker alex.m3...@gmail.com
  wrote:
 
   Jun:
  
    You raise a very good question: let me explain why we use
    Broker.getConnectionString(), so maybe we'll get a supported way to
    answer our need.

    We use Broker.getConnectionString() because we deploy Kafka services
    in Amazon EC2 with the following architecture:
    * Three VMs dedicated to Zookeeper processes
    * At least two VMs with Kafka brokers, but depending on load it can be
    scaled to more broker VMs. Brokers self-register their address in
    Zookeeper by serializing Broker objects in Zk.

    The VMs with Zookeeper have Elastic IPs = stable public IPs.

    These public IPs are fed to the various application services which
    rely on Kafka to stream their logs & monitoring data to our central
    Hadoop system.

    Using zkclient and the above-mentioned public Zookeeper IPs, we get
    the list of brokers registered to a given Kafka service: this is
    where we deserialize Broker objects and then use
    getConnectionString() to discover the brokers' addresses. Then, the
    brokers' addresses are used to initialize the Kafka producer(s).

    The whole trick is that we cannot use Elastic IPs (= stable IPs) for
    Kafka VMs, because of their 'elastic' nature: we want to be able to
    scale up / down the number of VMs with Kafka brokers.

    Now, we understand that using a non-public Kafka API is bad: we've been
    broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...
  
   So it's time to raise the right question: what would be the supported
   way to configure our producers given our dynamic-IP-for-brokers
   context?
  
   Thanks,
   Alex.
  
   2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
   alexandre.vermeerber...@3ds.com:
   
-Original Message-
From: Jun Rao [mailto:j...@confluent.io]
Sent: Sunday, February 01, 2015 3:03
To: users@kafka.apache.org; kafka-clie...@googlegroups.com
Cc: d...@kafka.apache.org
Subject: Re: [VOTE] 0.8.2.0 Candidate 3
   
Hi, Alex,
   
Thanks for testing RC3.
   
Broker.connectionString() is actually not part of the public api for
  the
   producer. Is there a particular reason that you need to use this api?
   
Thanks,
   
Jun
   
On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker 
 alex.m3...@gmail.com
  
wrote:
   
Hello,
   
I have read Broker.scala source code, and I found the answer:
 - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our
 Java
code.
 - With Kafka 0.8.2.0, this method has been replaced by a 0-arity
method without the get prefix, so we have to change our Java code
 to
call
Broker.connectionString()
   
So even though binary compatibility is broken, we have a bypass.
I hope this will help other people relying on this API...
   
and I'm going to continue tests with 0.8.2 rc3..
   
Alex
   
2015-01-31 21:23 GMT+01:00 Alex The Rocker alex.m3...@gmail.com:
   
 Hello,

 I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
 our
 application:

 1st test:
 ==
   replace all kafka .jar files in our application on the consuming side
   (without recompiling anything)
   = tests passed, OK

 2nd test:
 ===
   replace all kafka .jar files in our application on the producing side
   (without recompiling anything)
   = KO, we get this error:

 2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect
 -
 Exception in thread Timer-2
 2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect
 -
 java.lang.NoSuchMethodError:
 kafka.cluster.Broker.getConnectionString()Ljava/lang/String;

 Which means that binary compatibility with 0.8.1.1 version has
 been

Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)

2015-01-25 Thread Neha Narkhede
+1 (binding)
Verified keys, quick start, unit tests.

On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein joe.st...@stealth.ly wrote:

 That makes sense, thanks!

 On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps jay.kr...@gmail.com wrote:

  But I think the flaw in trying to guess what kind of serializer they will
  use is when we get it wrong. Basically let's say we guess String. Say
 30%
  of the time we will be right and we will save the two configuration
 lines.
  70% of the time we will be wrong and the user gets a super cryptic
  ClassCastException: xyz cannot be cast to [B (because [B is how java
  chooses to display the byte array class just to up the pain), then they
  figure out how to subscribe to our mailing list and email us the cryptic
  exception, then we explain about how we helpfully set these properties
 for
  them to save them time. :-)
 
  https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22
 
  I think basically we did this experiment with the old clients and the
  conclusion is that serialization is something you basically have to think
  about to use Kafka and trying to guess just makes things worse.
 
  -Jay
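
To make the trade-off concrete, a minimal sketch of the explicit configuration
under discussion (broker address and topic are placeholders). Both serializer
settings are required, and they have to match the Java types handed to send();
leaving one out fails fast with a ConfigException rather than a
ClassCastException from a silently assumed default.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: sending raw byte[] payloads means explicitly configuring the
// byte-array serializers so the config matches the record types given to send().
public class ByteArrayProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<byte[], byte[]>("events", "payload".getBytes()));
        producer.close();
    }
}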
 
  On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   Maybe. I think the StringSerializer could look more like a typical type
 of
  message.  Instead of encoding being a property it would be more
 typically
  just written in the bytes.
 
  On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   I don't think so--see if you buy my explanation. We previously
 defaulted
   to the byte array serializer and it was a source of unending
 frustration
   and confusion. Since it wasn't a required config people just went
 along
   plugging in whatever objects they had, and thinking that changing the
    parametric types would somehow help. Then they would get a class cast
   exception and assume our stuff was somehow busted, not realizing we
 had
   helpfully configured a type different from what they were passing in
  under
   the covers. So I think it is actually good for people to think: how
 am I
   serializing my data, and getting that exception will make them ask
 that
   question right?
  
   -Jay
  
   On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly
  wrote:
  
   Should value.serializer in the new java producer be defaulted to
   Array[Byte] ?
  
   I was working on testing some upgrade paths and got this
  
   ! return exception in callback when buffer cannot accept message
  
 ConfigException: Missing required configuration
  value.serializer
   which has no default value. (ConfigDef.java:124)
  
 org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
  
  
  
  
 
 org.apache.kafka.common.config.AbstractConfig.init(AbstractConfig.java:48)
  
  
  
  
 
 org.apache.kafka.clients.producer.ProducerConfig.init(ProducerConfig.java:235)
  
  
  
  
 
 org.apache.kafka.clients.producer.KafkaProducer.init(KafkaProducer.java:129)
  
  
  
  
 
 ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42)
  
  
   ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36)
  
  
  
  
 
 ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175)
  
  
  
  
 
 ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170)
  
  
  
   On Fri, Jan 23, 2015 at 5:55 PM, Jun Rao j...@confluent.io wrote:
  
This is a reminder that the deadline for the vote is this Monday,
 Jan
   26,
7pm PT.
   
Thanks,
   
Jun
   
On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io wrote:
   
This is the second candidate for release of Apache Kafka 0.8.2.0.
  There
has been some changes since the 0.8.2 beta release, especially in
  the
   new
java producer api and jmx mbean names. It would be great if people
  can
   test
this out thoroughly.
   
Release Notes for the 0.8.2.0 release
   
   
  
 
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html
   
*** Please download, test and vote by Monday, Jan 26h, 7pm PT
   
Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS in addition to the md5, sha1 and
 sha2
(SHA256) checksum.
   
* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
   
* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/
   
* scala-doc
   
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/
   
* java-doc
   
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/
   
* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0
 tag
   
   
  
 
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c
(commit 0b312a6b9f0833d38eec434bfff4c647c1814564)
   

Re: Release Date for Kafka 0.8.2

2015-01-06 Thread Neha Narkhede
You can track the blockers here
https://issues.apache.org/jira/browse/KAFKA-1841?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.2%20AND%20resolution%20%3D%20Unresolved%20AND%20priority%20%3D%20Blocker%20ORDER%20BY%20key%20DESC.
We are waiting on follow up patches for 2 JIRAs which are under review. We
should be able to release 0.8.2 in a few days.



On Mon, Jan 5, 2015 at 10:26 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi Srividhya,

 See

 http://search-hadoop.com/m/4TaT4B9tys1/subj=Re+Kafka+0+8+2+release+before+Santa+Claus

 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/


 On Mon, Jan 5, 2015 at 11:55 AM, Srividhya Shanmugam 
 srividhyashanmu...@fico.com wrote:

  Kafka Team,
 
  We are currently using the 0.8.2 beta version with a patch for
 KAFKA-1738.
  Do you have any updates on when 0.8.2 final version will be released?
 
  Thanks,
  Srividhya
 
 
  This email and any files transmitted with it are confidential,
 proprietary
  and intended solely for the individual or entity to whom they are
  addressed. If you have received this email in error please delete it
  immediately.
 




-- 
Thanks,
Neha


Re: NPE in debug logging statement in kafka new producer

2014-12-29 Thread Neha Narkhede

 So, do I need to expect 0.8.2 official release will have generic
 Producer interface?


Yes

On Sun, Dec 28, 2014 at 11:31 PM, Bae, Jae Hyeon metac...@gmail.com wrote:

 I forgot to mention I was building with kafka-clients-0.8.2-beta.

 To reproduce this problem, I cloned kafka github repo and changed the
 source tree to the tag:0.8.2-beta and modified the dependency in my IDE
 from kafka-clients-0.8.2-beta to the source tree with tag:0.8.2-beta but I
 couldn't reproduce it... weird.

 Also, I found out in 0.8.2 branch Producer interface was changed with
 generics. So, do I need to expect 0.8.2 official release will have generic
 Producer interface?

 On Sun, Dec 28, 2014 at 11:57 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  This is a bug. Would you mind filing a JIRA? Also feel free to upload a
  patch.
 
  On Sat, Dec 27, 2014 at 7:25 PM, Bae, Jae Hyeon metac...@gmail.com
  wrote:
 
   Hi
  
   While I am testing kafka java producer, I saw the following NPE
  
   SLF4J: Failed toString() invocation on an object of type
   [org.apache.kafka.common.Cluster]
   java.lang.NullPointerException
    at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:72)
    at java.lang.String.valueOf(String.java:2854)
    at java.lang.StringBuilder.append(StringBuilder.java:128)
    at java.util.AbstractCollection.toString(AbstractCollection.java:458)
    at java.lang.String.valueOf(String.java:2854)
    at java.lang.StringBuilder.append(StringBuilder.java:128)
    at org.apache.kafka.common.Cluster.toString(Cluster.java:151)
    at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305)
    at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277)
    at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231)
    at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:152)
    at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:251)
    at org.apache.kafka.clients.producer.internals.Metadata.update(Metadata.java:133)
    at org.apache.kafka.clients.NetworkClient.handleMetadataResponse(NetworkClient.java:299)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:284)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:185)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
    at java.lang.Thread.run(Thread.java:744)
  
   This looks like not critical because it's debug() statement but this
 NPE
   means potential threat in kafka producer stableness.
  
   Do you have any idea? If you want, I can share my test code.
  
   Thank you
   Best, Jae
  
 




-- 
Thanks,
Neha


Re: Kafka 0.8.2 release - before Santa Claus?

2014-12-29 Thread Neha Narkhede
Went through the list and cleaned it up. Most patches just need a final
review. The only JIRA that has no patch is
https://issues.apache.org/jira/browse/KAFKA-1723.

On Fri, Dec 26, 2014 at 2:18 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Actually, KAFKA-1785 https://issues.apache.org/jira/browse/KAFKA-1785
 can
 also wait - since it is likely to be part of a larger patch.

 On Thu, Dec 25, 2014 at 10:39 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

  IMO:
  KAFKA-1790 - can be pushed out (or even marked as won't fix)
  KAFKA-1782 - can be pushed out (not really a blocker)
 
  The rest look like actual blockers to me.
 
  Gwen
 
  On Tue, Dec 23, 2014 at 1:32 PM, Otis Gospodnetic 
  otis.gospodne...@gmail.com wrote:
 
  Hi,
 
  I see 16 open issues for 0.8.2 at
 
 
 https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.2%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20updated%20DESC%2C%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
  - some with patches, some blockers, some blockers without patches.
 
  Are all issues listed as blockers truly blockers for 0.8.2?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr  Elasticsearch Support * http://sematext.com/
 
 
  On Mon, Dec 1, 2014 at 8:13 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   If we can have a build ready by Dec 26th I think that is feasible. I
  could
   prepare and post that if we think we have the votes and a stable
  version.
  
   /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop
   /
   On Dec 1, 2014 7:56 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
  
+1 for doing a 0.8.2 final before the December break.
   
On Mon, Dec 1, 2014 at 8:40 AM, Jun Rao jun...@gmail.com wrote:
   
 We are currently discussing a last-minute API change to the new
 java
 producer. We have also accumulated a few more 0.8.2 blockers.



   
  
 
 https://issues.apache.org/jira/browse/KAFKA-1642?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.8.2

 So, we likely will have another 0.8.2 release in Dec. However, I
 am
  not
 sure if that's beta2 or final.

 Thanks,

 Jun

 On Wed, Nov 26, 2014 at 12:22 PM, Otis Gospodnetic 
 otis.gospodne...@gmail.com wrote:

  Hi,
 
  People using SPM to monitor Kafka have been anxiously asking us
  about
the
  0.8.2 release and we've been telling them December.  Is that
 still
   the
  plan?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log
   Management
  Solr  Elasticsearch Support * http://sematext.com/
 

   
  
 
 
 




-- 
Thanks,
Neha


Re: leader and isr were not set when create the topic

2014-12-29 Thread Neha Narkhede
It seems that somehow the follower lost its highwatermark checkpoint file.
Can you share the steps to reproduce this along with the Kafka versions you
are using?

On Mon, Dec 22, 2014 at 4:17 PM, Sa Li sal...@gmail.com wrote:

 I have three nodes: 100, 101, and 102

 When I restart all of them, seems now everything is ok, but I would like to
 paste the error messages I got from server.log from each node, see if you
 can help to understand what is the problem.

 on node 100
 [2014-12-23 00:04:39,401] ERROR [KafkaApi-100] Error when processing fetch
 request for partition [perf_producer_p8_test,7] offset 125000 from follower
 with correlation id 0 (kafka.server.KafkaApis)
 kafka.common.OffsetOutOfRangeException: Request for offset 125000 but we
 only have log segments in the range 0 to 0.
  at kafka.log.Log.read(Log.scala:380)
  at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
  at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 ..
 ..


 in Node 101 and 102
 [2014-12-23 00:04:39,440] ERROR [ReplicaFetcherThread-0-100], Current
 offset 1 25000 for partition [perf_producer_p8_test,1] out of range; reset
 offset to 0 (kafka.server.ReplicaFetcherThread)
 [2014-12-23 00:04:39,442] INFO Truncating log perf_producer_p8_test-7 to
 offset 0. (kafka.log.Log)
 [2014-12-23 00:04:39,452] WARN [ReplicaFetcherThread-0-100], Replica 102
 for partition [perf_producer_p8_test,7] reset its fetch offset to current
 leader 100's latest offset 0 (kafka.server.ReplicaFetcherThread)






 On Mon, Dec 22, 2014 at 3:55 PM, Sa Li sal...@gmail.com wrote:
 
  Hello, Neha
 
  This is the error from server.log
 
  [2014-12-22 23:53:25,663] WARN [KafkaApi-100] Fetch request with
  correlation id 1227732 from client ReplicaFetcherThread-0-100 on
 partition
  [perf_producer_p8_test,1] failed due to Leader not local for partition
  [perf_producer_p8_test,1] on broker 100 (kafka.server.KafkaApis)
 
 
  On Mon, Dec 22, 2014 at 3:50 PM, Sa Li sal...@gmail.com wrote:
 
  I restart the kafka server, it is the same thing, sometime nothing
 listed
  on ISR, leader, I checked the state-change log
 
  [2014-12-22 23:46:38,164] TRACE Broker 100 cached leader info
 
 (LeaderAndIsrInfo:(Leader:101,ISR:101,102,100,LeaderEpoch:0,ControllerEpoch:4),ReplicationFactor:3),AllReplicas:101,102,100)
  for partition [perf_producer_p8_test,1] in response to UpdateMetadata
  request sent by controller 101 epoch 4 with correlation id 138
  (state.change.logger)
 
 
 
  On Mon, Dec 22, 2014 at 2:46 PM, Sa Li sal...@gmail.com wrote:
 
  Hi, All
 
  I created a topic with 3 replications and 6 partitions, but when I
 check
  this topic, seems there is no leader and isr were set for this topic,
 see
 
  bin/kafka-topics.sh --create --zookeeper 10.100.98.100:2181
  --replication-factor 3 --partitions 6 --topic perf_producer_p6_test
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in
 
 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in
 
 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
  explanation.
  SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  Created topic perf_producer_p6_test.
 
  root@precise64:/etc/kafka# bin/kafka-topics.sh --describe --zookeeper
  10.100.98.100:2181 --topic perf_producer_p6_test
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in
 
 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in
 
 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
  explanation.
  SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  Topic:perf_producer_p6_test PartitionCount:6
  ReplicationFactor:3 Configs:
  Topic: perf_producer_p6_testPartition: 0Leader: none
  Replicas: 100,101,102   Isr:

Re: NPE in debug logging statement in kafka new producer

2014-12-28 Thread Neha Narkhede
This is a bug. Would you mind filing a JIRA? Also feel free to upload a
patch.
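
In case it helps whoever picks this up: the NPE comes out of
PartitionInfo.toString(), so the patch likely just needs a null guard around
the leader (and possibly the replica arrays). Below is a rough, hypothetical
sketch of that guard, written against the public PartitionInfo/Node getters;
the formatNodeIds helper is made up for illustration and the actual fix may
look different:

  // Hypothetical null-safe toString() for org.apache.kafka.common.PartitionInfo.
  // This is a sketch, not the actual patch.
  @Override
  public String toString() {
      return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, inSyncReplicas = %s)",
              topic,
              partition,
              leader == null ? "none" : Integer.toString(leader.id()),
              formatNodeIds(replicas),
              formatNodeIds(inSyncReplicas));
  }

  // Made-up helper that tolerates a null array or null entries.
  private static String formatNodeIds(Node[] nodes) {
      if (nodes == null) {
          return "[]";
      }
      StringBuilder b = new StringBuilder("[");
      for (int i = 0; i < nodes.length; i++) {
          if (i > 0) {
              b.append(",");
          }
          b.append(nodes[i] == null ? "none" : Integer.toString(nodes[i].id()));
      }
      return b.append("]").toString();
  }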

On Sat, Dec 27, 2014 at 7:25 PM, Bae, Jae Hyeon metac...@gmail.com wrote:

 Hi

 While I am testing kafka java producer, I saw the following NPE

 SLF4J: Failed toString() invocation on an object of type
 [org.apache.kafka.common.Cluster]
 java.lang.NullPointerException
 at org.apache.kafka.common.PartitionInfo.toString(PartitionInfo.java:72)
 at java.lang.String.valueOf(String.java:2854)
 at java.lang.StringBuilder.append(StringBuilder.java:128)
 at java.util.AbstractCollection.toString(AbstractCollection.java:458)
 at java.lang.String.valueOf(String.java:2854)
 at java.lang.StringBuilder.append(StringBuilder.java:128)
 at org.apache.kafka.common.Cluster.toString(Cluster.java:151)
 at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:305)
 at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:277)
 at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:231)
 at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:152)
 at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:251)
 at org.apache.kafka.clients.producer.internals.Metadata.update(Metadata.java:133)
 at org.apache.kafka.clients.NetworkClient.handleMetadataResponse(NetworkClient.java:299)
 at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:284)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:185)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)

 This looks like not critical because it's debug() statement but this NPE
 means potential threat in kafka producer stableness.

 Do you have any idea? If you want, I can share my test code.

 Thank you
 Best, Jae



Re: Kafka consumer session timeouts

2014-12-22 Thread Neha Narkhede
Terry,

The zookeeper client used by the high level Kafka consumer has a separate
thread that does the heartbeat in the background. So even if it takes a long
time to process a message, that should not cause the consumer's session to
time out or trigger a rebalance. You may be running into long GC pauses on
your consumer that are causing the session timeouts.

Thanks,
Neha

On Fri, Dec 19, 2014 at 12:51 PM, Terry Cumaranatunge cumar...@gmail.com
wrote:

 Hi
 I would like to get some feedback on design choices with kafka consumers.
 We have an application that a consumer reads a message and the thread does
 a number of things, including database accesses before a message is
 produced to another topic. The time between consuming and producing the
 message on the thread can take several minutes. Once message is produced to
 new topic, a commit is done to indicate we are done with work on the
 consumer queue message. Auto commit is disabled for this reason.

 I'm using the high level consumer and what I'm noticing is that zookeeper
 and kafka sessions timeout because it is taking too long before we do
 anything on consumer queue so kafka ends up rebalancing every time the
 thread goes back to read more from consumer queue and it starts to take a
 long time before a consumer reads a new message after a while.

 I can set zookeeper session timeout very high to not make that a problem
 but then i have to adjust the rebalance parameters accordingly and kafka
 won't pickup a new consumer for a while among other side effects.

 What are my options to solve this problem? Is there a way to heartbeat to
 kafka and zookeeper to keep both happy? Do i still have these same issues
 if i were to use a simple consumer?

 Thanks




-- 
Thanks,
Neha


Re: leader and isr were not set when create the topic

2014-12-22 Thread Neha Narkhede
There is possibly some error in your broker logs. Can you check if you see
any and send it around?

On Mon, Dec 22, 2014 at 2:46 PM, Sa Li sal...@gmail.com wrote:

 Hi, All

 I created a topic with 3 replications and 6 partitions, but when I check
 this topic, seems there is no leader and isr were set for this topic, see

 bin/kafka-topics.sh --create --zookeeper 10.100.98.100:2181
 --replication-factor 3 --partitions 6 --topic perf_producer_p6_test
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in

 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in

 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 Created topic perf_producer_p6_test.

 root@precise64:/etc/kafka# bin/kafka-topics.sh --describe --zookeeper
 10.100.98.100:2181 --topic perf_producer_p6_test
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in

 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in

 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 Topic:perf_producer_p6_test PartitionCount:6
 ReplicationFactor:3 Configs:
 Topic: perf_producer_p6_testPartition: 0Leader: none
 Replicas: 100,101,102   Isr:
 Topic: perf_producer_p6_testPartition: 1Leader: none
 Replicas: 101,102,100   Isr:
 Topic: perf_producer_p6_testPartition: 2Leader: none
 Replicas: 102,100,101   Isr:
 Topic: perf_producer_p6_testPartition: 3Leader: none
 Replicas: 100,102,101   Isr:
 Topic: perf_producer_p6_testPartition: 4Leader: none
 Replicas: 101,100,102   Isr:
 Topic: perf_producer_p6_testPartition: 5Leader: none
 Replicas: 102,101,100   Isr:

 Is there a way to specifically set leader and isr in command line, it is
 strange when I create the topic with 5 partitions, it has leader and isr:
 root@precise64:/etc/kafka# bin/kafka-topics.sh --describe --zookeeper
 10.100.98.100:2181 --topic perf_producer_p5_test
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in

 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in

 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 Topic:perf_producer_p5_test PartitionCount:5
 ReplicationFactor:3 Configs:
 Topic: perf_producer_p5_testPartition: 0Leader: 102
 Replicas: 102,100,101   Isr: 102,100,101
 Topic: perf_producer_p5_testPartition: 1Leader: 102
 Replicas: 100,101,102   Isr: 102,101
 Topic: perf_producer_p5_testPartition: 2Leader: 101
 Replicas: 101,102,100   Isr: 101,102,100
 Topic: perf_producer_p5_testPartition: 3Leader: 102
 Replicas: 102,101,100   Isr: 102,101,100
 Topic: perf_producer_p5_testPartition: 4Leader: 102
 Replicas: 100,102,101   Isr: 102,101


 Any ideas?

 thanks

 --

 Alec Li




-- 
Thanks,
Neha


Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-12-22 Thread Neha Narkhede
Can you share a reproducible test case?

On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria mkathu...@sprinklr.com
wrote:

 Neha,

 The same issue reoccured with just 2 consumer processes. The exception was
 related to conflict in writing the ephemeral node. Below was the exception.
 Topic name is
  lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin
 with 30 partitions. The 2 processes were running on 2 servers with ips
 10.0.8.222 and 10.0.8.225.

 *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
 node
 [{version:1,subscription:{lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin:5},pattern:static,timestamp:1417964160024}]
 at
 /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
 a while back in a different session, hence I will backoff for this node to
 be deleted by Zookeeper and retry*
 Attached the complete error logs. The exception occured after the
 rebalance failed even after 40 retries. Rebalance failed as the process
 already owning some of the partitions did not give us ownership due to
 conflicting ephemeral nodes. As you suggested, we ran the wchp command  on
 the 3 zookeeper nodes at this time and figured out that the watcher was
 registered for only one of the process. I am copying the kafka consumer
 watcher registered on one of the zookeeper servers. (Attached are the wchp
 outputs of all 3 zk servers)

 *$echo wchp | nc localhost 2181 *


 */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*

 * 0x34a175e1d5d0130*


 0x34a175e1d5d0130 was the ephemeral node session Id. I went back to the
 zookeeper shell and checked the consumers registered for this topic and
 consumer group(same as topic name). Attaching the output in zkCommands.txt.
 This clearly shows that

 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130

 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127


 I think we have the issue here that both consumers have written to
 different ephemeral nodes. Watchers are registered for the one of the 2
 ephemeral node. The root cause seems to be the inconsistent state while
 writing the ephemeral nodes in ZK.

 Let me know if you need more details.

 -Thanks,

 Mohit




 On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

 A rebalance should trigger on all consumers when you add a new consumer to
 the group. If you don't see the zookeeper watch fire, the consumer may
 have
 somehow lost the watch. We have seen this behavior on older zk versions, I
 wonder it that bug got reintroduced. To verify if this is the case, you
 can
 run the wchp zookeeper command on the zk leader and check if each consumer
 has a watch registered.

 Do you have a way to try this on zk 3.3.4? I would recommend you try the
 wchp suggestion as well.

 On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Hi all,
 
  Can someone help here. We are getting constant rebalance failure each
 time
  a consumer is added beyond a certain number. Did quite a lot of
 debugging
  on this and still not able to figure out the pattern.
 
  -Thanks,
  Mohit
 
  On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com
 
  wrote:
 
   Neha,
  
   Looks like an issue with the consumer rebalance not able to complete
   successfully. We were able to reproduce the issue on topic with 30
   partitions,  3 consumer processes(p1,p2 and p3), properties -  40
   rebalance.max.retries and 1(10s) rebalance.backoff.ms.
  
   Before the process p3 was started, partition ownership was as
 expected:
  
   partitions 0-14 owned by p1
   partitions 15-29 - owner p2
  
   As the process p3 started, rebalance was triggered. Process p3 was
   successfully able to acquire partition ownership for partitions 20-29
 as
   expected as per the rebalance algorithm. However, process p2 while
 trying
   to acquire ownership of partitions 10-19 saw rebalance failure after
 40
   retries.
  
   Attaching the logs from process p2 and process p1. It says that p2 was
   attempting to rebalance, it was trying to acquire ownership of
 partitions
   10-14 which were owned by process p1. However, at the same time
 process
  p1
   did not get any event for giving up the partition ownership for
  partitions
   1-14.
   We were expecting a rebalance to have triggered in p1 - but it didn't
 and
   hence not giving up ownership. Is our assumption correct/incorrect?
   And if the rebalance gets triggered in p1 - how to figure out apart
 from
   logs as the logs on p1 did not have anything.
  
   *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
   [topic_consumerIdString], waiting for the partition ownership to be
   deleted: 11*
  
   During and after the rebalance failed on process p2, Partition
 Ownership
   was as below:
   0-14

Re: can't produce message in kafka production

2014-12-18 Thread Neha Narkhede
The producer is complaining that its socket channel is already closed, which
makes me think it was closed due to some error that is not present in your
logs. I'd enable DEBUG and see if that shows the cause.
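
If it helps, turning that on is usually just a matter of raising the log4j
level for the producer packages in whichever log4j.properties the tool picks
up (config/tools-log4j.properties in recent distributions; this is a sketch,
adjust to your setup):

  # show what the old producer and its async event handler are doing
  log4j.logger.kafka.producer=DEBUG
  log4j.logger.kafka.producer.async=DEBUG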

On Thu, Dec 18, 2014 at 4:13 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Perhaps you have the logs from broker? It may show other errors that
 can help us troubleshoot.

 On Thu, Dec 18, 2014 at 4:11 PM, Sa Li sal...@gmail.com wrote:
  Thanks, Gwen, I telnet it,
  root@precise64:/etc/kafka# telnet 10.100.98.100 9092
  Trying 10.100.98.100...
  Connected to 10.100.98.100.
  Escape character is '^]'.
 
  seems it connected, and I checked with the system operations people; netstat
  shows 9092 is listening. I am assuming this is the connection issue, since
  I can run the same command to my dev-cluster with no problem at all, which
  is 10.100.70.128:9092.
 
  Just in case, is it possibly caused by other types of issues?
 
  thanks
 
  Alec
 
  On Thu, Dec 18, 2014 at 2:33 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Looks like you can't connect to: 10.100.98.100:9092
 
  I'd validate that this is the issue using telnet and then check the
  firewall / ipfilters settings.
 
  On Thu, Dec 18, 2014 at 2:21 PM, Sa Li sal...@gmail.com wrote:
   Dear all
  
   We just build a kafka production cluster, I can create topics in kafka
   production from another host. But when I am send very simple message
 as
   producer, it generate such errors:
  
   root@precise64:/etc/kafka# bin/kafka-console-producer.sh
 --broker-list
   10.100.98.100:9092 --topic my-replicated-topic-production
   SLF4J: Class path contains multiple SLF4J bindings.
   SLF4J: Found binding in
  
 
 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: Found binding in
  
 
 [jar:file:/etc/kafka/core/build/dependant-libs-2.10.4/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
   SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
   explanation.
   SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
   my test message 1
   [2014-12-18 21:44:25,830] WARN Failed to send producer request with
   correlation id 2 to broker 101 with data for partitions
   [my-replicated-topic-production,1]
   (kafka.producer.async.DefaultEventHandler)
   java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:256)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:99)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at

Re: Kafka Topic/Partition Reassignment issue

2014-12-17 Thread Neha Narkhede
The reassignment tool outputs the original assignment before executing the
next one. If you have that saved, you can initiate another assignment to
make it go to its initial state. That is probably a safer way to fix the
reassignment.
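
Roughly, if you still have that JSON (the tool prints it as "Current partition
replica assignment" when you kick off a move), rolling back is just another
run of the same tool. A sketch, with the saved JSON in a local file whose
name is made up here:

  # re-submit the original assignment to move replicas back
  bin/kafka-reassign-partitions.sh --zookeeper zk-host:2181 \
      --reassignment-json-file original-assignment.json --execute

  # poll until every partition reports that reassignment completed successfully
  bin/kafka-reassign-partitions.sh --zookeeper zk-host:2181 \
      --reassignment-json-file original-assignment.json --verify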

On Wed, Dec 17, 2014 at 3:12 PM, Salman Ahmed ahmed.sal...@gmail.com
wrote:

 I had an issue where one kafka node was filling up on disk space. I used
 the reassignment script in an incorrect way, overloading large number of
 topics/partition on two target machines, which caused kafka to stop on
 those machines.

 I would like to cancel the reassignment process, and restore it to original
 or no further reassignment. Has anyone had to edit znodes and clean up
 reassignment so replicas and ISR are as they were prior to reassignment. If
 so please share the steps.

 Thank you
 Sal



-- 
Thanks,
Neha


Re: Built-in replication for broker

2014-12-16 Thread Neha Narkhede
Replication is available in Kafka 0.8.0 onwards.
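
For example, with 0.8 you pick the replication factor per topic at creation
time (a sketch using the current tooling; older 0.8.0 installs ship
kafka-create-topic.sh with equivalent options):

  bin/kafka-topics.sh --create --zookeeper zk-host:2181 \
      --replication-factor 3 --partitions 6 --topic my-replicated-topic

With a replication factor of 3, each partition has a leader plus two follower
replicas, so an unconsumed message survives the loss of a broker as long as at
least one in-sync replica remains.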

On Tue, Dec 16, 2014 at 4:31 PM, Haoming Zhang haoming.zh...@outlook.com
wrote:

 Dear developers,

 In the paper Kafka: A Distributed Messaging System for Log Processing,
 Jay Kreps, Neha Narkhede, Jun Rao from LinkedIn, at NetDB workshop 2011,

 you mentioned this:
 If a broker goes down, any message stored on it not yet consumed becomes
 unavailable. If the storage system on a broker is permanently damaged, any
 unconsumed message is lost forever. In the future, we plan to add built-in
 replication in Kafka to redundantly store each message on multiple
 brokers.

 I'm curious, does the built-in replication has been implemented or it's
 still in progress? Currently, is there anyway to recover the died broker
 and unconsumed messages?

 Thanks,
 Haoming




-- 
Thanks,
Neha


Re: getOffsetsBefore and latest

2014-12-15 Thread Neha Narkhede

 what about getOffsetsBefore using kafka.api.OffsetRequest.LatestTime? am i
 safe to assume this returns me truly the most recent offset + 1 for each
 partition?


That's right.
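
For completeness, a small sketch of that request with the 0.8 javaapi
SimpleConsumer (host, port, topic and partition are placeholders):

  import java.util.HashMap;
  import java.util.Map;
  import kafka.api.PartitionOffsetRequestInfo;
  import kafka.common.TopicAndPartition;
  import kafka.javaapi.OffsetResponse;
  import kafka.javaapi.consumer.SimpleConsumer;

  public class LatestOffsetCheck {
      public static void main(String[] args) {
          SimpleConsumer consumer =
              new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "latest-offset-check");
          TopicAndPartition tp = new TopicAndPartition("my-topic", 0);

          Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
              new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
          // LatestTime() asks for the log end offset, i.e. (offset of the last message) + 1
          requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));

          kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
              requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "latest-offset-check");
          OffsetResponse response = consumer.getOffsetsBefore(request);

          long logEndOffset = response.offsets("my-topic", 0)[0];
          System.out.println("log end offset: " + logEndOffset
              + ", offset of the most recent message: " + (logEndOffset - 1));
          consumer.close();
      }
  }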

On Mon, Dec 15, 2014 at 7:18 AM, Koert Kuipers ko...@tresata.com wrote:

 i read in several places that getOffsetsBefore does not necessary returns
 the last offset before the timestamp, because it is basically file based
 (so it works at the granularity of the files kafka produces).

 what about getOffsetsBefore using kafka.api.OffsetRequest.LatestTime? am i
 safe to assume this returns me truly the most recent offset + 1 for each
 partition?

 thanks! koert



-- 
Thanks,
Neha


Re: kafka producer consumer problem

2014-12-15 Thread Neha Narkhede
Hi,

For the producer, we have moved to a new producer API (under
org.apache.kafka.clients.producer.KafkaProducer). Please feel free to give
that a spin and report any issues that you see.
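
A minimal sketch of the new producer (written against the 0.8.2 final API with
the generic producer and String serializers; the 0.8.2-beta producer is
byte[]-only, so adjust if you try the beta; broker list and topic are
placeholders):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class NewProducerExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "broker1:9092,broker2:9092");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

          KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
          // send() is asynchronous; it returns a Future for the record's metadata
          producer.send(new ProducerRecord<String, String>("my-topic", "key-1", "value-1"));
          producer.close();
      }
  }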

I think the consumer issue you reported is being discussed in another
thread and is fixed in 0.8.2. Can you check if you still see the issue in
0.8.2-beta http://kafka.apache.org/downloads.html?

On Sun, Dec 14, 2014 at 10:08 PM, 黄震 skyhuang...@163.com wrote:

 Hi,

I'm using kafka-0.8.1.1, this is a good log system, and I'm very
 appreciate for your works. But I'm also found some problem:

1. producer:

  kafka.producer.async.DefaultEventHandler.scala handle(): I suggest
 that sendPartitionPerTopicCache should be cleared every batch, since
 otherwise messages would not be well distributed across partitions.

   2. consumer:

   kafka.consumer.SimpleConsumer.scala disconnect(): I suggest that we
 should delete if(blockingChannel.isConnected), since I came across this
 problem when switch broken down, and then broker was abnormally shutted
 down:

  2014-12-04 17:12:14,260 [ReplicaFetcherThread-7-8] ERROR
 kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-7-8], Error in
 fetch Name: FetchRequest; Version: 0; CorrelationId: 1069738; ClientId:
 ReplicaFetcherThread-7-8; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes;
 RequestInfo: [t.v.3,0] - PartitionFetchInfo(0,1048576)
 java.nio.channels.UnresolvedAddressException at
 sun.nio.ch.Net.checkAddress(Net.java:29) at
 sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512) at
 kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at
 kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) at
 kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57) at
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) at
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)





 --


 Best Regards,

 Allen huang



-- 
Thanks,
Neha


Re: kafka async producer takes a lot of cpu

2014-12-15 Thread Neha Narkhede
The stack trace you showed shouldn't be going beyond the isDebugEnabled()
check if your log4j properties are configured at a higher log level.
Regarding your producer dropping messages, have you checked the produce
request metrics (kafka.network:name=Produce-TotalTimeMs,type=RequestMetrics)
on the broker to see if the bottleneck is on the server or not?
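
One thing worth double checking: the waterfall ends in logback's
TurboFilterList, and turbo filters, if any are configured, are consulted on
every isDebugEnabled() call even when the level is off, so an expensive turbo
filter (or a DEBUG level on the kafka loggers) would show up exactly like
this. As a sketch, making sure the kafka loggers sit at a quieter level in
log4j.properties (or the equivalent logger entries if logback is doing the
logging) looks like:

  # keep Kafka's internal logging from evaluating DEBUG statements
  log4j.logger.kafka=WARN
  log4j.logger.kafka.producer=WARN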

On Sun, Dec 14, 2014 at 9:31 PM, Rajiv Kurian ra...@signalfuse.com wrote:

 I'll try it and let you guys know. Is there anything that comes to mind
 from these log messages though? Why would there be so many log messages?
 Would you suggest doing something else to find out why things are working
 so poorly? I am worried about making the risky transition to the beta
 producer client and finding out that things don't improve at all. Another
 thing that was peculiar was that only one of my producers (out of 3, each
 on a separate host) were dropping so many messages.

 On Sun, Dec 14, 2014 at 4:58 PM, Neha Narkhede n...@confluent.io wrote:

  Thanks for reporting the issue, Rajiv. Since we are actively phasing out
  the old client, it will be very helpful to know what the behavior on the
  new client is.
 
 
  On Fri, Dec 12, 2014 at 8:12 PM, Rajiv Kurian ra...@signalfuse.com
  wrote:
  
   I am using the kafka java api async client (the one that wraps the
 Scala
   client). It's dropping quite a bit of data due to the queue being full,
  and
   the process ends up taking a lot of cpu too.
  
   I am posting to a topic with 1024 partitions (across 3 brokers) - maybe
  the
   high number of brokers is one of the issues. Profiling with YourKit
  showed
   37% of my CPU being spent on
  kafka.producer.async.ProducerSendThread.run().
   So seems like the producer is not able to keep up with my application
 and
   starts dropping. When I expand this waterfall in YourKit, I see that
 23%
   total (not out of the 37%) is being spent on logging! Something like
  this:
  
  
  
 
 kafka.producer.BrokerPartitionInfo$$anonfun$getBrokerPartitionInfo$2.apply(PartitionMetadata)
   -kafka.producer.BrokerPartitionInfo.debug(Function0)
  -kafka.utils.Logging$class.debug(Logging, Function0)
 -org.apache.log4j.Category.isDebugEnabled()
-... (a bunch of other things that finally break down into)
-LoggerContext.java:252
  
  
 
 ch.qos.logback.classic.spi.TurboFilterList.getTurboFilterChainDecision(Marker,
   Logger, Level, String, Object[], Throwable)
  
   I am not sure what's going on here. When I look at my process log, none
  of
   these messages are actually logged (probably because of the log level).
   Further I don't see anything very suspicious on the broker logs. They
 are
   at 60-70% cpu.
  
   I am planning to try the new Java beta producer client, but I am afraid
   something deeper is going on here, that might not be solved by
 switching
  to
   the newer client.
  
 
 
  --
  Thanks,
  Neha
 



-- 
Thanks,
Neha


Re: Number of Consumers Connected

2014-12-15 Thread Neha Narkhede
In addition to Gwen's suggestion, we actually don't have jmx metrics that
give you a list of actively consuming processes.
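
For a quick look, Gwen's suggestion below can be done with the zookeeper-shell
script that ships with Kafka (the group name is a placeholder; the ids path
only shows consumers that are currently registered):

  bin/zookeeper-shell.sh zk-host:2181
    ls /consumers                 # one child znode per consumer group
    ls /consumers/my-group/ids    # one ephemeral znode per live consumer instance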

On Mon, Dec 15, 2014 at 12:59 PM, Gwen Shapira gshap...@cloudera.com
wrote:

 Currently you can find the number of consumer groups through ZooKeeper:

 connect to ZK and run
 ls /consumers

 and count the number of results

 On Mon, Dec 15, 2014 at 11:34 AM, nitin sharma
 kumarsharma.ni...@gmail.com wrote:
  Hi Team,
 
  Is it possible to know how many Consumer Group connected to kafka broker
 Ids
  and as well as how many Instances within a Group are fetching messages
 from
  Kafka Brokers
 
  Regards,
  Nitin Kumar Sharma.



-- 
Thanks,
Neha


Re: How to Setup MirrorMaker in Generalized way

2014-12-12 Thread Neha Narkhede

 Is there any solution to avoid this duplicated entry in target cluster? I
 am using Kafka


That is the expected behavior when the mirror maker process is killed. It is
expected that in most cases it be shut down cleanly. If it is killed, it
doesn't get a chance to checkpoint its offset, which causes the duplicates.

Thanks,
Neha

On Thu, Dec 11, 2014 at 4:30 AM, Madhukar Bharti bhartimadhu...@gmail.com
wrote:

 Hi Neha,

 Thanks for your reply.

 Now using MM tool to replicate data between Kafka clusters, But I am facing
 one problem, Messages gets duplicated if MM killed forcefully[ *kill -9* ].

 Is there any solution to avoid this duplicated entry in target cluster? I
 am using Kafka


 *8.1.1.*

 On Mon, Dec 8, 2014 at 11:17 PM, Neha Narkhede n...@confluent.io wrote:

  Hi Madhukar,
 
  From the same documentation link you referred to -
 
  The source and destination clusters are completely independent entities:
   they can have different numbers of partitions and the offsets will not
 be
   the same. For this reason the mirror cluster is not really intended as
 a
   fault-tolerance mechanism (as the consumer position will be different);
  for
   that we recommend using normal in-cluster replication. The mirror maker
   process will, however, retain and use the message key for partitioning
 so
   order is preserved on a per-key basis.
 
 
  There is no way to setup an *exact* Kafka mirror yet.
 
  Thanks,
  Neha
 
  On Mon, Dec 8, 2014 at 7:47 AM, Madhukar Bharti 
 bhartimadhu...@gmail.com
  wrote:
 
   Hi,
  
   I am going to setup Kafka clusters having 3 brokers in Datacenter 1.
  Topics
   can be created time to time. Each topic can have varying partitions
  mostly
   1,10 or 20. Each application might have different partitioning
 algorithm
   that we don't know(let it be hidden from ops team).
  
   We want to setup mirror maker tool in such a way so that, the exact
   partitioned data should go to the same partition without knowing the
  Topics
   partition logic and it should be *generalized*. [This should be common
  for
   all Topics.]
  
   *like partition 0 at DataCenter1 should be exact mirror of  partition-0
  in
   Datacenter2*.
  
   Please suggest me a solution for doing so. If MirrorMaker
   http://kafka.apache.org/documentation.html#basic_ops_mirror_maker
 tool
   provide any configurations which solve this use-case please let me
 know.
  
  
  
   Regards,
   Madhukar Bharti
  
 
 
 
  --
  Thanks,
  Neha
 


 --
 Thanks and Regards,
 Madhukar Bharti



-- 
Thanks,
Neha


Re: Reading only the latest message

2014-12-09 Thread Neha Narkhede

 Is this the best way to get the offset and is it safe to decrement the
 offset returned as we do in the sample code below.



Yes, this should work. Let us know if you see any problems.

On Tue, Dec 9, 2014 at 7:44 AM, Orelowitz, David david.orelow...@baml.com
wrote:

  I am reposting a question that I posted last week.



 On startup or recovery we would like to read the latest message in each
 partition.

 The getOffsetsBefore() below seems to return the offset of the next
 message that will be published to that partition.



 The code below works correctly as required.

 Is this the best way to get the offset and is it safe to decrement the
 offset returned as we do in the sample code below.



  requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(-1, 1));

  kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
      requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);

  OffsetResponse response = consumer.getOffsetsBefore(request);

  if (readOffset != 0)
      readOffset--;
  else
      handle this case…

  From: Neha Narkhede [mailto:n...@confluent.io]
  Sent: Monday, December 08, 2014 12:43 PM
  To: Orelowitz, David
  Cc: users@kafka.apache.org
  Subject: Re: Reading only the latest message



 The returned latest offset - 1 will be the offset of the last message.
 Sorry, should've made it clear in my last email. Let me know if that helps.



 On Mon, Dec 8, 2014 at 8:32 AM, Orelowitz, David david.orelow...@baml.com
 wrote:

 Neha,

 This seems to return the offset of the next message that will be
 published. If I fetch at that offset I will block until a new message is
 published to that partition.

 I am actually trying to read the contents of the latest message in the
 partition, and based on info in the message resubscribe to the data source.


 -Original Message-
 From: Neha Narkhede [mailto:n...@confluent.io]
 Sent: Friday, December 05, 2014 8:33 PM
 To: users@kafka.apache.org
 Subject: Re: Reading only the latest message

 You can use the getOffsetsBefore() API and specify -1L to get the offset
 of the last committed message (at the time of the request) for that
 partition.

 On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David 
 david.orelow...@baml.com
 wrote:

  What is the best mechanism to retrieve the latest message from a kafka
  partition.
 
  We intend for our producer, on startup or recovery, to read the
  upstream sequence number in the last message in the partition and
  request for the upstream system to start sending from that sequence
 number++.
 
  Currently we are creating a SimpleConsumer and then calling
  getOffsetBefore() using the current wall time. We then decrement the
  offset returned and retrieve the message at this offset. We do manage
  the case when the offset is zero.
 
  It seem to work!
 
  Is this the right approach.
 
  Thanks,
  David
 

 --
 Thanks,
 Neha






 --

 Thanks,
 Neha




-- 
Thanks,
Neha


Re: Topic config reset on alter

2014-12-09 Thread Neha Narkhede
Thanks for reporting the bug. Would you mind filing a JIRA
https://issues.apache.org/jira/browse/KAFKA/?selectedTab=com.atlassian.jira.jira-projects-plugin:issues-panel
?

On Tue, Dec 9, 2014 at 12:46 PM, Andrew Jorgensen 
ajorgen...@twitter.com.invalid wrote:

 I am using kafka 8.1.1

 If you run an alter on a topic to increase the number of partitions the
 existing configs are removed from the topic. You can confirm this by doing:

 bin/kafka-topics.sh --zookeeper localhost --alter --topic topic --config
 retention.ms=360

 bin/kafka-topics.sh --zookeeper localhost --describe
  Topic:topic  PartitionCount:5   ReplicationFactor:1 Configs:
 retention.ms=360

 bin/kafka-topics.sh --zookeeper localhost --alter --topic topic --partitions 10

 bin/kafka-topics.sh --zookeeper localhost --describe
  Topic:topic  PartitionCount:10   ReplicationFactor:1 Configs:

 This feels like a bug to me, where is the best place to report something
 like this?
 --
 Andrew Jorgensen
 @ajorgensen




-- 
Thanks,
Neha


Re: Increased CPU usage with 0.8.2-beta

2014-12-09 Thread Neha Narkhede
The following should be sufficient

java
-agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof
classname

You would need to start the Kafka server with the settings above for some
time, until you observe the problem.
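
For a broker, one way to attach that is through the startup script, since
bin/kafka-run-class.sh passes KAFKA_OPTS to the JVM (a sketch; paths assume
the stock distribution layout, and hprof writes kafka.hprof when the JVM
exits):

  KAFKA_OPTS="-agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof" \
      bin/kafka-server-start.sh config/server.properties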

On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg 
mathias.soederb...@gmail.com wrote:

 Hi Neha,

 Yeah sure. I'm not familiar with hprof, so any particular options I should
 include or just run with defaults?

 Best regards,
 Mathias

 On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede n...@confluent.io wrote:

  Thanks for reporting the issue. Would you mind running hprof and sending
  the output?
 
  On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg 
  mathias.soederb...@gmail.com wrote:
 
   Good day,
  
   I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed
 that
   the CPU usage on the broker machines went up by roughly 40%, from ~60%
 to
   ~100% and am wondering if anyone else has experienced something
 similar?
   The load average also went up by 2x-3x.
  
   We're running on EC2 and the cluster currently consists of four
  m1.xlarge,
   with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to
 be
   exact) and Scala 2.9.2. Configurations can be found over here:
   https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
  
   I'm assuming that this is not expected behaviour for 0.8.2-beta?
  
   Best regards,
   Mathias
  
 
 
 
  --
  Thanks,
  Neha
 




-- 
Thanks,
Neha


Re: How to raise a question in forum

2014-12-09 Thread Neha Narkhede
You can post it here :-)

On Tue, Dec 9, 2014 at 1:26 PM, nitin sharma kumarsharma.ni...@gmail.com
wrote:

 HI,
 I need some help related to Kafka Consumer Lag monitoring.. Kindly let me
 know how can i post my query to the forum.

 Regards,
 Nitin Kumar Sharma.




-- 
Thanks,
Neha


Re: Reading only the latest message

2014-12-08 Thread Neha Narkhede
The returned latest offset - 1 will be the offset of the last message.
Sorry, should've made it clear in my last email. Let me know if that helps.

On Mon, Dec 8, 2014 at 8:32 AM, Orelowitz, David david.orelow...@baml.com
wrote:

 Neha,

 This seems to return the offset of the next message that will be
 published. If I fetch at that offset I will block until a new message is
 published to that partition.

 I am actually trying to read the contents of the latest message in the
 partition, and based on info in the message resubscribe to the data source.

 -Original Message-
 From: Neha Narkhede [mailto:n...@confluent.io]
 Sent: Friday, December 05, 2014 8:33 PM
 To: users@kafka.apache.org
 Subject: Re: Reading only the latest message

 You can use the getOffsetsBefore() API and specify -1L to get the offset
 of the last committed message (at the time of the request) for that
 partition.

 On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David 
 david.orelow...@baml.com
 wrote:

  What is the best mechanism to retrieve the latest message from a kafka
  partition.
 
  We intend for our producer, on startup or recovery, to read the
  upstream sequence number in the last message in the partition and
  request for the upstream system to start sending from that sequence
 number++.
 
  Currently we are creating a SimpleConsumer and then calling
  getOffsetBefore() using the current wall time. We then decrement the
  offset returned and retrieve the message at this offset. We do manage
  the case when the offset is zero.
 
  It seem to work!
 
  Is this the right approach.
 
  Thanks,
  David
 
 
 



 --
 Thanks,
 Neha





-- 
Thanks,
Neha


Re: How to Setup MirrorMaker in Generalized way

2014-12-08 Thread Neha Narkhede
Hi Madhukar,

From the same documentation link you referred to -

The source and destination clusters are completely independent entities:
 they can have different numbers of partitions and the offsets will not be
 the same. For this reason the mirror cluster is not really intended as a
 fault-tolerance mechanism (as the consumer position will be different); for
 that we recommend using normal in-cluster replication. The mirror maker
 process will, however, retain and use the message key for partitioning so
 order is preserved on a per-key basis.


There is no way to setup an *exact* Kafka mirror yet.

Thanks,
Neha

On Mon, Dec 8, 2014 at 7:47 AM, Madhukar Bharti bhartimadhu...@gmail.com
wrote:

 Hi,

 I am going to setup Kafka clusters having 3 brokers in Datacenter 1. Topics
 can be created time to time. Each topic can have varying partitions mostly
 1,10 or 20. Each application might have different partitioning algorithm
 that we don't know(let it be hidden from ops team).

 We want to setup mirror maker tool in such a way so that, the exact
 partitioned data should go to the same partition without knowing the Topics
 partition logic and it should be *generalized*. [This should be common for
 all Topics.]

 *like partition 0 at DataCenter1 should be exact mirror of  partition-0 in
 Datacenter2*.

 Please suggest me a solution for doing so. If MirrorMaker
 http://kafka.apache.org/documentation.html#basic_ops_mirror_maker tool
 provide any configurations which solve this use-case please let me know.



 Regards,
 Madhukar Bharti




-- 
Thanks,
Neha


Re: Increased CPU usage with 0.8.2-beta

2014-12-08 Thread Neha Narkhede
Thanks for reporting the issue. Would you mind running hprof and sending
the output?

On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg 
mathias.soederb...@gmail.com wrote:

 Good day,

 I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
 the CPU usage on the broker machines went up by roughly 40%, from ~60% to
 ~100% and am wondering if anyone else has experienced something similar?
 The load average also went up by 2x-3x.

 We're running on EC2 and the cluster currently consists of four m1.xlarge,
 with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to be
 exact) and Scala 2.9.2. Configurations can be found over here:
 https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.

 I'm assuming that this is not expected behaviour for 0.8.2-beta?

 Best regards,
 Mathias




-- 
Thanks,
Neha


Re: kafka consumer to write into DB

2014-12-05 Thread Neha Narkhede
Not that I know of.
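
That said, a batching loop on top of the 0.8 high level consumer is not much
code. A rough sketch follows; the group id, topic, batch size and the
flushToDb() call are all placeholders for your own logic:

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;
  import kafka.consumer.Consumer;
  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  public class BatchingDbWriter {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("zookeeper.connect", "zk-host:2181");
          props.put("group.id", "db-writer");
          props.put("auto.commit.enable", "false"); // commit only after the batch reaches the DB

          ConsumerConnector connector =
              Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
          Map<String, Integer> topicCountMap = Collections.singletonMap("my-topic", 1);
          KafkaStream<byte[], byte[]> stream =
              connector.createMessageStreams(topicCountMap).get("my-topic").get(0);

          List<byte[]> batch = new ArrayList<byte[]>();
          ConsumerIterator<byte[], byte[]> it = stream.iterator();
          while (it.hasNext()) {
              batch.add(it.next().message());
              if (batch.size() >= 1000) {
                  flushToDb(batch);          // placeholder: e.g. a JDBC batch insert or COPY
                  connector.commitOffsets(); // checkpoint only once the DB write succeeded
                  batch.clear();
              }
          }
      }

      // Placeholder for the actual bulk write into postgres.
      private static void flushToDb(List<byte[]> batch) {
      }
  }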

On Fri, Dec 5, 2014 at 9:44 AM, Sa Li sal...@gmail.com wrote:

 Thanks, Neha, is there a java version batch consumer?

 thanks



 On Fri, Dec 5, 2014 at 9:41 AM, Scott Clasen sc...@heroku.com wrote:

  if you are using scala/akka this will handle the batching and acks for
 you.
 
  https://github.com/sclasen/akka-kafka#akkabatchconsumer
 
  On Fri, Dec 5, 2014 at 9:21 AM, Sa Li sal...@gmail.com wrote:
 
   Thank you very much for the reply, Neha, I have a question about
  consumer,
   I consume the data from kafka and write into DB, of course I have to
  create
   a hash map in memory, load data into memory and bulk copy to DB instead
  of
   insert into DB line by line. Does it mean I need to ack each message
  while
   load to memory?
  
   thanks
  
  
  
   On Thu, Dec 4, 2014 at 1:21 PM, Neha Narkhede n...@confluent.io
 wrote:
  
This is specific for pentaho but may be useful -
https://github.com/RuckusWirelessIL/pentaho-kafka-consumer
   
On Thu, Dec 4, 2014 at 12:58 PM, Sa Li sal...@gmail.com wrote:
   
 Hello, all

 I never developed a kafka consumer, I want to be able to make an
   advanced
 kafka consumer in java to consume the data and continuously write
 the
data
 into postgresql DB. I am thinking to create a map in memory and
   getting a
 predefined number of messages in memory then write into DB in
 batch,
  is
 there a API or sample code to allow me to do this?


 thanks


 --

 Alec Li

   
   
   
--
Thanks,
Neha
   
  
  
  
   --
  
   Alec Li
  
 



 --

 Alec Li




-- 
Thanks,
Neha


Re: Can Mirroring Preserve Every Topic's Partition?

2014-12-05 Thread Neha Narkhede
Going back to your previous requirement of ensuring that the data in the
target cluster is in the same order as the source cluster, all you need is
to specify a key with every record in your data. The mirror maker and its
producer take care of placing all the data for a particular key in the
same partition on the target cluster. Effectively, all your data will be in
the same order (though there may be a few duplicates as I mentioned before).
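
For example, with the 0.8 producer a keyed send looks roughly like this
(broker list, topic and key are placeholders; the key is what determines the
partition, on both the source side and the mirror maker's producer):

  import java.util.Properties;
  import kafka.javaapi.producer.Producer;
  import kafka.producer.KeyedMessage;
  import kafka.producer.ProducerConfig;

  public class KeyedProducerExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("metadata.broker.list", "broker1:9092,broker2:9092");
          props.put("serializer.class", "kafka.serializer.StringEncoder");

          Producer<String, String> producer =
              new Producer<String, String>(new ProducerConfig(props));
          // all records with key "device-42" end up in the same partition
          producer.send(new KeyedMessage<String, String>("my-topic", "device-42", "payload"));
          producer.close();
      }
  }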

Hope that helps!

On Fri, Dec 5, 2014 at 1:23 PM, Alex Melville amelvi...@g.hmc.edu wrote:

 Thank you for your replies Guozhang and Neha, though I have some followup
 questions.

 I wrote my own Java Consumer and Producer based off of the Kafka Producer
 API and High Level Consumer. Let's call them MyConsumer and MyProducer.
 MyProducer uses a custom Partitioner class called SimplePartitioner. In the
 producer.config file that I specify when I run the MirrorMaker from the
 command line, there is a parameter partitioner.class. I keep getting
 ClassDefNotFoundException exceptions, no matter if I put the absolute path
 to my SimplePartitioner.class file, a relative path, or even when I add
 SimplePartitioner.class to the $CLASSPATH variables created in the
 kafka-run-class.sh script. Here is my output error:

 Exception in thread main java.lang.ClassNotFoundException:
 SimplePartitioner.class
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at kafka.utils.Utils$.createObject(Utils.scala:438)
 at kafka.producer.Producer.init(Producer.scala:60)
 at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:116)
 at kafka.tools.MirrorMaker$$anonfun$1.apply(MirrorMaker.scala:106)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
 at scala.collection.immutable.Range.foreach(Range.scala:81)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
 at scala.collection.immutable.Range.map(Range.scala:46)
 at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:106)
 at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

 What is the correct value for the partitioner.class parameter in my
 producer.properties config file?




 Guozhang, in your reply to my original message you said When the consumer
 of the MM gets a message, put the message to the producer's queue This
 seems to imply that I can specify my own custom Consumer and Producer when
 I run the Mirrormaker. How can I do this? Or, if I'm understand incorrectly
 and I have to use whichever default consumer/producer the Mirrormaker uses,
 how can I get that consumer to learn which partition it's reading from,
 pass that info to the producer, and then specify that partition ID when the
 producer writes to the target cluster?


 -Alex

 On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang wangg...@gmail.com
 wrote:

  Hello Alex,
 
  This can be done by doing some tweaks in the MM code (with the 0.8.2 new
  producer).
 
  1. Set-up your MM to have the total # of producers equal to the #. of
  partitions in source / target cluster.
 
  2. When the consumer of the MM gets a message, put the message to the
  producer's queue based on its partition id; i.e. if the partition id is
 n,
  put to n's producer queue.
 
  3. When producer sends the data, specify the partition id; so each
 producer
  will only send to a single partition.
 
  Guozhang
 
 
  On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville amelvi...@g.hmc.edu
  wrote:
 
   Howdy friends,
  
  
   I'd like to mirror the topics on several clusters to a central cluster,
  and
   I'm looking at using the default Mirrormaker to do so. I've already
 done
   some basic testing on the Mirrormaker found here:
  
  
 
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
  
   and managed to successfully copy a topic's partitions on a source
 cluster
   to a topic on a target cluster. So I'm able to mirror correctly.
 However
   for my particular use case I need to ensure that when I copy a topic's
   partitions from source cluster to target cluster, a partition created
 on
   the target cluster contains data in the exact same order as the data on
  the
   corresponding partition on the source cluster.
  
   I'm thinking of writing a Simple Consumer so I can manually compare the
   events in a source cluster's partition with the corresponding partition
  on
   the target cluster, but I'm not 100% sure if I'll be able to verify my
   guarantee if I do it this way. Can 

Re: Reading only the latest message

2014-12-05 Thread Neha Narkhede
You can use the getOffsetsBefore() API and specify -1L to get the offset of
the last committed message (at the time of the request) for that partition.
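
A minimal sketch of that call with the SimpleConsumer (host, port, topic and
client id below are illustrative; note that the value returned for -1L is the
log-end offset, so the last message sits at that offset minus one):

  import java.util.Collections;
  import kafka.api.PartitionOffsetRequestInfo;
  import kafka.common.TopicAndPartition;
  import kafka.javaapi.OffsetResponse;
  import kafka.javaapi.consumer.SimpleConsumer;

  public class LatestOffsetExample {
      public static void main(String[] args) {
          SimpleConsumer consumer =
              new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "latest-offset-check");
          TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
          // -1L (OffsetRequest.LatestTime) asks for the latest offset of the partition.
          kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                  Collections.singletonMap(tp,
                          new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)),
                  kafka.api.OffsetRequest.CurrentVersion(), "latest-offset-check");
          OffsetResponse response = consumer.getOffsetsBefore(request);
          long logEndOffset = response.offsets("my-topic", 0)[0];
          System.out.println("offset of the last message: " + (logEndOffset - 1));
          consumer.close();
      }
  }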

On Fri, Dec 5, 2014 at 12:42 PM, Orelowitz, David david.orelow...@baml.com
wrote:

 What is the best mechanism to retrieve the latest message from a kafka
 partition.

 We intend for our producer, on startup or recovery, to read the upstream
 sequence number in the last message in the partition and request for the
 upstream system to start sending from that sequence number++.

 Currently we are creating a SimpleConsumer and then calling
 getOffsetsBefore() using the current wall time. We then decrement the offset
 returned and retrieve the message at this offset. We do manage the case
 when the offset is zero.

 It seems to work!

 Is this the right approach?

 Thanks,
 David


 --
 This message, and any attachments, is for the intended recipient(s) only,
 may contain information that is privileged, confidential and/or proprietary
 and subject to important terms and conditions available at
 http://www.bankofamerica.com/emaildisclaimer.   If you are not the
 intended recipient, please delete this message.




-- 
Thanks,
Neha


Re: Broker don't get back when killed and restarted

2014-12-05 Thread Neha Narkhede
Have you tried using the latest stable version of Kafka (0.8.1.1) with
controlled shutdown?

On Fri, Dec 5, 2014 at 2:39 PM, Haeley Yao hae...@quantifind.com wrote:

 Hi, Kafka group

 We are trying to improve the fault-tolerance of our kafka cluster. We set up a
 4-node kafka cluster and a 3-node zookeeper cluster.

 ubuntu version: Ubuntu 14.04.1
 zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT
 kafka version: kafka_2.8.0-0.8.0

 kafka0.x.x.x
 broker:9092
 broker.id = 11
 zookeeper.connect=zk-01.x.x.x:2182,zk-02.x.x.x:2182,zk-03.x.x.x:2182

 kafka1.x.x.x
 broker:9092
 broker.id = 12
 zookeeper.connect=zk-01.x.x.x:2182,zk-02.x.x.x:2182,zk-03.x.x.x:2182

 kafka2.x.x.x
 broker:9092
 broker.id = 13
 zookeeper.connect=zk-01.x.x.x:2182,zk-02.x.x.x:2182,zk-03.x.x.x:2182

 kafka3.x.x.x
 broker:9092
 broker.id = 14
 zookeeper.connect=zk-01.x.x.x:2182,zk-02.x.x.x:2182,zk-03.x.x.x:2182

 1. start both kafka and zk servers, everything OK

 2. create 2 topics
 bin/kafka-create-topic.sh --zookeeper
 zk-01.x.x.x:2182,zk-02.x.x.x:2182,zk-03.x.x.x:2182 --partition 3 --replica 3
 --topic zerg.hydra

 bin/kafka-create-topic.sh --zookeeper
 zk-01.x.x.x:2182,zk-02.x.x.x:2182,zk-03.x.x.x:2182 --partition 2 --replica 3
 --topic zerg.test

 3. start producer and consumer on brokerId 11. the inputs sent to producer
 are received by consumer.

 4. stop kafka servers on brokerId 12, 13, 14 by supervisor
 /etc/init.d/supervisor stop

 only brokerId 11 is running.
 root@kafka0:/home/dev/kafka# bin/kafka-list-topic.sh --topic zerg.hydra
 --zookeeper zk-01.dev.quantifind.com:2182,zk-02.dev.quantifind.com:2182,
 zk-03.dev.quantifind.com:2182
 topic: zerg.hydra   partition: 0leader: 11  replicas:
 11,14,12  isr: 11
 topic: zerg.hydra   partition: 1leader: 11  replicas:
 11,14,12  isr: 11
 topic: zerg.hydra   partition: 2leader: 11  replicas:
 12,11,13  isr: 11

 root@kafka0:/home/dev/kafka# bin/kafka-list-topic.sh --topic zerg.test
 --zookeeper zk-01.dev.quantifind.com:2182,zk-02.dev.quantifind.com:2182,
 zk-03.dev.quantifind.com:2182
 topic: zerg.testpartition: 0leader: 11  replicas:
 13,14,11  isr: 11
 topic: zerg.testpartition: 1leader: 11  replicas:
 14,11,12  isr: 11

 5. start kafka on brokerId 13, kafka2.x.x.x
 bin/kafka-server-start.sh config/server.properties

 [2014-12-05 14:34:45,607] ERROR [KafkaApi-13] error when handling request
 Name: FetchRequest; Version: 0; CorrelationId: 222; ClientId:
 ReplicaFetcherThread-0-11; ReplicaId: 13; MaxWait: 500 ms; MinBytes: 1
 bytes; RequestInfo: [zerg.hydra,2] -
 PartitionFetchInfo(0,1048576),[zerg.test,0] -
 PartitionFetchInfo(15,1048576) (kafka.server.KafkaApis)
 kafka.common.KafkaException: Shouldn't set logEndOffset for replica 13
 partition [zerg.hydra,2] since it's local
 at kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:46)
 at
 kafka.cluster.Partition.updateLeaderHWAndMaybeExpandIsr(Partition.scala:227)

 Could you help on it?

 Thank you!




 Haeley
 —
 Work hard, stay humble.







-- 
Thanks,
Neha


Re: Can Mirroring Preserve Every Topic's Partition?

2014-12-04 Thread Neha Narkhede
As Guozhang mentioned, that will allow exact partitioning but note that
this still doesn't ensure identical partitions since your target cluster
can have duplicates depending on whether the producer retried sending data
or not.

On Wed, Nov 26, 2014 at 10:33 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Alex,

 This can be done by doing some tweaks in the MM code (with the 0.8.2 new
 producer).

 1. Set-up your MM to have the total # of producers equal to the #. of
 partitions in source / target cluster.

 2. When the consumer of the MM gets a message, put the message to the
 producer's queue based on its partition id; i.e. if the partition id is n,
 put to n's producer queue.

 3. When producer sends the data, specify the partition id; so each producer
 will only send to a single partition.

 Guozhang


 On Tue, Nov 25, 2014 at 8:19 PM, Alex Melville amelvi...@g.hmc.edu
 wrote:

  Howdy friends,
 
 
  I'd like to mirror the topics on several clusters to a central cluster,
 and
  I'm looking at using the default Mirrormaker to do so. I've already done
  some basic testing on the Mirrormaker found here:
 
 
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
 
  and managed to successfully copy a topic's partitions on a source cluster
  to a topic on a target cluster. So I'm able to mirror correctly. However
  for my particular use case I need to ensure that when I copy a topic's
  partitions from source cluster to target cluster, a partition created on
  the target cluster contains data in the exact same order as the data on
 the
  corresponding partition on the source cluster.
 
  I'm thinking of writing a Simple Consumer so I can manually compare the
  events in a source cluster's partition with the corresponding partition
 on
  the target cluster, but I'm not 100% sure if I'll be able to verify my
  guarantee if I do it this way. Can anyone here verify that partitions
  copied over to the target cluster by the default Mirrormaker are an exact
  copy of those on the source cluster?
 
 
  Thanks in advance,
 
  Alex Melville
 



 --
 -- Guozhang



Re: kafka consumer to write into DB

2014-12-04 Thread Neha Narkhede
This is specific for pentaho but may be useful -
https://github.com/RuckusWirelessIL/pentaho-kafka-consumer
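
For the batching idea itself, a minimal sketch with the 0.8 high-level consumer
and plain JDBC might look like the following (topic, table, batch size and
connection details are illustrative, and the PostgreSQL JDBC driver is assumed
to be on the classpath; offsets are committed only after each batch is flushed):

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.util.Collections;
  import java.util.Properties;
  import kafka.consumer.Consumer;
  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  public class BatchingDbConsumer {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("zookeeper.connect", "localhost:2181");
          props.put("group.id", "db-writer");
          props.put("auto.commit.enable", "false");  // checkpoint manually, after each batch
          ConsumerConnector connector =
                  Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

          KafkaStream<byte[], byte[]> stream = connector
                  .createMessageStreams(Collections.singletonMap("my-topic", 1))
                  .get("my-topic").get(0);

          Connection db = DriverManager.getConnection(
                  "jdbc:postgresql://localhost/mydb", "user", "password");
          PreparedStatement insert =
                  db.prepareStatement("INSERT INTO events (payload) VALUES (?)");

          int buffered = 0;
          ConsumerIterator<byte[], byte[]> it = stream.iterator();
          while (it.hasNext()) {
              insert.setBytes(1, it.next().message());
              insert.addBatch();
              if (++buffered >= 500) {          // flush every 500 messages
                  insert.executeBatch();
                  connector.commitOffsets();    // commit offsets only after the DB write
                  buffered = 0;
              }
          }
      }
  }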

On Thu, Dec 4, 2014 at 12:58 PM, Sa Li sal...@gmail.com wrote:

 Hello, all

 I have never developed a kafka consumer. I want to be able to make an advanced
 kafka consumer in java that consumes the data and continuously writes it
 into a postgresql DB. I am thinking of buffering a predefined number of
 messages in memory and then writing them into the DB in batch. Is there an API
 or sample code that would allow me to do this?


 thanks


 --

 Alec Li




-- 
Thanks,
Neha


Re: Questions about new consumer API

2014-12-02 Thread Neha Narkhede
1. In this doc it says the kafka consumer will automatically do load balancing.
Is it based on throughput, or the same as what we have now, balancing the
cardinality among all consumers in the same ConsumerGroup? In a real case
different partitions could have different peak times.

Load balancing is still based on # of partitions for the subscribed topics
and
ensuring that each partition has exactly one consumer as the owner.

2. In the API, there is a subscribe(partition...) method saying it is not using
group management; does it mean the group.id property will be discarded and the
developer has full control of distributing partitions to consumers?

group.id is also required for offset management, if the user chooses to use
Kafka based offset management. The user will have full control over
distribution
of partitions to consumers.

3. Is new API compatible with old broker?

Yes, it will.

4. Will simple consumer api and high-level consumer api still be supported?

Over time, we will phase out the current high-level and simple consumer
since the
0.9 API supports both.

Thanks,
Neha

On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 Hi guys,

 I'm interested in the new Consumer API.
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

 I have a couple of questions.
 1. In this doc it says the kafka consumer will automatically do load balancing.
 Is it based on throughput, or the same as what we have now, balancing the
 cardinality among all consumers in the same ConsumerGroup? In a real case
 different partitions could have different peak times.
 2. In the API, there is a subscribe(partition...) method saying it is not using
 group management; does it mean the group.id property will be discarded and the
 developer has full control of distributing partitions to consumers?
 3. Is the new API compatible with old brokers?
 4. Will the simple consumer api and high-level consumer api still be supported?

 Thanks!

 Best,
 Siyuan



Re: Questions about new consumer API

2014-12-02 Thread Neha Narkhede
The offsets are keyed on group, topic, partition so if you have more than
one owner per partition, they will rewrite each other's offsets and lead to
incorrect state.

On Tue, Dec 2, 2014 at 2:32 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 Thanks Neha, another question: so if offsets are stored under group.id,
 does it mean that in one group, there should be at most one subscriber for each
 topic partition?

 Best,
 Siyuan

 On Tue, Dec 2, 2014 at 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  1. In this doc it says kafka consumer will automatically do load balance.
  Is it based on throughtput or same as what we have now balance the
  cardinality among all consumers in same ConsumerGroup? In a real case
  different partitions could have different peak time.
 
  Load balancing is still based on # of partitions for the subscribed
 topics
  and
  ensuring that each partition has exactly one consumer as the owner.
 
  2. In the API, threre is subscribe(partition...) method saying not using
  group management, does it mean the group.id property will be discarded
 and
  developer has full control of distributing partitions to consumers?
 
  group.id is also required for offset management, if the user chooses to
  use
  Kafka based offset management. The user will have full control over
  distribution
  of partitions to consumers.
 
  3. Is new API compatible with old broker?
 
  Yes, it will.
 
  4. Will simple consumer api and high-level consumer api still be
 supported?
 
  Over time, we will phase out the current high-level and simple consumer
  since the
  0.9 API supports both.
 
  Thanks,
  Neha
 
  On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
 
   Hi guys,
  
   I'm interested in the new Consumer API.
   http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
  
   I have couple of question.
   1. In this doc it says kafka consumer will automatically do load
 balance.
   Is it based on throughtput or same as what we have now balance the
   cardinality among all consumers in same ConsumerGroup? In a real case
   different partitions could have different peak time.
   2. In the API, threre is subscribe(partition...) method saying not
 using
   group management, does it mean the group.id property will be discarded
  and
   developer has full control of distributing partitions to consumers?
   3. Is new API compatible with old broker?
   4. Will simple consumer api and high-level consumer api still be
  supported?
  
   Thanks!
  
   Best,
   Siyuan
  
 



Re: Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-02 Thread Neha Narkhede
Will doing one broker at
a time by bringing the broker down, updating the code, and restarting it be
sufficient?

Yes this should work for the upgrade.

On Mon, Dec 1, 2014 at 10:23 PM, Yu Yang yuyan...@gmail.com wrote:

 Hi,

 We have a kafka cluster that runs Kafka 0.8.1 that we are considering
 upgrading to 0.8.1.1. The Kafka documentation
 http://kafka.apache.org/documentation.html#upgrade mentions upgrading
 from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1.  Will doing one broker at
 a time by bringing the broker down, updating the code, and restarting it be
 sufficient? Any best practice suggestions?

 Thanks!

 Regards,
 Yu



Re: Kafka 0.8.2 release - before Santa Claus?

2014-12-01 Thread Neha Narkhede
+1 for doing a 0.8.2 final before the December break.

On Mon, Dec 1, 2014 at 8:40 AM, Jun Rao jun...@gmail.com wrote:

 We are currently discussing a last-minute API change to the new java
 producer. We have also accumulated a few more 0.8.2 blockers.


 https://issues.apache.org/jira/browse/KAFKA-1642?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.8.2

 So, we likely will have another 0.8.2 release in Dec. However, I am not
 sure if that's beta2 or final.

 Thanks,

 Jun

 On Wed, Nov 26, 2014 at 12:22 PM, Otis Gospodnetic 
 otis.gospodne...@gmail.com wrote:

  Hi,
 
  People using SPM to monitor Kafka have been anxiously asking us about the
  0.8.2 release and we've been telling them December.  Is that still the
  plan?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr  Elasticsearch Support * http://sematext.com/
 



Re: logging agent based on fuse and kafka: first release

2014-11-25 Thread Neha Narkhede
Great. Thanks for sharing. I added it to our ecosystem
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem wiki.

On Tue, Nov 25, 2014 at 9:58 AM, yazgoo yaz...@gmail.com wrote:

 Hi,

 First I'd like to thank kafka developers for writing kafka.

 This is an announcement for the first release of a file system logging
 agent based on kafka.

 It is written for collecting logs from servers running all kinds of software,
 as a generic way to collect logs without needing to know about each logger.

 Home:
 https://github.com/yazgoo/fuse_kafka

 Here are some functionnalities:

- sends all writes to given directories to kafka
- passes through FS syscalls to underlying directory
- captures the pid, gid, uid, user, group, command line doing the write
- you can add metadata to identify from where the message comes from
(e.g. ip-address, ...)
- you can configure kafka destination cluster either by giving a broker
list or a zookeeper list
- you can specify a bandwidth quota: fuse_kafka won't send data if a
file is written more than a given size per second (useful for preventing
floods caused by core files dumped or log rotations in directories
 watched
by fuse_kafka)

 It is based on:

- FUSE (filesystem in userspace), to capture writes done under a given
directory
- kafka (messaging queue), as the event transport system
- logstash: events are written to kafka in logstash format (except
messages and commands which are stored in base64)

 It is written in C and python.

 Packages are provided for various distros, see installing section in
 README.md.
 FUSE adds an overhead, so it should not be used on filesystems where high
 throughput is necessary.
 Here are benchmarks:

 http://htmlpreview.github.io/?https://raw.githubusercontent.com/yazgoo/fuse_kafka/master/benchs/benchmarks.html

 Contributions are welcome, of course!

 Regards



Re: Broker keeps rebalancing

2014-11-13 Thread Neha Narkhede
@Neha, Can you share suggested consumer side GC settings?

Consumer side GC settings are not standard since it is a function of your
application that embeds the consumer. Your consumer application's memory
patterns will dictate your GC settings. Sorry, I know that's not very
helpful, but GC tuning is a dark art :-)

On Thu, Nov 13, 2014 at 9:13 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hey Chen,

 As Neha suggested, the typical reason for too many rebalances is that your
 consumers keep being timed out from ZK; you can verify this by checking
 your consumer logs for something like session timeout entries (these are
 not ERROR entries).

 Guozhang


 On Wed, Nov 12, 2014 at 5:31 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Does this help?
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog
  ?
 
  On Wed, Nov 12, 2014 at 3:53 PM, Chen Wang chen.apache.s...@gmail.com
  wrote:
 
   Hi there,
   My kafka client is reading a 3 partition topic from kafka with 3
 threads
   distributed on different machines. I am seeing frequent owner changes
 on
   the topics when running:
   bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
   my_test_group --topic mytopic -zkconnect localhost:2181
  
   The owner kept changing once a while, but I didn't see any exceptions
   thrown from the consumer side. When checking broker log, its full of
INFO Closing socket connection to /IP. (kafka.network.Processor)
  
   Is this expected behavior? If so,  how can I tell when  the leader is
   imbalanced, and rebalance is triggered?
   Thanks,
   Chen
  
 



 --
 -- Guozhang



Re: Add partitions with replica assignment in same command

2014-11-12 Thread Neha Narkhede
When we first add partitions, will it change the
assignment of replicas for existing partitions?

Nope. It should not touch the existing partitions.

Also, will there be any issues executing the second reassignment
command which will change the assignment again for the new partitions added?

No. 2nd reassignment should work as expected.
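
For reference, a sketch of that two-step flow (the zookeeper connect string,
topic name and the exact replica lists in the JSON are illustrative):

  # 1) add the new partitions (existing partitions are left untouched)
  bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper $ZOOKEEPER \
      --alter --topic test_topic_4 --partitions 12

  # 2) move replicas for the new partitions only, e.g. with expand.json:
  #    {"version":1,"partitions":[
  #      {"topic":"test_topic_4","partition":9,"replicas":[2,1]},
  #      {"topic":"test_topic_4","partition":10,"replicas":[0,2]},
  #      {"topic":"test_topic_4","partition":11,"replicas":[1,0]}]}
  bin/kafka-reassign-partitions.sh --zookeeper $ZOOKEEPER \
      --reassignment-json-file expand.json --execute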

On Wed, Nov 12, 2014 at 2:24 PM, Allen Wang aw...@netflix.com.invalid
wrote:

 I found this JIRA

 https://issues.apache.org/jira/browse/KAFKA-1656

 Now, we have to use two commands to accomplish the goal - first add
 partitions using TopicCommand and then reassign replicas using
 ReassignPartitionsCommand. When we first add partitions, will it change the
 assignment of replicas for existing partitions? This is what we would like
 to avoid. Also, will there be any issues executing the second reassignment
 command which will change the assignment again for the new partitions
 added?




 On Sun, Nov 9, 2014 at 9:01 PM, Jun Rao jun...@gmail.com wrote:

  Yes, it seems that we need to fix the tool to support that. It's probably
  more intuitive to have TopicCommand just take the replica-assignment (for
  the new partitions) when altering a topic. Could you file a jira?
 
  Thanks,
 
  Jun
 
  On Fri, Nov 7, 2014 at 4:17 PM, Allen Wang aw...@netflix.com.invalid
  wrote:
 
   I am trying to figure out how to add partitions and assign replicas
 using
   one admin command. I tried kafka.admin.TopicCommand to increase the
   partition number from 9 to 12 with the following options:
  
   /apps/kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand
 --zookeeper
   ${ZOOKEEPER} --alter --topic test_topic_4 --partitions 12
   --replica-assignment 2:1,0:2,1:0,1:2,2:0,0:1,1:0,2:1,0:2,2:1,0:2,1:0
  
   This gives me an error
  
   Option [replica-assignment] can't be used with option[partitions]
  
   Looking into the TopicCommand, alterTopic function seems to be able to
   handle that but the command exits with the above error before this
  function
   is invoked.
  
   Is there any workaround or other recommended way to achieve this?
  
   Thanks,
   Allen
  
 



Re: Broker keeps rebalancing

2014-11-12 Thread Neha Narkhede
Does this help?
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyaretheremanyrebalancesinmyconsumerlog
?

On Wed, Nov 12, 2014 at 3:53 PM, Chen Wang chen.apache.s...@gmail.com
wrote:

 Hi there,
 My kafka client is reading a 3 partition topic from kafka with 3 threads
 distributed on different machines. I am seeing frequent owner changes on
 the topics when running:
 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
 my_test_group --topic mytopic -zkconnect localhost:2181

 The owner kept changing once a while, but I didn't see any exceptions
 thrown from the consumer side. When checking broker log, its full of
  INFO Closing socket connection to /IP. (kafka.network.Processor)

 Is this expected behavior? If so,  how can I tell when  the leader is
 imbalanced, and rebalance is triggered?
 Thanks,
 Chen



Re: expanding cluster and reassigning parititions without restarting producer

2014-11-11 Thread Neha Narkhede
The new producer is available in 0.8.2-beta (the most recent Kafka
release). The old producer only detects new partitions at an interval
configured by topic.metadata.refresh.interval.ms. This constraint is no
longer true for the new producer and you would likely end up with an even
distribution of data across all partitions. If you want to stay with the
old producer on 0.8.1.1, you can try reducing
topic.metadata.refresh.interval.ms but it may have some performance impact
on the Kafka cluster since it ends up sending topic metadata requests to
the broker at that interval.
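
For instance, with the old producer this is just a producer property (the values
here are only an example; the default refresh interval is 600000 ms):

  metadata.broker.list=broker1:9092,broker2:9092
  # pick up newly added partitions within ~1 minute instead of ~10 minutes
  topic.metadata.refresh.interval.ms=60000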

Thanks,
Neha

On Tue, Nov 11, 2014 at 1:45 AM, Shlomi Hazan shl...@viber.com wrote:

 Neha, I understand that the producer kafka.javaapi.producer.Producer shown
 in examples is old,
 and that a new producer (org.apache.kafka.clients.producer) is available. Is it
 available for 0.8.1.1?
 How does it work? Does it have a trigger that fires when partitions are added, or
 does the producer refresh some cache at some given time interval?

 Shlomi


 On Tue, Nov 11, 2014 at 4:25 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  How can I auto refresh keyed producers to use new partitions as these
  partitions are added?
 
  Try using the new producer under org.apache.kafka.clients.producer.
 
  Thanks,
  Neha
 
  On Mon, Nov 10, 2014 at 8:52 AM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
  wrote:
 
   I had different experience with expanding partition for new producer
 and
   its impact.  I only tried for non-key message.I would always advice
  to
   keep batch size relatively low or plan for expansion with new java
  producer
   in advance or since inception otherwise running producer code is
  impacted.
  
   Here is mail chain:
  
  
 
 http://mail-archives.apache.org/mod_mbox/kafka-dev/201411.mbox/%3ccaoejijit4cgry97dgzfjkfvaqfduv-o1x1kafefbshgirkm...@mail.gmail.com%3E
  
   Thanks,
  
   Bhavesh
  
   On Mon, Nov 10, 2014 at 5:20 AM, Shlomi Hazan shl...@viber.com
 wrote:
  
Hmmm..
The Java producer example seems to ignore added partitions too...
How can I auto refresh keyed producers to use new partitions as these
partitions are added?
   
   
On Mon, Nov 10, 2014 at 12:33 PM, Shlomi Hazan shl...@viber.com
  wrote:
   
 One more thing:
 I saw that the Python client is also unaffected by addition of
   partitions
 to a topic and that it continues to send requests only to the old
 partitions.
 is this also handled appropriately by the Java producer? Will he
 see
   the
 change and produce to the new partitions as well?
 Shlomi

 On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan shl...@viber.com
   wrote:

 No I don't see anything like that, the question was aimed at
  learning
   if
 it is worthwhile to make the effort of reimplementing the Python
producer
 in Java, I so I will not make all the effort just to be
 disappointed
 afterwards.
 understand I have nothing to worry about, so I will try to
 simulate
   this
 situation in small scale...
 maybe 3 brokers, one topic with one partition and then add
  partitions.
 we'll see.
 thanks for clarifying.
 Oh, Good luck with Confluent!!
 :)

 On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede 
   neha.narkh...@gmail.com

 wrote:

 The producer might get an error code if the leader of the
  partitions
 being
 reassigned also changes. However it should retry and succeed. Do
  you
see
 a
 behavior that suggests otherwise?

 On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan shl...@viber.com
wrote:

  Hi All,
  I recently had an issue producing from python where expanding a
cluster
  from 3 to 5 nodes and reassigning partitions forced me to
 restart
   the
  producer b/c of KeyError thrown.
  Is this situation handled by the Java producer automatically or
   need
I
 do
  something to have the java producer refresh itself to see the
 reassigned
  partition layout and produce away ?
  Shlomi
 




   
  
 



Re: expanding cluster and reassigning parititions without restarting producer

2014-11-10 Thread Neha Narkhede
How can I auto refresh keyed producers to use new partitions as these
partitions are added?

Try using the new producer under org.apache.kafka.clients.producer.

Thanks,
Neha

On Mon, Nov 10, 2014 at 8:52 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 I had a different experience with expanding partitions for the new producer and
 its impact.  I only tried it for non-keyed messages.    I would always advise
 keeping the batch size relatively low, or planning for expansion with the new
 java producer in advance or since inception; otherwise running producer code is
 impacted.

 Here is mail chain:

 http://mail-archives.apache.org/mod_mbox/kafka-dev/201411.mbox/%3ccaoejijit4cgry97dgzfjkfvaqfduv-o1x1kafefbshgirkm...@mail.gmail.com%3E

 Thanks,

 Bhavesh

 On Mon, Nov 10, 2014 at 5:20 AM, Shlomi Hazan shl...@viber.com wrote:

  Hmmm..
  The Java producer example seems to ignore added partitions too...
  How can I auto refresh keyed producers to use new partitions as these
  partitions are added?
 
 
  On Mon, Nov 10, 2014 at 12:33 PM, Shlomi Hazan shl...@viber.com wrote:
 
   One more thing:
   I saw that the Python client is also unaffected by addition of
 partitions
   to a topic and that it continues to send requests only to the old
   partitions.
   is this also handled appropriately by the Java producer? Will he see
 the
   change and produce to the new partitions as well?
   Shlomi
  
   On Mon, Nov 10, 2014 at 9:34 AM, Shlomi Hazan shl...@viber.com
 wrote:
  
   No I don't see anything like that, the question was aimed at learning
 if
   it is worthwhile to make the effort of reimplementing the Python
  producer
   in Java, I so I will not make all the effort just to be disappointed
   afterwards.
   understand I have nothing to worry about, so I will try to simulate
 this
   situation in small scale...
   maybe 3 brokers, one topic with one partition and then add partitions.
   we'll see.
   thanks for clarifying.
   Oh, Good luck with Confluent!!
   :)
  
   On Mon, Nov 10, 2014 at 4:17 AM, Neha Narkhede 
 neha.narkh...@gmail.com
  
   wrote:
  
   The producer might get an error code if the leader of the partitions
   being
   reassigned also changes. However it should retry and succeed. Do you
  see
   a
   behavior that suggests otherwise?
  
   On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan shl...@viber.com
  wrote:
  
Hi All,
I recently had an issue producing from python where expanding a
  cluster
from 3 to 5 nodes and reassigning partitions forced me to restart
 the
producer b/c of KeyError thrown.
Is this situation handled by the Java producer automatically or
 need
  I
   do
something to have the java producer refresh itself to see the
   reassigned
partition layout and produce away ?
Shlomi
   
  
  
  
  
 



Re: expanding cluster and reassigning parititions without restarting producer

2014-11-09 Thread Neha Narkhede
The producer might get an error code if the leader of the partitions being
reassigned also changes. However it should retry and succeed. Do you see a
behavior that suggests otherwise?

On Sat, Nov 8, 2014 at 11:45 PM, Shlomi Hazan shl...@viber.com wrote:

 Hi All,
 I recently had an issue producing from python where expanding a cluster
 from 3 to 5 nodes and reassigning partitions forced me to restart the
 producer b/c of KeyError thrown.
 Is this situation handled by the Java producer automatically or need I do
 something to have the java producer refresh itself to see the reassigned
 partition layout and produce away ?
 Shlomi



Re: Interrupting controlled shutdown breaks Kafka cluster

2014-11-09 Thread Neha Narkhede
We fixed a couple issues related to automatic leader balancing and
controlled shutdown. Would you mind trying out 0.8.2-beta?

On Fri, Nov 7, 2014 at 11:52 AM, Solon Gordon so...@knewton.com wrote:

 We're using 0.8.1.1 with auto.leader.rebalance.enable=true.

 On Fri, Nov 7, 2014 at 2:35 PM, Guozhang Wang wangg...@gmail.com wrote:

  Solon,
 
  Which version of Kafka are you running and are you enabling auto leader
  rebalance at the same time?
 
  Guozhang
 
  On Fri, Nov 7, 2014 at 8:41 AM, Solon Gordon so...@knewton.com wrote:
 
   Hi all,
  
   My team has observed that if a broker process is killed in the middle
 of
   the controlled shutdown procedure, the remaining brokers start spewing
   errors and do not properly rebalance leadership. The cluster cannot
  recover
   without major manual intervention.
  
   Here is how to reproduce the problem:
   1. Create a Kafka 0.8.1.1 cluster with three brokers. (Let's call them
 A,
   B, and C.) Set controlled.shutdown.enable=true.
   2. Create a topic with replication_factor = 3 and a large number of
   partitions (say 100).
   3. Send a TERM signal to broker A. This initiates controlled shutdown.
   4. Before controlled shutdown completes, quickly send a KILL signal to
   broker A.
  
   Result:
   - Brokers B and C start logging ReplicaFetcherThread connection errors
   every few milliseconds. (See below for an example.)
   - Broker A is still listed as a leader and ISR for any partitions which
   were not transferred during controlled shutdown. This causes connection
   errors when clients try to produce to or consume from these partitions.
  
   This scenario is difficult to recover from. The only ways we have found
  are
   to restart broker A multiple times (if it still exists) or to kill
 both B
   and C and then start them one by one. Without this kind of
 intervention,
   the above issues persist indefinitely.
  
   This may sound like a contrived scenario, but it's exactly what we have
   seen when a Kafka EC2 instance gets terminated by AWS. So this seems
  like a
   real liability.
  
   Are there any existing JIRA tickets which cover this behavior? And do
 you
   have any recommendations for avoiding it, other than forsaking
 controlled
   shutdowns entirely?
  
   Thanks,
   Solon
  
   Error example:
   [2014-11-06 17:10:21,459] ERROR [ReplicaFetcherThread-0-1978259225],
  Error
   in fetch Name: FetchRequest; Version: 0; CorrelationId: 3500; ClientId:
   ReplicaFetcherThread-0-1978259225; ReplicaId: 1359390395; MaxWait: 500
  ms;
   MinBytes: 1 bytes; RequestInfo: [my-topic,42] -
   PartitionFetchInfo(503,10485760),[my-topic,63] -
   PartitionFetchInfo(386,10485760),[my-topic,99] -
   PartitionFetchInfo(525,10485760),[my-topic,84] -
   PartitionFetchInfo(436,10485760),[my-topic,48] -
   PartitionFetchInfo(484,10485760),[my-topic,75] -
   PartitionFetchInfo(506,10485760),[my-topic,45] -
   PartitionFetchInfo(473,10485760),[my-topic,66] -
   PartitionFetchInfo(532,10485760),[my-topic,30] -
   PartitionFetchInfo(435,10485760),[my-topic,96] -
   PartitionFetchInfo(517,10485760),[my-topic,27] -
   PartitionFetchInfo(470,10485760),[my-topic,36] -
   PartitionFetchInfo(472,10485760),[my-topic,9] -
   PartitionFetchInfo(514,10485760),[my-topic,33] -
   PartitionFetchInfo(582,10485760),[my-topic,69] -
   PartitionFetchInfo(504,10485760),[my-topic,57] -
   PartitionFetchInfo(444,10485760),[my-topic,78] -
   PartitionFetchInfo(559,10485760),[my-topic,12] -
   PartitionFetchInfo(417,10485760),[my-topic,90] -
   PartitionFetchInfo(429,10485760),[my-topic,18] -
   PartitionFetchInfo(497,10485760),[my-topic,0] -
   PartitionFetchInfo(402,10485760),[my-topic,6] -
   PartitionFetchInfo(527,10485760),[my-topic,54] -
   PartitionFetchInfo(524,10485760),[my-topic,15] -
   PartitionFetchInfo(448,10485760),[console,0] -
   PartitionFetchInfo(4,10485760) (kafka.server.ReplicaFetcherThread)
   java.net.ConnectException: Connection refused
   at sun.nio.ch.Net.connect0(Native Method)
   at sun.nio.ch.Net.connect(Net.java:465)
   at sun.nio.ch.Net.connect(Net.java:457)
   at
  sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
   at
  kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
   at
 kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
   at
  kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
   at
   kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
   at
  
  
 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
   at
  
  
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
   at
  
  
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
   at
  
  
 
 

Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-09 Thread Neha Narkhede
A rebalance should trigger on all consumers when you add a new consumer to
the group. If you don't see the zookeeper watch fire, the consumer may have
somehow lost the watch. We have seen this behavior on older zk versions, I
wonder if that bug got reintroduced. To verify if this is the case, you can
run the wchp zookeeper command on the zk leader and check if each consumer
has a watch registered.

Do you have a way to try this on zk 3.3.4? I would recommend you try the
wchp suggestion as well.
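
For example, wchp is one of ZooKeeper's four-letter-word commands and can be
issued with netcat against the leader (host and port are illustrative); each
live consumer should show up as a watcher under its /consumers paths:

  echo wchp | nc zk-leader-host 2181 | grep -A 2 /consumers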

On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria mkathu...@sprinklr.com
wrote:

 Hi all,

 Can someone help here. We are getting constant rebalance failure each time
 a consumer is added beyond a certain number. Did quite a lot of debugging
 on this and still not able to figure out the pattern.

 -Thanks,
 Mohit

 On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Neha,
 
  Looks like an issue with the consumer rebalance not being able to complete
  successfully. We were able to reproduce the issue on a topic with 30
  partitions, 3 consumer processes (p1, p2 and p3), and properties of 40
  rebalance.max.retries and 10000 (10s) rebalance.backoff.ms.
 
  Before the process p3 was started, partition ownership was as expected:
 
  partitions 0-14 owned by p1
  partitions 15-29 - owner p2
 
  As the process p3 started, rebalance was triggered. Process p3 was
  successfully able to acquire partition ownership for partitions 20-29 as
  expected as per the rebalance algorithm. However, process p2 while trying
  to acquire ownership of partitions 10-19 saw rebalance failure after 40
  retries.
 
  Attaching the logs from process p2 and process p1. It says that p2 was
  attempting to rebalance, it was trying to acquire ownership of partitions
  10-14 which were owned by process p1. However, at the same time process
 p1
  did not get any event for giving up the partition ownership for
 partitions
  10-14.
  We were expecting a rebalance to have triggered in p1 - but it didn't and
  hence not giving up ownership. Is our assumption correct/incorrect?
  And if the rebalance gets triggered in p1 - how to figure out apart from
  logs as the logs on p1 did not have anything.
 
  *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
  [topic_consumerIdString], waiting for the partition ownership to be
  deleted: 11*
 
  During and after the rebalance failed on process p2, Partition Ownership
  was as below:
  0-14 - owner p1
  15-19 - none
  20-29 - owner p3
 
  This left the consumers in inconsistent state as 5 partitions were never
  consumer from and neither was the partitions ownership balanced.
 
  However, there was no conflict in creating the ephemeral node which was
  the case last time. Just to note that the ephemeral node conflict which
 we
  were seeing earlier also appeared after rebalance failed. My hunch is
 that
  fixing the rebalance failure will fix that issue as well.
 
  -Thanks,
  Mohit
 
 
 
  On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
  Mohit,
 
  I wonder if it is related to
  https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper
 expires
  a
  session, it doesn't delete the ephemeral nodes immediately. So if you
 end
  up trying to recreate ephemeral nodes quickly, it could either be in the
  valid latest session or from the previously expired session. If you hit
  this problem, then waiting would resolve it. But if not, then this may
 be
  a
  legitimate bug in ZK 3.4.6.
 
  Can you try shutting down all your consumers, waiting until session
  timeout
  and restarting them?
 
  Thanks,
  Neha
 
  On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com
 
  wrote:
 
   Dear Experts,
  
    We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a
    topic with 30 partitions and 2 replicas. We are using the high level
    consumer api.
    Each consumer process, which is a storm topology, has 5 streams, each of
    which connects to 1 or more partitions. We are not using storm's inbuilt
    kafka spout. Everything runs fine till the 5th consumer process (25
    streams) is added for this topic.

    As soon as the sixth consumer process is added, the newly added process
    does not get ownership of the partitions that it requests, as the already
    existing owners have not yet given up their ownership.
  
   We changed certain properties on consumer :
  
    1. Max rebalance attempts - 20 (rebalance.backoff.ms *
    rebalance.max.retries > zk connection timeout)
    2. Back off ms between rebalances - 10000 (10 seconds)
   3. ZK connection timeout - 100,000 (100 seconds)
  
   Although when I am looking in the zookeeper shell when the rebalance
 is
   happening, the consumer is registered fine on the zookeeper. Just that
  the
   rebalance does not happen.
   After the 20th rebalance gets completed, we get
  
  
   *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
   [rule-engine-feed_ip-10-0-2-170-1413025767369

Re: Strategies for high-concurrency consumers

2014-11-06 Thread Neha Narkhede
Jack,

Zookeeper is likely the bottleneck if rebalancing takes a very long time.
As Jay said, this will be addressed in the consumer rewrite planned for
0.9. Few more workarounds that were tried at LinkedIn - 1) To deploy
Zookeeper on SSDs and 2) Turning sync on every write off
(zookeeper.forceSync). I'm not sure if #2 negatively affected the
consistency of the zookeeper data ever but it did help with speeding up the
rebalancing.
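
For reference, #2 maps to ZooKeeper's forceSync setting, which can be set either
in zoo.cfg or as a JVM property on the ZooKeeper servers (use with care: it
trades durability of ZK writes for latency):

  # zoo.cfg
  forceSync=no
  # or as a system property when starting ZooKeeper:
  # -Dzookeeper.forceSync=no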

Thanks,
Neha

On Thu, Nov 6, 2014 at 11:31 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Unfortunately the performance of the consumer balancing scales poorly with
 the number of partitions. This is one of the things the consumer rewrite
 project is meant to address, however that is not complete yet. A reasonable
 workaround may be to decouple your application parallelism from the number
 of partitions. I.e. have the processing of each partition happen in a
 threadpool. I'm assuming that you don't actually have 2,500 machines, just
 that you need that much parallelism since each message takes a bit of time
 to process. This does weaken the delivery ordering, but you may be able to
 shard the processing by key to solve that problem.
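
 A rough sketch of that decoupling (assuming the 0.8 high-level consumer, a
 fixed pool of single-threaded executors, and sharding by message key so that
 per-key ordering is kept; all names and the pool size are illustrative):

   import java.util.Arrays;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import kafka.consumer.ConsumerIterator;
   import kafka.consumer.KafkaStream;
   import kafka.message.MessageAndMetadata;

   public class ShardedProcessor {
       private final ExecutorService[] shards;

       public ShardedProcessor(int parallelism) {
           shards = new ExecutorService[parallelism];
           for (int i = 0; i < parallelism; i++) {
               shards[i] = Executors.newSingleThreadExecutor();
           }
       }

       // One consumer thread reads a partition's stream and hands each message
       // to the executor chosen by the message key's hash.
       public void run(KafkaStream<byte[], byte[]> stream) {
           ConsumerIterator<byte[], byte[]> it = stream.iterator();
           while (it.hasNext()) {
               MessageAndMetadata<byte[], byte[]> m = it.next();
               int shard = (Arrays.hashCode(m.key()) & 0x7fffffff) % shards.length;
               final byte[] payload = m.message();
               shards[shard].submit(new Runnable() {
                   public void run() {
                       process(payload);   // the ~50 ms downstream call goes here
                   }
               });
           }
       }

       private void process(byte[] payload) {
           // call the downstream system
       }
   }

 (With asynchronous hand-off like this, offset commits need some care to keep
 the at-least-once guarantee.)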

 -Jay

 On Thu, Nov 6, 2014 at 10:59 AM, Jack Foy j...@whitepages.com wrote:

  Hi all,
 
  We are building a system that will carry a high volume of traffic (on the
  order of 2 billion messages in each batch), which we need to process at a
  rate of 50,000 messages per second. We need to guarantee at-least-once
  delivery for each message. The system we are feeding has a latency of
 50ms
  per message, and can absorb many concurrent requests.
 
  We have a Kafka 0.8.1.1 cluster with three brokers and a Zookeeper 3.4.5
  cluster with 5 nodes, each on physical hardware.
 
  We intend to deploy a consumer group of 2500 consumers against a single
  topic, with a partition for each consumer. We expect our consumers to be
  stable over the course of the run, so we expect rebalancing to be rare.
 In
  testing, we have successfully run 512 high-level consumers against 1024
  partitions, but beyond 512 consumers the rebalance at startup doesn’t
  complete within 10 minutes. Is this a workable strategy with high-level
  consumers? Can we actually deploy a consumer group with this many
 consumers
  and partitions?
 
  We see throughput of more than 500,000 messages per second with our 512
  consumers, but we need greater parallelism to meet our performance needs.
 
  --
  Jack Foy j...@whitepages.com
 
 
 
 



Re: High CPU usage of Crc32 on Kafka broker

2014-11-06 Thread Neha Narkhede
Allen,

Apache mailing lists don't allow attachments. Could you please link to a
pastebin or something?

Thanks,
Neha

On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang aw...@netflix.com.invalid
wrote:

 After digging more into the stack trace got from flight recorder (which is
 attached), it seems that Kafka (0.8.1.1) can optimize the usage of Crc32.
 The stack trace shows that Crc32 is invoked twice from Log.append(). First
 is from the line number 231:

 val appendInfo = analyzeAndValidateMessageSet(messages)

 The second time is from line 252 in the same function:

 validMessages = validMessages.assignOffsets(offset, appendInfo.codec)

 If one of the Crc32 invocation can be eliminated, we are looking at saving
 at least 7% of CPU usage.

 Thanks,
 Allen




 On Wed, Nov 5, 2014 at 6:32 PM, Allen Wang aw...@netflix.com wrote:

 Hi,

 Using flight recorder, we have observed high CPU usage of CRC32
 (kafka.utils.Crc32.update()) on Kafka broker. It uses as much as 25% of CPU
 on an instance. Tracking down stack trace, this method is invoked by
 ReplicaFetcherThread.

 Is there any tuning we can do to reduce this?

 Also on the topic of CPU utilization, we observed that overall CPU
 utilization is proportional to AllTopicsBytesInPerSec metric. Does this
 metric include incoming replication traffic?

 Thanks,
 Allen





Re: nulls found in topic, created by recovery?

2014-11-06 Thread Neha Narkhede
IIRC, the bug that introduced the nulls was related to compressed data. Is
this topic compressed? Did you try to run a consumer through the topic's
data or alternately the DumpLogSegments tool?

On Thu, Nov 6, 2014 at 12:56 PM, Neil Harkins nhark...@gmail.com wrote:

 Hi all. I saw something weird yesterday on our leaf instances
 which run kafka 0.7.2 (and mirror to kafka 0.8 via our custom code).
 I fully realize everyone's instinctual response is "upgrade, already",
 but I'd like to have an internals discussion to better understand
 what happened, as I suspect it's still relevant in 0.8.

 Basically, in one of our topics there was an 8k stretch of nulls.
 Correlating timestamps from the messages bracketing the nulls
 to the kafka log, I see that the server restarted during that time,
 and here are the recovery lines related to the topic with the nulls:

 [2014-11-04 00:48:07,602] INFO zookeeper state changed (SyncConnected)
 (org.I0Itec.zkclient.ZkClient)
 [2014-11-04 01:00:35,806] INFO Shutting down Kafka server
 (kafka.server.KafkaServer)
 [2014-11-04 01:00:35,813] INFO shutdown scheduler kafka-logcleaner-
 (kafka.utils.KafkaScheduler)
 [2014-11-04 01:01:38,411] INFO Starting Kafka server...
 (kafka.server.KafkaServer)
 ...
 [2014-11-04 01:01:49,146] INFO Loading log 'foo.bar-0'
 (kafka.log.LogManager)
 [2014-11-04 01:01:49,147] INFO Loading the last segment
 /var/kafka-leaf-spool/foo.bar-0/002684355423.kafka in mutable
 mode, recovery true (kafka.log.Log)
 [2014-11-04 01:01:55,877] INFO recover high water mark:414004449
 (kafka.message.FileMessageSet)
 [2014-11-04 01:01:55,877] INFO Recovery succeeded in 6 seconds. 0
 bytes truncated. (kafka.message.FileMessageSet)

 The only hypothesis I can come up with is that the shutdown
 (graceful?) did not wait for all messages to flush to disk
 (we're configured with: log.flush.interval=1,
 log.default.flush.interval.ms=500, and
 log.default.flush.scheduler.interval.ms=500),
 but the max offset was recorded, so that when it came back up,
 it filled the gap with nulls to reach the valid max offset in case
 any consumers were at the end.

 But for consumers with a position prior to all the nulls,
 are they guaranteed to get back on the rails so-to-speak?
 Nulls appear as v0(i.e. magic) messages of 0 length,
 but the messages replaced could be variable length.

 Thanks in advance for any input,
 -neil



Re: Interaction of retention settings for broker and topic plus partitions

2014-11-06 Thread Neha Narkhede
To clarify though, is it correct that a per topic limit will always
override the default limit of the same type?  (e.g. a large per-topic
retention hours vs. a small default retention hours)?

That's correct.
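
For example (values illustrative): the broker-wide defaults live in
server.properties, while a per-topic override of the same type is set as a topic
config and wins for that topic; to retain data longer than the 168-hour default
you would override the time limit as well:

  # server.properties (broker defaults)
  log.retention.hours=168
  log.retention.bytes=1073741824

  # per-topic overrides
  bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
      --config retention.bytes=107374182400 --config retention.ms=1209600000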

On Thu, Nov 6, 2014 at 9:34 AM, Jason Rosenberg j...@squareup.com wrote:

 Jun,

 To clarify though, is it correct that a per topic limit will always
 override the default limit of the same type?  (e.g. a large per-topic
 retention hours vs. a small default retention hours)?

 Jason

 On Sat, Sep 20, 2014 at 12:28 AM, Jun Rao jun...@gmail.com wrote:

  That's right. The rule is that a log segment is deleted if either the
 size
  or the time limit is reached. Log sizes are per partition.
 
  Thanks,
 
  Jun
 
  On Thu, Sep 18, 2014 at 2:55 PM, Cory Watson gp...@keen.io wrote:
 
   Hello all!
  
   I'm curious about the interaction of server and topic level retention
   settings. It's not clear to me the precedence of the follow:
  
  - broker's default log.retention.bytes
  - topic's retention.bytes (which defaults to broker's
  log.retention.bytes)
  - broker's log.retention.hours and log.retention.minutes (if both
 are
  specified then it seems to be the lower of the two, since it's when
  either is exceeded)
  
   It seems that the rule is that when any of these are violated then the
  log
   segment is deleted. Is this right?
  
   Also, just to be clear: The log sizes in questions are for a single
   partitions logs?
  
   I have a situation where my per-topic retention.bytes is very high, but
  my
   default log.retention.hours is lower (the default @ 168 hours). It
 seems
   that it's truncating at the log.retention.hours instead of the topic's
   retention.bytes.
  
   Am I understanding this correctly? :)
  
   --
   Cory Watson
   Principal Infrastructure Engineer // Keen IO
  
 



Re: Kafka Release timelines

2014-11-06 Thread Neha Narkhede
0.8.2 should be available in a month. Though 0.9 might take a couple more
months and there is a good chance that it will not be this year.

Thanks,
Neha

On Thu, Nov 6, 2014 at 3:01 AM, dinesh kumar dinesh...@gmail.com wrote:

 Hi,
 I found the future release plan wiki here
 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan .
 I
 see that 0.8.2 is still in beta even though it was stated for September.
 What is the expected date for 0.9 release?

 Thanks,
 Dinesh



Re: Consumer lag keep increasing

2014-11-06 Thread Neha Narkhede
Chen,

Consumers lag either due to an I/O or network bottleneck or due to slow
processing of messages by the user. To confirm that you are not hitting the
latter issue, you can run a console consumer on the same data and observe
the throughput that it provides and its lag.
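
For example (hosts and topic illustrative), watching the raw throughput of a
console consumer next to the offset checker output for your real group gives a
quick read on whether consumption itself is the bottleneck:

  # raw consumption throughput, independent of your application:
  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic > /dev/null
  # lag of your actual consumer group:
  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
      --group spout_readonly --topic mytopic --zkconnect localhost:2181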

Thanks,
Neha

On Wed, Nov 5, 2014 at 3:31 PM, Chen Wang chen.apache.s...@gmail.com
wrote:

 Guozhang,
 I can see messages keep coming, meaning messages are being consumed, right?
 But the lag is pretty huge (on average 30M messages behind), as you can see
 from the graph:

 https://www.dropbox.com/s/xli25zicxv5f2qa/Screenshot%202014-11-05%2015.23.05.png?dl=0

 My understanding is that for such a lightweight thread, the consumer should
 be at almost the same pace as the producer. I also checked the machine
 metrics, and nothing was pegged there.

 I am also moving the testing application to a separate dev cluster. In your
 experience, what things might cause the slow reading? Is this more like a
 server side thing, or consumer side?

 Chen

 On Wed, Nov 5, 2014 at 3:10 PM, Guozhang Wang wangg...@gmail.com wrote:

  Chen,
 
  Your configs seems fine.
 
  Could you use ConsumerOffsetChecker tool to see if the offset is
 advancing
  at all (i.e. messages are comsumed), and if yes get some thread dumps and
  check if your consumer is blocked on some locks?
 
  Guozhang
 
  On Wed, Nov 5, 2014 at 2:01 PM, Chen Wang chen.apache.s...@gmail.com
  wrote:
 
   Hey Guys,
   I have a really simple storm topology with a kafka spout, reading from
   kafka through the high level consumer. Since the topic has 30 partitions,
   we have 30 threads in the spout reading from it. However, it seems that
   the lag keeps increasing even though the threads only read the messages
   and do nothing. The largest messages are around 30KB, and the incoming
   rate can be as high as 14k/second. There are 3 brokers on some high-config
   bare metal machines. The client side config is like this:
  
   kafka.config.fetch.message.max.bytes3145728
   kafka.config.group.id   spout_readonly
   kafka.config.rebalance.backoff.ms   6000
   kafka.config.rebalance.max.retries  6
   kafka.config.zookeeper.connect  dare-broker00.sv.walmartlabs.com:2181,
   dare-broker01.sv.walmartlabs.com:2181,
   dare-broker02.sv.walmartlabs.com:2181
   kafka.config.zookeeper.session.timeout.ms   6
  
   what could possibly cause this huge lag? Will broker be a bottle neck,
 or
   some config need to be adjusted? The server side config is like this:
  
   replica.fetch.max.bytes=2097152
   message.max.bytes=2097152
   num.network.threads=4
   num.io.threads=4
  
   # The send buffer (SO_SNDBUF) used by the socket server
   socket.send.buffer.bytes=4194304
  
   # The receive buffer (SO_RCVBUF) used by the socket server
   socket.receive.buffer.bytes=2097152
  
   # The maximum size of a request that the socket server will accept
   (protection against OOM)
   socket.request.max.bytes=104857600
  
   Any help appreciated!
   Chen
  
 
 
 
  --
  -- Guozhang
 



Re: Kafka Cluster disaster decovery

2014-11-06 Thread Neha Narkhede
A common solution for disaster recovery is to mirror the Kafka cluster into
another one deployed in a separate data center. The mirroring is not
synchronous so there might be some message loss when you lose the entire
cluster in some disaster.
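
A typical setup (config file names and the whitelist are illustrative) runs
MirrorMaker in the backup data center, consuming from the primary cluster and
producing into the local one:

  bin/kafka-run-class.sh kafka.tools.MirrorMaker \
      --consumer.config primary-cluster-consumer.properties \
      --producer.config dr-cluster-producer.properties \
      --num.streams 4 --whitelist '.*'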

Thanks,
Neha

On Mon, Nov 3, 2014 at 7:43 AM, Guozhang Wang wangg...@gmail.com wrote:

 Yingkai,

 Kafka uses persistent storage so the data written to it will not be lost,
 you just need to restart the cluster. But during the down time it will
 become un-available.

 Guozhang



 On Fri, Oct 31, 2014 at 2:06 PM, Yingkai Hu yingka...@gmail.com wrote:

  Hi All,
 
  I’m new to Kafka, please direct me to the right path if it is a duplicate
  question.
 
  Basically I deployed Kafka to a 4 machine cluster, what if the whole
  cluster went down, does kafka provide any backup/restore mechanism?
 Please
  advise.
 
  Thanks!
  Yingkai




 --
 -- Guozhang



Re: corrupt message

2014-11-06 Thread Neha Narkhede
This may be due to a bug in the client. Non-java Kafka clients are
maintained by the individual client owners. You might want to ping the
owner of your library directly.

On Mon, Nov 3, 2014 at 7:21 AM, Fredrik S Loekke f...@lindcapital.com
wrote:

  When running a C# producer against a kafka 0.8.1.1 server running on a
 virtual linux (virtualbox, Ubuntu) I keep getting the following error:



 [2014-11-03 15:19:08,595] ERROR [KafkaApi-0] Error processing
 ProducerRequest with correlation id 601 from client Kafka-Net on partition
 [x,0] (kafka.server.KafkaApis)

 kafka.message.InvalidMessageException: Message is corrupt (stored crc =
 1767811542, computed crc = 1256103753)

 at kafka.message.Message.ensureValid(Message.scala:166)

 at
 kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:330)

 at
 kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:318)

 at scala.collection.Iterator$class.foreach(Iterator.scala:772)

 at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)

 at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:318)

 at kafka.log.Log.append(Log.scala:231)

 at
 kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)

 at
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)

 at
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)

 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)

 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)

 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)

 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)

 at scala.collection.Iterator$class.foreach(Iterator.scala:772)

 at
 scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)

 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)

 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)

 at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)

 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:233)

 at scala.collection.mutable.HashMap.map(HashMap.scala:45)

 at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)

 at
 kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)

 at kafka.server.KafkaApis.handle(KafkaApis.scala:185)

 at
 kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)

 at java.lang.Thread.run(Thread.java:745)



 Any suggestion on how to resolve this issue?



 Best regards / Med venlig hilsen



 *Fredrik Skeel Løkke*

 Software Developer ǀ IT  Analysis



 Mob.: +45 3176 8438

 f...@lindcapital.com






 Lind Capital A/S

 Værkmestergade 3, 2

 DK-8000 Aarhus C

 www.lindcapital.com

 Follow us on

 LinkedIn: http://www.linkedin.com/company/lind-capital-a-s
 Facebook: http://www.facebook.com/LindCapital





Re: Dynamically adding Kafka brokers

2014-11-04 Thread Neha Narkhede
I agree that KAFKA-1070 would be great to get in. I especially felt the
need for something like this while using a few other systems that automated
the port, id etc to give a good OOTB experience. Sorry, I lost track of the
review. Will do so in the next few days.

Thanks,
Neha

On Mon, Nov 3, 2014 at 3:56 PM, Jay Kreps jay.kr...@gmail.com wrote:

 I agree it would be really nice to get KAFKA-1070 figured out.

 FWIW, the reason for having a name or id other than ip was to make it
 possible to move the identity to another physical server (e.g. scp the data
 directory) and have it perform the same role on that new piece of hardware.
 Systems that tie the data to a particular host tend to be sort of hated on
 since you can't do anything simple/stupid to back them up or replace them.

 -Jay

 On Mon, Nov 3, 2014 at 2:23 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  +1
  Thats what we use to generate broker id in automatic deployments.
  This method makes troubleshooting easier (you know where each broker is
  running), and doesn't require keeping extra files around.
 
  On Mon, Nov 3, 2014 at 2:17 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   Most folks strip the IP and use that as the broker.id. KAFKA-1070 does
  not
   yet accommodate for that very widely used method. I think it would be
 bad
   if KAFKA-1070 only worked for new installations because that is how
  people
   use Kafka today (per
  
  
 
 https://issues.apache.org/jira/browse/KAFKA-1070?focusedCommentId=14085808page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14085808
   )
  
   On Mon, Nov 3, 2014 at 2:12 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
  
KAFKA-1070 will help with this and is pending a review.
   
On Mon, Nov 03, 2014 at 05:03:20PM -0500, Otis Gospodnetic wrote:
 Hi,

 How do people handle situations, and specifically the broker.id
property,
 where the Kafka (broker) cluster is not fully defined right away?

 Here's the use case we have at Sematext:
 * Our software ships as a VM
 * All components run in this single VM, including 1 Kafka broker
 * Of course, this is just for a nice OOTB experience, but to scale
  one
 needs to have more instances of this VM, including more Kafka
 brokers
 * One can clone our VM and launch N instances of it, but because we have a
 single Kafka broker config with a single broker.id in it, one can't just
 launch more of these VMs and expect to see more Kafka brokers join the
 cluster.  One would have to change the broker.id on each new VM instance.

 How do others handle this in software that is packaged and shipped to users
 and is not in your direct control to allow you to edit configs?

 Would it be best to have a script that connect to ZooKeeper to get
  the
list
 of all existing brokers and their IDs and then generate a new
  distinct
ID +
 config for the new Kafka broker?

 Or are there slicker ways to do this that people use?

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log
  Management
 Solr & Elasticsearch Support * http://sematext.com/
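
For anyone scripting this, here is a minimal sketch (not from the thread) of the strip-the-IP approach Joe and Gwen describe above: derive broker.id from the host address at provisioning time and append it to the broker config before startup. The output file name, the IPv4 assumption, and the assumption that all brokers share one /16 (so the last two octets are unique) are mine:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.net.InetAddress;

    // Sketch: generate a broker.id from the host's IPv4 address at provisioning time.
    // Assumes all brokers live in a single /16, so the last two octets are unique.
    public class BrokerIdFromIp {
        public static void main(String[] args) throws IOException {
            byte[] octets = InetAddress.getLocalHost().getAddress();        // e.g. 10.91.142.54
            int brokerId = ((octets[2] & 0xFF) << 8) | (octets[3] & 0xFF);  // 142 * 256 + 54 = 36406
            // "server-generated.properties" is a hypothetical file merged into the broker config.
            FileWriter out = new FileWriter("server-generated.properties", true);
            out.write("broker.id=" + brokerId + "\n");
            out.close();
            System.out.println("broker.id=" + brokerId);
        }
    }

A variant of the same idea queries ZooKeeper for the already-registered broker ids and picks the next free one, as Otis suggests below.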
   
   
  
 



Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-30 Thread Neha Narkhede
Also the 0.9 consumer javadoc is here -
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Thanks,
Neha

On Thu, Oct 30, 2014 at 5:00 AM, Joe Stein joe.st...@stealth.ly wrote:

 Hey, yeah!

 For the new producer
 https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/

 The java consumer is slated in 0.9 more on that here

 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard p...@spootnik.org
 wrote:

  Hi Joe et al.
 
  Congrats on the beta release!
  Do I read correctly that libraries can now rely on
  org.apache.kafka/kafka-clients which does not pull in scala anymore ?
 
  If so, awesome!
 
- pyr
 
  On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu yu_l...@hotmail.com wrote:
 
   Congrats! When do you think the final 0.82 will be released?
  
To: annou...@apache.org; users@kafka.apache.org;
 d...@kafka.apache.org
Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
Date: Tue, 28 Oct 2014 00:50:35 +
From: joest...@apache.org
   
The Apache Kafka community is pleased to announce the beta release
 for
   Apache Kafka 0.8.2.
   
The 0.8.2-beta release introduces many new features, improvements and
   fixes including:
 - A new Java producer for ease of implementation and enhanced
   performance.
 - Delete topic support.
 - Per topic configuration of preference for consistency over
   availability.
 - Scala 2.11 support and dropping support for Scala 2.8.
 - LZ4 Compression.
   
All of the changes in this release can be found:
   https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
   
Apache Kafka is high-throughput, publish-subscribe messaging system
   rethought of as a distributed commit log.
   
** Fast = A single Kafka broker can handle hundreds of megabytes of
   reads and
writes per second from thousands of clients.
   
** Scalable = Kafka is designed to allow a single cluster to serve
 as
   the central data backbone
for a large organization. It can be elastically and transparently
   expanded without downtime.
Data streams are partitioned and spread over a cluster of machines to
   allow data streams
larger than the capability of any single machine and to allow
 clusters
   of co-ordinated consumers.
   
** Durable = Messages are persisted on disk and replicated within
 the
   cluster to prevent
data loss. Each broker can handle terabytes of messages without
   performance impact.
   
** Distributed by Design = Kafka has a modern cluster-centric design
   that offers
strong durability and fault-tolerance guarantees.
   
You can download the release from:
   http://kafka.apache.org/downloads.html
   
We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
   http://kafka.apache.org/
   
  
  
 



Re: Consumer cannot find leader after leadership changes on broker side

2014-10-29 Thread Neha Narkhede
Which Kafka version are you on for the consumer? We recently fixed an NPE
related to ZkClient, could you give 0.8.2-beta a try on your consumer and
retry the test?

On Wed, Oct 29, 2014 at 10:34 AM, Allen Wang allenxw...@gmail.com wrote:

 After executing PreferredReplicaLeaderElectionCommand on broker instance,
 we observed one of the consumers cannot find the leadership and stopped
 consuming. The following exception is all over the log file and it appears
 that the consumer cannot recover from it:

 2014-10-29 00:53:30,492 WARN

 surorouter-logsummary_surorouter-logsummary-i-eaef7107-1413327811303-4afb7b23-leader-finder-thread
 ConsumerFetcherManager$LeaderFinderThread -

 [surorouter-logsummary_surorouter-logsummary-i-eaef7107-1413327811303-4afb7b23-leader-finder-thread],
 Failed to find leader for Set([nf_errors_log,28], [nf_errors_log,29])
 java.lang.NullPointerException
 at
 org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:99)
 at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:416)
 at org.I0Itec.zkclient.ZkClient$2.call(ZkClient.java:413)
 at
 org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
 at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:413)
 at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:409)
 at
 kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:487)
 at kafka.utils.ZkUtils$.getAllBrokersInCluster(ZkUtils.scala:84)
 at
 kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:65)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


 Except for this instance, other consumer instances are fine. Is there
 a workaround? Should we report it as a bug?

 Thanks,
 Allen



Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-28 Thread Neha Narkhede
queued.max.message.chunks controls the consumer's fetcher queue.

On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 HI Neha,

 If I solve problem number 1, I think number 2 will be solved (problem
 1 is causing problem number 2 (blocked)).  Can you please let me know what
 controls the queue size for the *ConsumerFetcherThread* thread?


 Please see the attached java source code which will reproduce the
 problem.  You may remove the recovery process...  Please check.  We have to
 do some work before we start reading from the Kafka Stream Iterator and this
 seems to cause some issue with java.lang.IllegalStateException: Iterator is
 in failed state.

 Please let me know your finding and recommendation.

 Thanks,

 Bhavesh

 On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Sometime it give following exception.

 It will help to have a more specific test case that reproduces the failed
 iterator state.

 Also, the consumer threads block if the fetcher queue is full. The queue
 can fill up if your consumer thread dies or slows down. I'd recommend you
 ensure that all your consumer threads are alive. You can take a thread
 dump
 to verify this.

 Thanks,
 Neha

 On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry 
 mistry.p.bhav...@gmail.com
 wrote:

  Hi Neha,
 
 
  I have two problems:.  Any help is greatly appreciated.
 
 
  1)* java.lang.IllegalStateException: Iterator is in failed state*
 
   ConsumerConnector consumerConnector = Consumer
           .createJavaConsumerConnector(getConsumerConfig());
   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
   topicCountMap.put(topic, 32);
   Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
           consumerConnector.createMessageStreams(topicCountMap);

   List<KafkaStream<byte[], byte[]>> streams =
           Collections.synchronizedList(topicStreamMap.get(topic));

   AppStaticInfo info = Mupd8Main.STATICINFO();

   Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
   // remove the head (first) stream for this source... the rest are for the Dynamic Source...
   mainIterator = iterator.next().iterator();

   List<ConsumerIterator<byte[], byte[]>> iteratorList =
           new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
   // now the rest of the iterators must be registered..
   while (iterator.hasNext()) {
       iteratorList.add(iterator.next().iterator());
   }
   KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
 
  Once the Consumer iterator is created and registered.  We use this in
  another thread to start reading from the Consumer Iterator.   Sometime
 it
  give following exception.
 
  24 Oct 2014 16:03:25,923 ERROR
  [SourceReader:request_source:LogStreamKafkaSource1]
  (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during
 reads.
  Swallowed to continue next read.
  java.lang.IllegalStateException: Iterator is in failed state
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
 
 
  I have tried to recover from this state by using this:
  iterator.resetState(); but it does not recover sometime.
 
 
 
 
  2) ConsumerFetcherThreads are blocked on enqueue? What controls the size of
  the queue? Why are they blocked? Due to this our lags are increasing; our
  threads are blocked on hasNext()...
 
 
 
 ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1
  prio=5 tid=0x7fb36292c800 nid=0xab03 waiting on condition
  [0x000116379000]
 java.lang.Thread.State: WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  0x000704019388 (a
  java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
  at
 
 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
  at
 
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
  at
  kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
  at
 
 
 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
  at
  scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
  at
 
 scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
  at
 
 scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
  at
 
 
 kafka.server.AbstractFetcherThread$$anonfun

Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-27 Thread Neha Narkhede
 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at
 java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
 at

 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
 at
 java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
 at
 kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at

 kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
 at
 scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
 at
 scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
 at

 kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at

 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
 at
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)






 Thanks,

 Bhavesh



 On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Can you provide the steps to reproduce this issue?
 
  On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
  wrote:
 
   I am using one from the Kafka Trunk branch.
  
   Thanks,
  
   Bhavesh
  
   On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede 
 neha.narkh...@gmail.com
   wrote:
  
Which version of Kafka are you using on the consumer?
   
On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry 
mistry.p.bhav...@gmail.com
wrote:
   
 HI Kafka Community ,

 I am using kafka trunk source code and I get following exception.
  What
 could cause the iterator to have FAILED state.  Please let me know
  how
   I
 can fix this issue.





 java.lang.IllegalStateException: Iterator is in failed state
     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

 Here is Properties:

 Properties props = new Properties();
 props.put("zookeeper.connect", zkConnect);
 props.put("group.id", groupId);
 props.put("consumer.timeout.ms", "-1");
 props.put("zookeeper.session.timeout.ms", "1");
 props.put("zookeeper.sync.time.ms", "6000");
 props.put("auto.commit.interval.ms", "2000");
 props.put("rebalance.max.retries", "8");
 props.put("auto.offset.reset", "largest");
 props.put("fetch.message.max.bytes", "2097152");
 props.put("socket.receive.buffer.bytes", "2097152");
 props.put("auto.commit.enable", "true");


 Thanks,

 Bhavesh

   
  
 



Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-26 Thread Neha Narkhede
Can you provide the steps to reproduce this issue?

On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 I am using one from the Kafka Trunk branch.

 Thanks,

 Bhavesh

 On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Which version of Kafka are you using on the consumer?
 
  On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
  wrote:
 
   HI Kafka Community ,
  
   I am using kafka trunk source code and I get following exception.  What
   could cause the iterator to have FAILED state.  Please let me know how
 I
   can fix this issue.
  
  
  
  
  
    java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

    Here is Properties:

    Properties props = new Properties();
    props.put("zookeeper.connect", zkConnect);
    props.put("group.id", groupId);
    props.put("consumer.timeout.ms", "-1");
    props.put("zookeeper.session.timeout.ms", "1");
    props.put("zookeeper.sync.time.ms", "6000");
    props.put("auto.commit.interval.ms", "2000");
    props.put("rebalance.max.retries", "8");
    props.put("auto.offset.reset", "largest");
    props.put("fetch.message.max.bytes", "2097152");
    props.put("socket.receive.buffer.bytes", "2097152");
    props.put("auto.commit.enable", "true");
  
  
   Thanks,
  
   Bhavesh
  
 



Re: kafka.common.ConsumerRebalanceFailedException

2014-10-26 Thread Neha Narkhede
Can you see if the workarounds mentioned here
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?
for failed rebalance attempts is useful?

On Fri, Oct 24, 2014 at 11:22 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am seeing following exception, don't understand the issue here. Is there
 a way to resolve this error?

 client consumer logs:


 Exception in thread main kafka.common.ConsumerRebalanceFailedException:
 groupB_ip-10-38-19-230-1414174925481-97fa3f2a can't rebalance after 4
 retries
 at

 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
 at

 kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
 at

 kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
 at kafka.javaapi.consumer.Zookeep


 server logs:



 [2014-10-24 14:21:47,327] INFO Got user-level KeeperException when
 processing sessionid:0x149435a553d007d type:create cxid:0x97 zxid:0xb4e
 txntype:-1 reqpath:n/a Error Path:/consumers/groupB/owners/topicA/28
 Error:KeeperErrorCode = NodeExists for /consumers/groupB/owners/topicA/28
 (org.apache.zookeeper.server.PrepRequestProcessor)
 [2014-10-24 14:21:47,329] INFO Got user-level KeeperException when
 processing sessionid:0x149435a553d007d type:create cxid:0x99 zxid:0xb4f
 txntype:-1 reqpath:n/a Error Path:/consumers/groupB/owners/topicA/23
 Error:KeeperErrorCode = NodeExists for /consumers/groupB/owners/topicA/23
 (org.apache.zookeeper.server.PrepRequestProcessor)
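
For reference, the FAQ workaround Neha links to above boils down to giving the consumer more rebalance attempts and a longer backoff than the ZooKeeper session timeout. A minimal, hedged sketch follows; every value is illustrative only, and the ZooKeeper address and group name are placeholders:

    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Sketch: the goal is rebalance.backoff.ms * rebalance.max.retries comfortably
    // exceeding zookeeper.session.timeout.ms, per the FAQ entry linked above.
    public class RebalanceTuning {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "groupB");
            props.put("zookeeper.session.timeout.ms", "6000");
            props.put("rebalance.max.retries", "10");
            props.put("rebalance.backoff.ms", "2000");   // 10 * 2000 ms = 20 s > 6 s session timeout
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create streams and consume as usual, then connector.shutdown();
        }
    }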



Re: High Level Consumer Iterator IllegalStateException Issue

2014-10-24 Thread Neha Narkhede
Which version of Kafka are you using on the consumer?

On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 HI Kafka Community ,

 I am using kafka trunk source code and I get following exception.  What
 could cause the iterator to have FAILED state.  Please let me know how I
 can fix this issue.





 java.lang.IllegalStateException: Iterator is in failed state
     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

 Here is Properties:

 Properties props = new Properties();
 props.put("zookeeper.connect", zkConnect);
 props.put("group.id", groupId);
 props.put("consumer.timeout.ms", "-1");
 props.put("zookeeper.session.timeout.ms", "1");
 props.put("zookeeper.sync.time.ms", "6000");
 props.put("auto.commit.interval.ms", "2000");
 props.put("rebalance.max.retries", "8");
 props.put("auto.offset.reset", "largest");
 props.put("fetch.message.max.bytes", "2097152");
 props.put("socket.receive.buffer.bytes", "2097152");
 props.put("auto.commit.enable", "true");


 Thanks,

 Bhavesh



Re: Example of using simple consumer to fetch from multiple partitions of a topic

2014-10-24 Thread Neha Narkhede
Is there a better way to do this where I can build one big request with
broker -> partition mappings and call a consumer.fetch() with one giant
request?

Unfortunately, not until the 0.9 consumer is released. Until then, you can
take a look at ZookeeperConsumerConnector on how it manages multiple
SimpleConsumers.

Thanks,
Neha

On Fri, Oct 24, 2014 at 1:46 PM, Rajiv Kurian ra...@signalfuse.com wrote:

 I saw the example at

 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 and it answered most of my questions. I am still trying to figure out the
 pattern to be used when I want to use a single simple consumer to fetch
 messages from different partitions (possibly from different offsets) and
 possibly managed by different leaders.

 My use case right now is that I have a consumer which dynamically decides
 which partitions it is responsible for and decides to fetch messages from
 them at potentially different offsets.

 Right now it seems like I would need a new SimpleConsumer for each broker
 since the SimpleConsumer takes the leadBroker in it's constructor. Then I'd
 have to build a FetchRequest for each broker and ensure that the
 addFetch(...) calls are made with partitions that correspond to the leader
 broker that a SimpleConsumer is managing. Finally I'll need to make up to
 numBrokers number of consumer.fetch(req) calls since each request is for a
 separate broker.

 Is there a better way to do this where I can build one big request with
 broker -> partition mappings and call a consumer.fetch() with one giant
 request? Otherwise if I am doing this in a single thread I have head of
 line blocking with one request blocking another.

 Thanks!
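
Until the 0.9 consumer lands, one common pattern is the one Neha points at: group partitions by leader and issue one FetchRequest per leader through its own SimpleConsumer. The sketch below follows the 0.8 SimpleConsumer example page; the broker addresses, topic, offsets and fetch sizes are placeholders, and leader discovery (TopicMetadataRequest) is not shown:

    import java.util.HashMap;
    import java.util.Map;

    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    // Sketch: one SimpleConsumer and one FetchRequest per leader broker (0.8 API).
    public class PerBrokerFetch {
        public static void main(String[] args) {
            String topic = "my_topic";
            // partition -> leader host, discovered elsewhere (e.g. via TopicMetadataRequest)
            Map<Integer, String> leaderByPartition = new HashMap<Integer, String>();
            leaderByPartition.put(0, "broker1.example.com");
            leaderByPartition.put(1, "broker2.example.com");

            // group the partitions each leader owns into a single request builder
            Map<String, FetchRequestBuilder> builders = new HashMap<String, FetchRequestBuilder>();
            for (Map.Entry<Integer, String> e : leaderByPartition.entrySet()) {
                FetchRequestBuilder b = builders.get(e.getValue());
                if (b == null) {
                    b = new FetchRequestBuilder().clientId("example-client");
                    builders.put(e.getValue(), b);
                }
                b.addFetch(topic, e.getKey(), 0L /* starting offset */, 1024 * 1024);
            }
            // one fetch call per leader; these still block one after another in a single thread
            for (Map.Entry<String, FetchRequestBuilder> e : builders.entrySet()) {
                SimpleConsumer consumer =
                        new SimpleConsumer(e.getKey(), 9092, 100000, 64 * 1024, "example-client");
                FetchResponse response = consumer.fetch(e.getValue().build());
                System.out.println("fetched from " + e.getKey() + ", hasError=" + response.hasError());
                consumer.close();
            }
        }
    }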



Re: Reusable consumer across consumer groups

2014-10-23 Thread Neha Narkhede
I'm wondering how much of this can be done using careful system design vs
building it within the consumer itself. You could distribute the several
consumer instances across machines since it is built for distributed load
balancing. That will sufficiently isolate the resources required to run the
various consumers. But probably you have a specific use case in mind for
running several consumer groups on the same machine. Would you mind giving
more details?

On Thu, Oct 23, 2014 at 12:55 AM, Stevo Slavić ssla...@gmail.com wrote:

 Imagine exposing Kafka over various remoting protocols, where incoming
 poll/read requests may come in concurrently for different consumer groups,
 especially in a case with lots of different consumer groups.
 If you create and destroy KafkaConsumer for each such request, response
 times and throughput will be very low, and doing that is one of the ways to
 reproduce https://issues.apache.org/jira/browse/KAFKA-1716

 It would be better if one could reuse a (pool of) Consumer instances, and
 through a read operation parameter specify for which consumer group should
 read be performed.

 Kind regards,
 Stevo Slavic.

 On Tue, Oct 14, 2014 at 6:17 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Stevo,
 
  The new consumer API is planned for 0.9, not 0.8.2. You can take a look
 at
  a detailed javadoc here
  
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
  
  .
 
  Can you explain why you would like to poll messages across consumer
 groups
  using just one instance?
 
  Thanks,
  Neha
 
  On Tue, Oct 14, 2014 at 1:03 AM, Stevo Slavić ssla...@gmail.com wrote:
 
   Hello Apache Kafka community,
  
   Current (Kafka 0.8.1.1) high-level API's KafkaConsumer is not
 lightweight
   object, it's creation takes some time and resources, and it does not
 seem
   to be thread-safe. It's API also does not support reuse, for consuming
   messages from different consumer groups.
  
   I see even in the coming (0.8.2) redesigned API it will not be possible
  to
   reuse consumer instance to poll messages from different consumer
 groups.
  
   Can something be done to support this?
  
   Would it help if there was consumer group as a separate entity from
   consumer, for all the subscription management tasks?
  
   Kind regards,
   Stevo Slavic
  
 



Re: Kafka 0.9

2014-10-23 Thread Neha Narkhede
The new consumer development hasn't started yet. But we have a very
detailed design doc
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
and JIRA https://issues.apache.org/jira/browse/KAFKA-1326 plan, if you'd
like to contribute. The new client will be protocol compatible with the 0.8
broker.

On Thu, Oct 23, 2014 at 3:54 PM, Rajiv Kurian ra...@signalfuse.com wrote:

 I really like the Kafka 0.9 consumer api. I want to start using it. Is it
 available on maven or maybe as a downloadable jar ? If not what is the best
 way to get it?

 Also wanted to ask if it the new client protocol is compatible with the 0.8
 broker.

 Thanks!



Re: 0.8.1.2

2014-10-22 Thread Neha Narkhede
Yes, 0.8.2 includes the new producer. 0.8.2 will have a lot of new features
which will take time to stabilize. If people want 0.8.1.2 for some critical
bug fixes, we can discuss the feasibility of doing the release.

On Wed, Oct 22, 2014 at 1:39 AM, Shlomi Hazan shl...@viber.com wrote:

 at the time I thought it was a good idea but if I understand correctly what
 Jun is saying is that 0.8.1.2 will not happen.
 I assume Jun sees 0.8.2 coming soon enough to remove any added value from
 0.8.1.2.
 Shlomi

 On Wed, Oct 22, 2014 at 5:21 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Shlomi,
 
  As Jun mentioned, we are voting on a 0.8.2 beta release now. Are you
  suggesting there be an 0.8.1.2 release in addition to that? We can take a
  quick vote from the community to see how many people prefer to have this
  and why.
 
  Thanks,
  Neha
 
  On Tue, Oct 21, 2014 at 6:03 PM, Jun Rao jun...@gmail.com wrote:
 
   We are voting an 0.8.2 beta release right now.
  
   Thanks,
  
   Jun
  
   On Tue, Oct 21, 2014 at 11:17 AM, Shlomi Hazan shl...@viber.com
 wrote:
  
Hi All,
Will version 0.8.1.2 happen?
Shlomi
   
  
 



Re: frequent periods of ~1500 replicas not in sync

2014-10-22 Thread Neha Narkhede
Neil,

We fixed a bug related to the BadVersion problem in 0.8.1.1. Would you mind
repeating your test on 0.8.1.1 and if you can still reproduce this issue,
then send around the thread dump and attach the logs to KAFKA-1407?

Thanks,
Neha

On Tue, Oct 21, 2014 at 11:56 AM, Neil Harkins nhark...@gmail.com wrote:

 Hi. I've got a 5 node cluster running Kafka 0.8.1,
 with 4697 partitions (2 replicas each) across 564 topics.
 I'm sending it about 1% of our total messaging load now,
 and several times a day there is a period where 1~1500
 partitions have one replica not in sync. Is this normal?
 If a consumer is reading from a replica that gets deemed
 not in sync, does it get redirected to the good replica?
 Is there a #partitions over which maintenance tasks
 become infeasible?

 Relevant config bits:
 auto.leader.rebalance.enable=true
 leader.imbalance.per.broker.percentage=20
 leader.imbalance.check.interval.seconds=30
 replica.lag.time.max.ms=1
 replica.lag.max.messages=4000
 num.replica.fetchers=4
 replica.fetch.max.bytes=10485760

 Not necessarily correlated to those periods,
 I see a lot of these errors in the logs:

 [2014-10-20 21:23:26,999] 21963614 [ReplicaFetcherThread-3-1] ERROR
 kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-3-1], Error
 in fetch Name: FetchRequest; Version: 0; CorrelationId: 77423;
 ClientId: ReplicaFetcherThread-3-1; ReplicaId: 2; MaxWait: 500 ms;
 MinBytes: 1 bytes; RequestInfo: ...

 And a few of these:

 [2014-10-20 21:23:39,555] 3467527 [kafka-scheduler-2] ERROR
 kafka.utils.ZkUtils$  - Conditional update of path
 /brokers/topics/foo.bar/partitions/3/state with data
 {"controller_epoch":11,"leader":3,"version":1,"leader_epoch":109,"isr":[3]}
 and expected version 197 failed due to
 org.apache.zookeeper.KeeperException$BadVersionException:
 KeeperErrorCode = BadVersion for
 /brokers/topics/foo.bar/partitions/3/state

 And this one I assume is a client closing the connection non-gracefully,
 thus should probably be a warning, not an error?:

 [2014-10-20 21:54:15,599] 23812214 [kafka-processor-9092-3] ERROR
 kafka.network.Processor  - Closing socket for /10.31.0.224 because of
 error

 -neil



Re: Errors after reboot on single node setup

2014-10-22 Thread Neha Narkhede
Can you provide steps to reproduce this? I'm not sure I understand how you
run into this. It does look like a bug.

On Wed, Oct 22, 2014 at 9:55 AM, Ciprian Hacman ciprian.hac...@sematext.com
 wrote:

 Hi,

 First of all, I am new to Kafka and more of a user than a developer. I will
 try to clarify things as much as possible though.

 We are using Kafka as a message system for our apps and works nicely in our
 SaaS cluster.
 I am trying to make the apps also work on a single node for demo purposes.
 I set up Zookeeper, Kafka and our apps on a node and things were ok until
 rebooting the node. After that I see the following messages in Kafka log:

 [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up
 (kafka.controller.KafkaController)
 [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete
 (kafka.controller.KafkaController)
 [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data:

 {jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
 stored data:

 {jmx_port:-1,timestamp:1413994171579,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
 (kafka.utils.ZkUtils$)
 [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node

 [{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}]
 at /brokers/ids/0 a while back in a different session, hence I will backoff
 for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
 [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of
 /controller changed sent to
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882]
 (org.I0Itec.zkclient.ZkEventThread)
 java.lang.IllegalStateException: Kafka scheduler has not been started
 at
 kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
 at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
 at

 kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
 at

 kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
 at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 [2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0
 with address ip-10-91-142-54.eu-west-1.compute.internal:9092.
 (kafka.utils.ZkUtils$)
 [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started
 (kafka.server.KafkaServer)
 [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1.
 (kafka.network.Processor)
 [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1.
 (kafka.network.Processor)
 [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1.
 (kafka.network.Processor)


 The last log line repeats forever and is correlated with errors on the app
 side.
 Restarting Kafka fixes the errors.

 I am using Kafka 0.8.2 from github to avoid
 https://issues.apache.org/jira/browse/KAFKA-1451.

 Does anyone have any idea why this happens and how it can be fixed?

 Thanks,
 Ciprian
 --
 Performance Monitoring * Log Analytics * Search Analytics
  Solr & Elasticsearch Support * http://sematext.com/



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Neha Narkhede
In my experience, RAID 10 doesn't really provide value in the presence of
replication. When a disk fails, the RAID resync process is so I/O intensive
that it renders the broker useless until it completes. When this happens,
you actually have to take the broker out of rotation and move the leaders
off of it to prevent it from serving requests in a degraded state. You
might as well shutdown the broker, delete the broker's data and let it
catch up from the leader.

On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Makes sense. Thanks :)

 On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
  There are various costs when a broker fails, including broker leader
 election for each partition, etc., as well as exposing possible issues for
 in-flight messages, and client rebalancing etc.
 
  So even though replication provides partition redundancy, RAID 10 on
 each broker is usually a good tradeoff to prevent the typical most common
 cause of broker server failure (e.g. disk failure) as well, and overall
 smoother operation.
 
  Best Regards,
 
  -Jonathan
 
 
  On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  RAID-10?
  Interesting choice for a system where the data is already replicated
  between nodes. Is it to avoid the cost of large replication over the
  network? how large are these disks?
 
  On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com
 wrote:
  In fact there are many more than 4000 open files. Many of our brokers
 run
  with 28,000+ open files (regular file handles, not network
 connections). In
  our case, we're beefing up the disk performance as much as we can by
  running in a RAID-10 configuration with 14 disks.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
  Todd,
 
  Actually I'm wondering how kafka handle so much partition, with one
  partition there is at least one file on disk, and with 4000 partition,
  there will be at least 4000 files.
 
  When all these partitions have write request, how did Kafka make the
 write
  operation on the disk to be sequential (which is emphasized in the
 design
  document of Kafka) and make sure the disk access is effective?
 
  Thank you for your reply.
 
  xiaobinshe
 
 
 
  2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
  As far as the number of partitions a single broker can handle, we've
 set
  our cap at 4000 partitions (including replicas). Above that we've
 seen
  some
  performance and stability issues.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
  wrote:
 
  hello, everyone
 
  I'm new to kafka, I'm wondering what's the max num of partition can
 one
  siggle machine handle in Kafka?
 
  Is there an sugeest num?
 
  Thanks.
 
  xiaobinshe
 
 
 
 



Re: Sending Same Message to Two Topics on Same Broker Cluster

2014-10-21 Thread Neha Narkhede
I'm not sure I understood your concern about invoking send() twice, once
with each topic. Are you worried about the network overhead? Whether Kafka
does this transparently or not, sending messages to different topics will
carry some overhead. I think the design of the API is much more intuitive
and cleaner if a message is sent to a topic partition.

On Mon, Oct 20, 2014 at 9:17 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Neha,

 Yes, I understand that but when transmitting single message (I can not set
 List of all topics)  Only Single one.  So I will to add same message in
 buffer with different topic. If Kakfa protocol, allows to add multiple
 topic then message does not have to be re-transmited over the wire to add
 to multiple topic.

 The Producer record only allow one topic.

 http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/producer/ProducerRecord.html

 Thanks for your quick response and I appreciate your help.

 Thanks,

 Bhavesh


 On Mon, Oct 20, 2014 at 9:10 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Not really. You need producers to send data to Kafka.
 
  On Mon, Oct 20, 2014 at 9:05 PM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
  wrote:
 
   Hi Kakfa Team,
  
  
   I would like to send a single message to multiple topics (two for now)
   without re-transmitting the message from producer to brokers.  Is this
   possible?
  
   Both Producers Scala and Java does not allow this.   I do not have to
 do
   this all the time only based on application condition.
  
  
   Thanks in advance of your help !!
  
  
   Thanks,
  
  
   Bhavesh
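
To make Neha's point concrete, a minimal sketch with the 0.8 javaapi producer follows (topic names and broker list are placeholders, not Bhavesh's code). Both KeyedMessages can go in a single send() call, but the payload is still serialized and transmitted once per topic on the wire:

    import java.util.Arrays;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // Sketch: one send() call carrying the same payload addressed to two topics.
    public class TwoTopicSend {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            String payload = "same message body";
            producer.send(Arrays.asList(
                    new KeyedMessage<String, String>("topicA", payload),
                    new KeyedMessage<String, String>("topicB", payload)));
            producer.close();
        }
    }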
  
 



Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-10-20 Thread Neha Narkhede
Mohit,

I wonder if it is related to
https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a
session, it doesn't delete the ephemeral nodes immediately. So if you end
up trying to recreate ephemeral nodes quickly, it could either be in the
valid latest session or from the previously expired session. If you hit
this problem, then waiting would resolve it. But if not, then this may be a
legitimate bug in ZK 3.4.6.

Can you try shutting down all your consumers, waiting until session timeout
and restarting them?

Thanks,
Neha

On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com
wrote:

 Dear Experts,

 We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have of
 topic with 30 partitions and 2 replicas. We are using High level consumer
 api.
 Each consumer process which is a storm topolofy has 5 streams which
 connects to 1 or more partitions. We are not using storm's inbuilt kafka
 spout. Everything runs fine till the 5th consumer process(25 streams) is
 added for this topic.

 As soon as the sixth consumer process is added, the newly added partition
 does not get the ownership of the partitions that it requests for as the
 already existing owners have not yet given up the ownership.

 We changed certain properties on consumer :

 1. Max Rebalance attempts - 20 (rebalance.backoff.ms * rebalance.max.retries > zk connection timeout)
 2. Back off ms between rebalances - 1 (10 seconds)
 3. ZK connection timeout - 100,000 (100 seconds)

 Although when I am looking in the zookeeper shell when the rebalance is
 happening, the consumer is registered fine on the zookeeper. Just that the
 rebalance does not happen.
 After the 20th rebalance gets completed, we get


 *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
 [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all
 offsets after clearing the fetcher queues*
 *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring
 exception while trying to start streamer threads:
 rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after
 20 retries*
 *kafka.common.ConsumerRebalanceFailedException:
 rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after
 20 retries*
 *at

 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
 ~[stormjar.jar:na]*
 *at

 kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
 ~[stormjar.jar:na]*
 *at

 kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
 ~[stormjar.jar:na]*
 *at

 kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
 ~[stormjar.jar:na]*
 *at

 com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79)
 ~[stormjar.jar:na]*
 *at

 com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64)
 ~[stormjar.jar:na]*
 *at

 com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71)
 [stormjar.jar:na]*
 *at

 com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48)
 [stormjar.jar:na]*
 *at

 com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63)
 [stormjar.jar:na]*
 *at

 com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121)
 [stormjar.jar:na]*
 *at

 backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562)
 [na:0.9.1-incubating]*
 *at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
 [na:0.9.1-incubating]*
 *at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]*
 *at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]*
 *2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO]
 [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering
 consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK*
 *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in

 /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b
 data:

 {version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025810635}
 stored data:

 {version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025767483}*
 *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
 node

 [{version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025810635}]
 at

 /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b
 a while back in a different session, hence I will backoff for this node to
 

Re: taking broker down and returning it does not restore cluster state (nor rebalance)

2014-10-20 Thread Neha Narkhede
Did you ensure that your replication factor was set higher than 1? If so,
things should recover automatically after adding the killed broker back
into the cluster.

On Mon, Oct 20, 2014 at 1:32 AM, Shlomi Hazan shl...@viber.com wrote:

 Hi,

 Running some tests on 0811 and wanted to see what happens when a broker is
 taken down with 'kill'. I bumped into the situation at the subject where
 launching the broker back left him a bit out of the game as far as I could
 see using stack driver metrics.
 Trying to rebalance with verify consumer rebalance return an error no
 owner for partition for all partitions of that topic (128 partitions).
 moreover, yet aside from the issue at hand, changing the group name to a
 non-existent group returned success.
 taking both the consumers and producers down allowed the rebalance to
 return success...

 And the question is:
 How do you restore 100% state after taking down a broker? what is the best
 practice? what needs be checked and what needs be done?

 Shlomi



Re: How to produce and consume events in 2 DCs?

2014-10-20 Thread Neha Narkhede
Another way to set up this kind of mirroring is by deploying 2 clusters in
each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
maker copies data from both the DC's local clusters into the aggregate
clusters. So if you want access to a topic with data from both DC's, you
subscribe to the aggregate cluster.

Thanks,
Neha

On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
e.vanoos...@grons.nl.invalid wrote:

 Hi,

 We have 2 data centers that produce events. Each DC has to process events
 from both DCs.

 I had the following in mind:

DC 1 | DC 2
 events  |events
+  +  +  |   +  +  +
|  |  |  |   |  |  |
v  v  v  |   v  v  v
  ++ | ++
  | Receiver topic | | | Receiver topic |
  ++   ++
  |  |   mirroring  ||
  |  |   +--+|
  |  |   |   |
  |  ++  |
  v  vv  v
  ++ | ++
  | Consumer topic | | | Consumer topic |
  ++ | ++
+  +  +  |   +  +  +
|  |  |  |   |  |  |
v  v  v  |   v  v  v
   consumers |  consumers


 As each DC has a single Kafka cluster, on each DC the receiver topic and
 consumer topic needs to be on the same cluster.
 Unfortunately, mirror maker does not seem to support mirroring to a topic
 with another name.

 Is there another tool we could use?
 Or, is there another approach for producing and consuming from 2 DCs?

 Kind regards,
 Erik.

 —
 Erik van Oosten
 http://www.day-to-day-stuff.blogspot.nl/




Re: log.cleanup.interval.mins still valid for 0.8.1?

2014-10-20 Thread Neha Narkhede
Which example are you referring to?

On Mon, Oct 20, 2014 at 7:47 AM, Libo Yu yu_l...@hotmail.com wrote:

 Hi all,


 This config property does not appear in the table of broker config
 properties. But it appears in the example on the Web page. So I wonder if
 this is still a valid config property for 0.8.1. Thanks.

 Libo




Re: MBeans, dashes, underscores, and KAFKA-1481

2014-10-17 Thread Neha Narkhede
+1 on getting rid of the quotes.

On Fri, Oct 17, 2014 at 12:31 PM, Magnus Spångdal 
magnus.spang...@deltaprojects.com wrote:

 +1 to get rid of quotes, thanks!






 —
 Sent from Mailbox

 On Fri, Oct 17, 2014 at 8:54 PM, Jun Rao jun...@gmail.com wrote:

  Hi, everyone,
  We are fixing the mbean names in kafka-1482, by adding separate explicit
  tags in the name for things like clientId and topic. Another thing that
  some people have complained before is that we use quotes in the jmx name.
  Should we also just get rid of the quotes as part of kafka-1482? So,
  instead of
 kafka.server:type=BrokerTopicMetrics,name=topic-1-BytesInPerSec
  we will have
 kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1
  Thanks,
  Jun
  On Thu, Oct 9, 2014 at 11:12 AM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
  I am going to vote for 1482 to be included in 0.8.2, if we have a patch
  submitted in a week. I think we've had this JIRA opened for too long
 and we
  held people back so it's only fair to release this.
 
  On Wed, Oct 8, 2014 at 9:40 PM, Jun Rao jun...@gmail.com wrote:
 
   Otis,
  
   Just have the patch ready asap. We can make a call then.
  
   Thanks,
  
   Jun
  
   On Wed, Oct 8, 2014 at 6:13 AM, Otis Gospodnetic 
   otis.gospodne...@gmail.com
wrote:
  
Hi Jun,
   
Would by the end of next week be acceptable for 0.8.2?
   
Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log
 Management
 Solr & Elasticsearch Support * http://sematext.com/
   
   
On Tue, Oct 7, 2014 at 4:04 PM, Jun Rao jun...@gmail.com wrote:
   
 Otis,

 Yes, if you guys can help provide a patch in a few days, we can
   probably
 get it to the 0.8.2 release.

 Thanks,

 Jun

 On Tue, Oct 7, 2014 at 12:10 PM, Otis Gospodnetic 
 otis.gospodne...@gmail.com wrote:

  Hi Jun,
 
  I think your MBean renaming approach will work.  I see
  https://issues.apache.org/jira/browse/KAFKA-1481 has Fix
 Version
0.8.2,
  but
  is not marked as a Blocker.  We'd love to get the MBeans fixed
 so
   this
  makes it in 0.8.2 release.  Do you know if this is on anyone's
  plate
(the
  issue is currently Unassigned)?  If not, should we provide a new
   patch
 that
  uses your approach?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log
   Management
   Solr & Elasticsearch Support * http://sematext.com/
 
 
  On Thu, Sep 18, 2014 at 4:49 PM, Jun Rao jun...@gmail.com
 wrote:
 
   Otis,
  
   In kafka-1481, we will have to change the mbean names (at
 least
  the
 ones
   with clientid and topic) anyway. Using the name/value pair in
 the
mbean
   name allows us to do this in a cleaner way. Yes, , is not
  allowed
in
   clientid or topic.
  
   Bhavesh,
  
   Yes, I was thinking of making changes in the new metrics
 package.
  Something
   like allowing the sensor names to have name/value pairs. The
 jmx
names
  will
   just follow accordingly. This is probably cleaner than doing
 the
  escaping.
   Also, the metric names are more intuitive (otherwise, you
 have to
know
   which part is the clientid and which part is the topic).
  
   Thanks,
  
   Jun
  
   On Wed, Sep 17, 2014 at 2:32 PM, Otis Gospodnetic 
   otis.gospodne...@gmail.com wrote:
  
Hi Jun,
   
On Wed, Sep 17, 2014 at 12:35 PM, Jun Rao jun...@gmail.com
 
wrote:
   
 Bhavesh,

 Yes, allowing dot in clientId and topic makes it a bit
 harder
   to
  define
the
 JMX bean names. I see a couple of solutions here.

 1. Disable dot in clientId and topic names. The issue is
 that
   dot
 may
 already be used in existing deployment.

 2. We can represent the JMX bean name differently in the
 new
  producer.
 Instead of
   kafka.producer.myclientid:type=mytopic
 we could change it to
   kafka.producer:clientId=myclientid,topic=mytopic

 I felt that option 2 is probably better since it doesn't
  affect
   existing
 users.

   
If it doesn't affect existing users, great.
   
If you are saying that each piece of MBean name could be
expressed
 as
name=value pair, with something like "," (forbidden in host
   names,
  topic
names, client IDs, etc. I assume?) then yes, I think this
 would
   be
  easier
to parse and it would be easier for people to understand
 what
  is
 what.
   
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log
 Management
 Solr & Elasticsearch Support * http://sematext.com/
   
   
   

 Otis
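
As a small illustration of why the name/value style (option 2 above) is easier to consume, the proposed bean name parses cleanly with the standard JMX APIs; this sketch just reuses the example name from the thread:

    import javax.management.ObjectName;

    // Sketch: key=value mbean names can be picked apart without string munging or quote handling.
    public class MBeanNameParse {
        public static void main(String[] args) throws Exception {
            ObjectName n = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1");
            System.out.println(n.getKeyProperty("topic")); // topic-1
            System.out.println(n.getKeyProperty("name"));  // BytesInPerSec
        }
    }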

Re: 0.8.x => 0.8.2 upgrade - live seamless?

2014-10-16 Thread Neha Narkhede
Yes, you should be able to upgrade seamlessly.

On Wed, Oct 15, 2014 at 10:07 PM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi,

 Some of our SPM users who are eager to monitor their Kafka 0.8.x clusters
 with SPM are asking us whether the upgrade to 0.8.2 from 0.8.1 will be
 seamless.  I believe this will be the case, but wanted to double-check on
 that...

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr & Elasticsearch Support * http://sematext.com/



Re: Kafka/Zookeeper deployment Questions

2014-10-16 Thread Neha Narkhede
In other words, if I change the number of partitions, can I restart the
brokers one at a time so that I can continue processing data?

Changing the # of partitions is an online operation and doesn't require
restarting the brokers. However, any other configuration (with the
exception of a few operations) that requires a broker restart can be done
in a rolling manner.

On Wed, Oct 15, 2014 at 7:16 PM, Sybrandy, Casey 
casey.sybra...@six3systems.com wrote:

 Hello,

 We're looking into deploying Kafka and Zookeeper into an environment where
 we want things to be as easy to stand up and administer.  To do this, we're
 looking into using Consul, or similar, and Confd to try to make this as
 automatic as possible.  I was wondering if anyone had an experience in this
 area.  My major concern is reconfiguring Kafka as, in my experience, is
 making sure we don't end up losing messages.

 Also, can kafka and zookeeper be reconfigured in a rolling manner?  In
 other words, if I change the number of partitions, can I restart the
 brokers one at a time so that I can continue processing data?

 Thanks.


Re: Broker brought down and under replicated partitions

2014-10-16 Thread Neha Narkhede
Is there a known issue in the 0.8.0 version that was
fixed later on? What can I do to diagnose/fix the situation?

Yes, quite a few bugs related to this have been fixed since 0.8.0. I'd
suggest upgrading to 0.8.1.1

On Wed, Oct 15, 2014 at 11:09 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 The only thing that I find very weird is the fact that brokers that are
 dead are still part of the ISR set for hours... and are basically not
 removed. Note this is not constantly the case, most of the dead brokers are
 properly removed and it is really just in a few cases. I am not sure why
 this would happen. Is there a known issue in the 0.8.0 version that was
 fixed later on? What can I do to diagnose/fix the situation?

 Thanks,

 On Wed, Oct 15, 2014 at 9:58 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

  So I am using 0.8.0. I think I found the issue actually. It turns out
 that
  some partitions only had a single replica and the leaders of those
  partitions would basically refuse new writes. As soon as I reassigned
  replicas to those partitions things kicked off again. Not sure if that's
  expected... but that seemed to make the problem go away.
 
  Thanks,
 
 
  On Wed, Oct 15, 2014 at 6:46 AM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
  Which version of Kafka are you using? The current stable one is 0.8.1.1
 
  On Tue, Oct 14, 2014 at 5:51 PM, Jean-Pascal Billaud j...@tellapart.com
  wrote:
 
   Hey Neha,
  
   so I removed another broker like 30mn ago and since then basically the
   Producer is dying with:
  
   Event queue is full of unsent messages, could not send event:
   KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
   kafka.common.QueueFullException: Event queue is full of unsent
 messages,
   could not send event: KeyedMessage(my_topic,[B@1b71b7a6,[B@35fdd1e7)
   at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
   ~[scala-library-2.10.3.jar:na]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
   ~[scala-library-2.10.3.jar:na]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
   ~[scala-library-2.10.3.jar:na]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
   ~[scala-library-2.10.3.jar:na]
   at kafka.producer.Producer.asyncSend(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at kafka.producer.Producer.send(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
   at kafka.javaapi.producer.Producer.send(Unknown Source)
   ~[kafka_2.10-0.8.0.jar:0.8.0]
  
   It seems like it cannot recover for some reasons. The new leaders were
   elected it seems like so it should have picked up the new meta data
   information about the partitions. Is this something known from 0.8.0?
  What
   should be looking for to debug/fix this?
  
   Thanks,
  
   On Tue, Oct 14, 2014 at 2:22 PM, Neha Narkhede 
 neha.narkh...@gmail.com
  
   wrote:
  
Regarding (1), I am assuming that it is expected that brokers going
  down
will be brought back up soon. At which point, they will pick up from
  the
current leader and get back into the ISR. Am I right?
   
The broker will be added back to the ISR once it is restarted, but
 it
   never
goes out of the replica list until the admin explicitly moves it
 using
   the
reassign partitions tool.
   
Regarding (2), I finally kicked off a reassign_partitions admin task
   adding
broker 7 to the replicas list for partition 0 which finally fixed
 the
   under
replicated issue:
Is this therefore expected that the user will fix up the under
   replication
situation?
   
Yes. Currently, partition reassignment is purely an admin initiated
  task.
   
Another thing I'd like to clarify is that for another topic Y,
 broker
  5
   was
never removed from the ISR array. Note that Y is an unused topic so
 I
  am
guessing that technically broker 5 is not out of sync... though it
 is
   still
dead. Is this the expected behavior?
   
Not really. After replica.lag.time.max.ms (which defaults to 10
   seconds),
the leader should remove the dead broker out of the ISR.
   
Thanks,
Neha
   
On Tue, Oct 14, 2014 at 9:27 AM, Jean-Pascal Billaud 
  j...@tellapart.com
wrote:
   
 hey folks,

 I have been testing a kafka cluster of 10 nodes on AWS using
 version
 2.8.0-0.8.0
 and see some behavior on failover that I want to make sure I
   understand.

 Initially, I have a topic X with 30 partitions and a replication
  factor
of
 3. Looking at the partition 0:
 partition: 0 - leader: 5 preferred leader: 5 brokers: [5, 3, 4]
   in-sync:
 [5, 3, 4]

 While killing broker 5, the controller immediately grab the next
   replica
in
 the ISR and assign it as a leader

Re: read N items from topic

2014-10-16 Thread Neha Narkhede
Josh,

The consumer's API doesn't allow you to specify N messages, but you can
invoke iter.next() as Gwen suggested and count the messages. Note that the
iterator can block if you have fewer than N messages, so you will have to
design carefully around it. The new consumer's API provides a non-blocking
poll() API so this sort of use case is better handled. In any case, getting
messages based on a count is something that has to happen on the consumer
side since the server sends the bytes using the sendfile API that doesn't
allow it to inspect the bytes.

Thanks,
Neha

On Thu, Oct 16, 2014 at 8:37 AM, gshap...@cloudera.com wrote:

 Using the high level consumer, each consumer in the group can call
 iter.next () in a loop until they get the number of messages you need.

 —
 Sent from Mailbox

 On Thu, Oct 16, 2014 at 10:18 AM, Josh J joshjd...@gmail.com wrote:

  hi,
  How do I read N items from a topic? I also would like to do this for a
  consumer group, so that each consumer can specify an N number of tuples
 to
  read, and each consumer reads distinct tuples.
  Thanks,
  Josh
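
To make the counting approach concrete, here is a minimal sketch (one stream; the topic, group and N are placeholders) that reads up to N messages with the high-level consumer and uses consumer.timeout.ms so the iterator does not block forever when fewer than N are available:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Sketch: read at most N messages, then stop. Distinct consumers in the same
    // group read distinct partitions, so they see distinct messages.
    public class ReadN {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "read-n-example");
            props.put("consumer.timeout.ms", "5000");  // throw instead of blocking forever
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("my_topic", 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(topicCountMap);
            ConsumerIterator<byte[], byte[]> it = streams.get("my_topic").get(0).iterator();

            int n = 100, read = 0;
            try {
                while (read < n && it.hasNext()) {
                    byte[] message = it.next().message();  // process the message bytes here
                    read++;
                }
            } catch (ConsumerTimeoutException e) {
                // fewer than N messages were available before the timeout
            }
            System.out.println("read " + read + " messages");
            connector.shutdown();
        }
    }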



Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?

2014-10-16 Thread Neha Narkhede
Another JIRA that will be nice to include as part of 0.8.2-beta is
https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean
naming. Looking for people's thoughts on 2 things here -

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2
final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned
above) in 0.8.2-beta? If so, it will be great to know now so it will allow
us to move forward with the beta release quickly.

Thanks,
Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Hi,

 We have accumulated an impressive list of pretty major features in 0.8.2 -
 Delete topic
 Automated leader rebalancing
 Controlled shutdown
 Offset management
 Parallel recovery
 min.isr and
 clean leader election

 In the past, what has worked for major feature releases is a beta release
 prior to a final release. I'm proposing we do the same for 0.8.2. The only
 blockers for 0.8.2-beta, that I know of are -

 https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and
 requires some thinking about the new dependency. Since it is not fully
 ready and there are things to think about, I suggest we take it out, think
 it end to end and then include it in 0.8.3.)
 https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner:
 Guozhang Wang)
 https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is
 waiting on a review by Joe Stein)

 It seems that 1634 and 1671 can get wrapped up in a week. Do people think
 we should cut 0.8.2-beta by next week?

 Thanks,
 Neha



Re: [Kafka-users] Producer not distributing across all partitions

2014-10-16 Thread Neha Narkhede
A topic.metadata.refresh.interval.ms of 10 mins means that the producer
will take 10 mins to detect new partitions. So newly added or reassigned
partitions might not get data for 10 mins. In general, if you're still at
prototyping stages, I'd recommend using the new producer available on kafka
trunk (org.apache.kafka.clients.producer.KafkaProducer). It has better
performance and APIs.

On Thu, Oct 16, 2014 at 3:07 AM, Mungeol Heo mungeol@gmail.com wrote:

 Hi,

 I have a question about 'topic.metadata.refresh.interval.ms'
 configuration.
 As I know, the default value of it is 10 minutes.
 Does it means that producer will change the partition at every 10 minutes?
 What I am experiencing is producer does not change to another
 partition at every 10 minutes.
 Sometime, It never changed during the process which costs about 25 minutes.
 I also changed the value of it to 1 minute for testing.
 It looks like working well at first time.
 However, same problem happens start from second test.
 Sometime, it takes more than 10 minutes to change the partition even
 if I set the value as 1 minute.
 Am i missing something?
 Any help will be great.

 Thanks.

 - Mungeol
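
For reference, a minimal sketch of the new producer Neha mentions, written against the API as it ships in 0.8.2; the broker address and topic are placeholders, and exact config names and generics may differ slightly on older trunk snapshots:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: the new producer; with a null key its default partitioner spreads
    // records across partitions instead of sticking to one per metadata refresh.
    public class NewProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("my_topic", "message-" + i));
            }
            producer.close();
        }
    }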


