Re: [kafka-clients] [VOTE] 1.0.0 RC3

2017-10-23 Thread Dana Powers
+1. passed kafka-python integration tests, and manually verified
producer/consumer on both compressed and non-compressed data.
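For anyone who wants to repeat that manual check, a minimal kafka-python round-trip sketch (broker address and topic name are placeholders; use whichever compression codecs your installation supports):

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='gzip')   # repeat with None, 'snappy', 'lz4'
producer.send('rc-smoke-test', b'hello 1.0.0-rc3')
producer.flush()

consumer = KafkaConsumer('rc-smoke-test',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)  # stop iterating after 5s of silence
for msg in consumer:
    print(msg.offset, msg.value)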

-Dana

On Mon, Oct 23, 2017 at 6:00 PM, Guozhang Wang  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the third candidate for release of Apache Kafka 1.0.0. The main
> PRs that got merged in after RC1 are the following:
>
> https://github.com/apache/kafka/commit/dc6bfa553e73ffccd1e604963e076c78d8ddcd69
>
> It's worth noting that starting in this version we are using a different
> versioning scheme with three digits: *major.minor.bug-fix*
>
> Any and all testing is welcome, but the following areas are worth
> highlighting:
>
> 1. Client developers should verify that their clients can produce/consume
> to/from 1.0.0 brokers (ideally with compressed and uncompressed data).
> 2. Performance and stress testing. Heroku and LinkedIn have helped with
> this in the past (and issues have been found and fixed).
> 3. End users can verify that their apps work correctly with the new
> release.
>
> This is a major version release of Apache Kafka. It includes 29 new KIPs.
> See the release notes and release plan 
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71764913)
> for more details. A few feature highlights:
>
> * Java 9 support with significantly faster TLS and CRC32C implementations
> * JBOD improvements: disk failure only disables failed disk but not the
> broker (KIP-112/KIP-113 part I)
> * Controller improvements: reduced logging change to greatly accelerate
> admin request handling.
> * Newly added metrics across all the modules (KIP-164, KIP-168, KIP-187,
> KIP-188, KIP-196)
> * Kafka Streams API improvements (KIP-120 / 130 / 138 / 150 / 160 / 161),
> and drop compatibility "Evolving" annotations
>
> Release notes for the 1.0.0 release:
> http://home.apache.org/~guozhang/kafka-1.0.0-rc3/RELEASE_NOTES.html
>
>
>
> *** Please download, test and vote by Friday, October 20, 8pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~guozhang/kafka-1.0.0-rc3/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> http://home.apache.org/~guozhang/kafka-1.0.0-rc3/javadoc/
>
> * Tag to be voted upon (off 1.0 branch) is the 1.0.0-rc2 tag:
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=7774b0da8ead0d9edd1d4b2f7e1cd743af694112
>
>
> * Documentation:
> Note the documentation can't be pushed live due to changes that will not go
> live until the release. You can manually verify by downloading
> http://home.apache.org/~guozhang/kafka-1.0.0-rc3/kafka_2.11-1.0.0-site-docs.tgz
>
> I will update this thread with the upcoming Jenkins builds for this RC later;
> they are currently being executed and will be done tomorrow.
>
>
> /**
>
>
> Thanks,
> -- Guozhang
>
>


Re: [VOTE] 0.11.0.0 RC1

2017-06-19 Thread Dana Powers
+1 -- passes kafka-python test suite.

-Dana

On Sun, Jun 18, 2017 at 10:49 PM, Magnus Edenhill 
wrote:

> +1 (non-binding)
>
> Passes librdkafka integration tests (v0.9.5 and master)
>
>
> 2017-06-19 0:32 GMT+02:00 Ismael Juma :
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the second candidate for release of Apache Kafka 0.11.0.0.
> >
> > This is a major version release of Apache Kafka. It includes 32 new KIPs.
> > See
> > the release notes and release plan
> > (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0)
> > for more details. A few feature highlights:
> >
> > * Exactly-once delivery and transactional messaging
> > * Streams exactly-once semantics
> > * Admin client with support for topic, ACLs and config management
> > * Record headers
> > * Request rate quotas
> > * Improved resiliency: replication protocol improvement and
> single-threaded
> > controller
> > * Richer and more efficient message format
> >
> > A number of issues have been resolved since RC0 and there are no known
> > blockers remaining.
> >
> > Release notes for the 0.11.0.0 release:
> > http://home.apache.org/~ijuma/kafka-0.11.0.0-rc1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Thursday, June 22, 9am PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~ijuma/kafka-0.11.0.0-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > http://home.apache.org/~ijuma/kafka-0.11.0.0-rc1/javadoc/
> >
> > * Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.0 tag:
> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=4818d4e1cbef1a8e9c027100fef317077fb3fb99
> >
> > * Documentation:
> > http://kafka.apache.org/0110/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/0110/protocol.html
> >
> > * Successful Jenkins builds for the 0.11.0 branch:
> > Unit/integration tests: https://builds.apache.org/job/kafka-0.11.0-jdk7/167/
> > System tests: https://jenkins.confluent.io/job/system-test-kafka-0.11.0/16/
> > (all 274 tests passed, the reported failure was not related to the tests)
> >
> > /**
> >
> > Thanks,
> > Ismael
> >
>


Re: Resetting offsets

2017-05-04 Thread Dana Powers
kafka-python, yes.

On May 4, 2017 2:28 AM, "Paul van der Linden" <p...@sportr.co.uk> wrote:

Thanks everyone. @Dana is that using the kafka-python library?

On Thu, May 4, 2017 at 4:52 AM, Dana Powers <dana.pow...@gmail.com> wrote:

> Requires stopping your existing consumers, but otherwise should work:
>
> from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
>
> def reset_offsets(group_id, topic, bootstrap_servers):
>     consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
>                              group_id=group_id)
>     consumer.assign([TopicPartition(topic, i) for i in
>                      consumer.partitions_for_topic(topic)])
>     consumer.commit({tp: OffsetAndMetadata(0, b'') for tp in
>                      consumer.assignment()})
>     consumer.close()
>
> On May 3, 2017 9:20 AM, "Ben Stopford" <b...@confluent.io> wrote:
>
> > Hu is correct, there isn't anything currently, but there is an active
> > proposal:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 122%3A+Add+Reset+Consumer+Group+Offsets+tooling
> >
> > On Wed, May 3, 2017 at 1:23 PM Hu Xi <huxi...@hotmail.com> wrote:
> >
> > > Seems there is no command line tool out of the box, but you could write a
> > > simple Java client application that first calls 'seek' or 'seekToBeginning'
> > > to reset offsets to what you expect and then invokes commitSync to commit
> > > the offsets.
> > >
> > >
> > > 
> > > From: Paul van der Linden <p...@sportr.co.uk>
> > > Sent: May 3, 2017 18:28
> > > To: users@kafka.apache.org
> > > Subject: Resetting offsets
> > >
> > > I'm trying to reset the offsets for all partitions for all topics for a
> > > consumer group, but I can't seem to find a working way.
> > >
> > > The command line tool provides a tool to remove a consumer group (which
> > > would be fine on this occasion), but this is not working with the "new"
> > > style consumer groups. I tried to set consumer offsets with a client,
> > > which also didn't work (we are using confluent-kafka-python with
> > > librdkafka).
> > >
> > > Is there any way to reset the offsets (preferably with Python or a
> > > command line tool)?
> > >
> > > Thanks
> > >
> >
>


Re: Resetting offsets

2017-05-03 Thread Dana Powers
Requires stopping your existing consumers, but otherwise should work:

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

def reset_offsets(group_id, topic, bootstrap_servers):
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             group_id=group_id)
    consumer.assign([TopicPartition(topic, i) for i in
                     consumer.partitions_for_topic(topic)])
    # commit offset 0 for every partition of the topic, then disconnect
    consumer.commit({tp: OffsetAndMetadata(0, b'') for tp in
                     consumer.assignment()})
    consumer.close()
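Example invocation (group, topic, and broker address here are placeholders):

reset_offsets('my-consumer-group', 'my-topic', 'localhost:9092')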

On May 3, 2017 9:20 AM, "Ben Stopford"  wrote:

> Hu is correct, there isn't anything currently, but there is an active
> proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 122%3A+Add+Reset+Consumer+Group+Offsets+tooling
>
> On Wed, May 3, 2017 at 1:23 PM Hu Xi  wrote:
>
> > Seems there is no command line tool out of the box, but you could write a
> > simple Java client application that first calls 'seek' or 'seekToBeginning' to
> > reset offsets to what you expect and then invokes commitSync to commit the
> > offsets.
> >
> >
> > 
> > From: Paul van der Linden 
> > Sent: May 3, 2017 18:28
> > To: users@kafka.apache.org
> > Subject: Resetting offsets
> >
> > I'm trying to reset the offsets for all partitions for all topics for a
> > consumer group, but I can't seem to find a working way.
> >
> > The command line tool provides a tool to remove a consumer group (which
> > would be fine on this occasion), but this is not working with the "new"
> > style consumer groups. I tried to set consumer offsets with a client,
> > which also didn't work (we are using confluent-kafka-python with librdkafka).
> >
> > Is there any way to reset the offsets (preferably with Python or a
> > command line tool)?
> >
> > Thanks
> >
>


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-26 Thread Dana Powers
-1


On Wed, Oct 26, 2016 at 9:23 AM, Shekar Tippur  wrote:
> +1
>
> Thanks


Re: [kafka-clients] [VOTE] 0.10.1.0 RC2

2016-10-12 Thread Dana Powers
+1; all kafka-python integration tests pass.

-Dana


On Wed, Oct 12, 2016 at 10:41 AM, Jason Gustafson  wrote:
> Hello Kafka users, developers and client-developers,
>
> One more RC for 0.10.1.0. I think we're getting close!
>
> Release plan:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>
> Release notes for the 0.10.1.0 release:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Oct 15, 11am PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
>
> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
>
> * Documentation:
> http://kafka.apache.org/0101/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0101/protocol.html
>
> * Tests:
> Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
> System tests:
> http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
>
> Thanks,
>
> Jason
>


Re: Kafka Streams Python client?

2016-09-22 Thread Dana Powers
I am not aware of any active development. I did some initial work on a
branch on my laptop w/ basic functionality built on kafka-python. I'm
happy to ping you if/when I push to github. I expect that Confluent
may also be preparing something with their python client wrapper
around librdkafka, but I haven't heard anything specific.

-Dana

On Thu, Sep 22, 2016 at 12:45 PM, Samuel Taylor  wrote:
> Hi all,
>
> Is there active development on a Kafka Streams-esque API for Python? I'm
> interested in working on such a project.
>
> Jay Kreps' article "Introducing Kafka Streams" implies that introducing the
> API in other languages is upcoming, but I can't find anything about a
> Python-specific effort either in Jira or online.
>
> Thanks,
> Samuel
>
> --
> *Samuel Taylor*
> Data Science
>
> *Square Root, Inc. *
> Square-Root.com 


Consul / Zookeeper [was Re: any update on this?]

2016-09-19 Thread Dana Powers
[+ dev list]

I have not worked on KAFKA-1793 directly, but I believe most of the
work so far has been in removing all zookeeper dependencies from
clients. The two main areas for that are (1) consumer rebalancing, and
(2) administrative commands.

1) Consumer rebalancing APIs were added to the broker in 0.9. The "new
consumer" uses these apis and does not connect directly to zookeeper
to manage group leadership and rebalancing. So my understanding is
that this work is complete.

2) Admin commands are being converted to direct API calls instead of
direct zookeeper access with KIP-4. A small part of this project was
released in 0.10.0.0, and there are open PRs for additional chunks that may
make it into 0.10.1.0. If you are interested in helping or getting
involved, you can follow the KIP-4 discussions on the dev mailing
list.

When the client issues are completed I think the next step will be to
refactor the broker's zookeeper access (zkUtils) into an abstract
interface that could potentially be provided by consul or etcd. With
an interface in place, it should be possible to write an alternate
implementation of that interface for consul.

Hope this helps,

-Dana

On Mon, Sep 19, 2016 at 6:31 AM, Martin Gainty  wrote:
> Jens/Kant
> not aware of any shortfall with zookeeper so perhaps you can suggest 
> advantages for Consul vs Zookeeper?
> Maven (I am building, testing and running kafka internally with maven)
> implements wagon-providers for URLConnection vs HttpURLConnection wagons:
> https://maven.apache.org/guides/mini/guide-wagon-providers.html
> Thinking a network_provider should work for integrating an external network
> provider. How would you architect this integration?
>
> Would a configurable network-provider such as maven-wagon-provider work for
> kafka?
> Martin
>
>> From: kanth...@gmail.com
>> To: users@kafka.apache.org
>> Subject: Re: any update on this?
>> Date: Mon, 19 Sep 2016 09:41:10 +
>>
>> Yes ofcourse the goal shouldn't be moving towards consul. It should just be
>> flexible enough for users to pick any distributed coordinated system.
>>
>>
>>
>>
>>
>>
>> On Mon, Sep 19, 2016 2:23 AM, Jens Rantil jens.ran...@tink.se
>> wrote:
>> I think I read somewhere that the long-term goal is to make Kafka
>>
>> independent of Zookeeper alltogether. Maybe not worth spending time on
>>
>> migrating to Consul in that case.
>>
>>
>>
>>
>> Cheers,
>>
>> Jens
>>
>>
>>
>>
>> On Sat, Sep 17, 2016 at 10:38 PM Jennifer Fountain 
>>
>> wrote:
>>
>>
>>
>>
>> > +2 watching.
>>
>> >
>>
>> > On Sat, Sep 17, 2016 at 2:45 AM, kant kodali  wrote:
>>
>> >
>>
>> > > https://issues.apache.org/jira/browse/KAFKA-1793
>>
>> > > It would be great to use Consul instead of Zookeeper for Kafka and I
>>
>> > think
>>
>> > > it
>>
>> > > would benefit Kafka a lot from the exponentially growing consul
>>
>> > community.
>>
>> >
>>
>> >
>>
>> >
>>
>> >
>>
>> > --
>>
>> >
>>
>> >
>>
>> > Jennifer Fountain
>>
>> > DevOPS
>>
>> >
>>
>> --
>>
>>
>>
>>
>> Jens Rantil
>>
>> Backend Developer @ Tink
>>
>>
>>
>>
>> Tink AB, Wallingatan 5, 111 60 Stockholm, Sweden
>>
>> For urgent matters you can reach me at +46-708-84 18 32.
>


Re: Kafka Producer performance - 400GB of transfer on single instance taking > 72 hours?

2016-08-25 Thread Dana Powers
kafka-python includes some benchmarking scripts in
https://github.com/dpkp/kafka-python/tree/master/benchmarks

The concurrency and execution model of the JVM are both significantly
different from Python's. I would definitely recommend some background reading
on the CPython GIL if you are interested in why Python threads are restricted to
a single CPU.
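As the earlier reply quoted below suggests, splitting the data set across separate processes is the usual workaround; a rough sketch of that idea (file names, topic, and broker address are made up for illustration):

from multiprocessing import Pool
from kafka import KafkaProducer

def produce_shard(path):
    # one producer per process; processes sidestep the GIL where threads cannot
    producer = KafkaProducer(bootstrap_servers='broker:9092')
    with open(path, 'rb') as f:
        for line in f:
            producer.send('bulk-load', line)
    producer.flush()
    producer.close()

if __name__ == '__main__':
    shards = ['shard-0.json', 'shard-1.json', 'shard-2.json', 'shard-3.json']
    Pool(processes=4).map(produce_shard, shards)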

-Dana

On Thu, Aug 25, 2016 at 9:53 AM, Tauzell, Dave <dave.tauz...@surescripts.com>
wrote:
> I would write a python client that writes dummy data to kafka to measure
> how fast you can write to Kafka without MongoDB in the mix. I've been doing
> load testing recently and with 3 brokers I can write 100MB/s (using Java
> clients).
>
> -Dave
>
> -Original Message-
> From: Dominik Safaric [mailto:dominiksafa...@gmail.com]
> Sent: Thursday, August 25, 2016 11:51 AM
> To: users@kafka.apache.org
> Subject: Re: Kafka Producer performance - 400GB of transfer on single
instance taking > 72 hours?
>
> Dear Dana,
>
>> I would recommend
>> other tools for bulk transfers.
>
>
> What tools/languages would you rather recommend then using Python?
>
> I could for sure accomplish the same by using the native Java Kafka
Producer API, but should this really affect the performance under the
assumption that the Kafka configuration stays as is?
>
>> On 25 Aug 2016, at 18:43, Dana Powers <dana.pow...@gmail.com> wrote:
>>
>> python is generally restricted to a single CPU, and kafka-python will
>> max out a single CPU well before it maxes a network card. I would
>> recommend other tools for bulk transfers. Otherwise you may find that
>> partitioning your data set and running separate python processes for
>> each will increase the overall CPU available and therefore the
throughput.
>>
>> One day I will spend time improving the CPU performance of
>> kafka-python, but probably not in the near term.
>>
>> -Dana
>


Re: Same partition number of different Kafka topics

2016-08-03 Thread Dana Powers
kafka-python by default uses the same partitioning algorithm as the Java
client. If there are bugs, please let me know. I think the issue here is
with the default nodejs partitioner.
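A generic sketch of the keyed-partitioning rule that both the Java client and kafka-python follow (the murmur2 function itself is a stand-in here; each client ships its own implementation):

def pick_partition(key_bytes, num_partitions, murmur2_hash):
    # hash the raw key bytes, mask off the sign bit, then modulo the partition
    # count -- two producers only agree when the hash function, serialized key
    # bytes, and partition count all match
    return (murmur2_hash(key_bytes) & 0x7FFFFFFF) % num_partitions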

-Dana
On Aug 3, 2016 7:03 PM, "Jack Huang"  wrote:

I see, thanks for the clarification.

On Tue, Aug 2, 2016 at 10:07 PM, Ewen Cheslack-Postava 
wrote:

> Jack,
>
> The partition is always selected by the client -- if it weren't the
brokers
> would need to forward requests since different partitions are handled by
> different brokers. The only "default Kafka partitioner" is the one that
you
> could consider "standardized" by the Java client implementation. Some
> client libraries will make this pluggable like the Java client does so you
> could use a compatible implementation.
>
> -Ewen
>
> On Fri, Jul 29, 2016 at 11:27 AM, Jack Huang  wrote:
>
> > Hi Gerard,
> >
> > After further digging, I found that the clients we are using also have
> > different partitioner. The Python one uses murmur2 (
> >
> >
>
https://github.com/dpkp/kafka-python/blob/master/kafka/partitioner/default.py
> > ),
> > and the NodeJS one uses its own impl (
> > https://github.com/SOHU-Co/kafka-node/blob/master/lib/partitioner.js).
> > Does
> > Kafka delegate the task of partitioning to client? From their
> documentation
> > it doesn't seem like they provide an option to select the "default Kafka
> > partitioner".
> >
> > Thanks,
> > Jack
> >
> >
> > On Fri, Jul 29, 2016 at 7:42 AM, Gerard Klijs 
> > wrote:
> >
> > > The default partitioner will take the key, make the hash from it, and do
> > > a modulo operation to determine the partition it goes to. Some things
> > > which might cause it to end up different for different topics:
> > > - partition numbers are not the same (you already checked)
> > > - key is not exactly the same, for example one might have a space after
> > >   the id
> > > - the other topic is configured to use another partitioner
> > > - the serialiser for the key is different for both topics, since the hash
> > >   is created based on the bytes of the key of the serialised message
> > > - all the topics use another partitioner (for example round robin)
> > >
> > > On Thu, Jul 28, 2016 at 9:11 PM Jack Huang  wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have an application where I need to join events from two different
> > > > topics. Every event is identified by an id, which is used as the key
> > for
> > > > the topic partition. After doing some experiment, I observed that
> > events
> > > > will go into different partitions even if the number of partitions
> for
> > > both
> > > > topics are the same. I can't find any documentation on this point
> > though.
> > > > Does anyone know if this is indeed the case?
> > > >
> > > >
> > > > Thanks,
> > > > Jack
> > > >
> > >
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: newbie does python del, gc.collect() release all resources?

2016-08-01 Thread Dana Powers
Are you asking about a Kafka python driver? Or are referring to pyspark?
On Aug 1, 2016 10:03, "Andy Davidson"  wrote:

> I am new to python.
>
> I find my self working with several data frames at the same time. I have
> run
> into some driver memory problems and want to make sure I release all
> resource as soon as possible.
>
> 1. should I be calling del and gc.collect() ?
> 2. If a dataframe was cached do I need to explicitly call unpersist() or
> will del,gc.collect() do this for me?
>
> Thanks
>
> Andy
>
>
>
>
>


Re: [kafka-clients] [VOTE] 0.10.0.1 RC0

2016-07-29 Thread Dana Powers
+1

tested against kafka-python integration test suite = pass.

Aside: as the scope of kafka gets bigger, it may be useful to organize
release notes into functional groups like core, brokers, clients,
kafka-streams, etc. I've found this useful when organizing
kafka-python release notes.

-Dana

On Fri, Jul 29, 2016 at 7:46 AM, Ismael Juma  wrote:
> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for the release of Apache Kafka 0.10.0.1. This
> is a bug fix release and it includes fixes and improvements from 50 JIRAs
> (including a few critical bugs). See the release notes for more details:
>
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, 1 August, 8am PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging
>
> * Javadoc:
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc0/javadoc/
>
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=0c2322c2cf7ab7909cfd8b834d1d2fffc34db109
>
> * Documentation:
> http://kafka.apache.org/0100/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0100/protocol.html
>
> * Successful Jenkins builds for the 0.10.0 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.0-jdk7/170/
> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.0/130/
>
> Thanks,
> Ismael
>


Re: Compatibility question

2016-07-12 Thread Dana Powers
Yes. The expectation is that brokers are backwards compatible: new broker
releases should work with old clients (to 0.8, but not back to 0.7).

The opposite, backwards compatible clients, is generally not supported: new
clients may not always work with old brokers (except for, *cough* *cough*,
kafka-python, which does maintain backwards compatible client releases from
0.10 to 0.8).
On Jul 12, 2016 12:32 PM, "Chris Barlock"  wrote:

Can the kafka-clients Maven bundle at version 0.8.2.1 be used to
communicate with a Kafka 0.10.0.0 server?

Chris


Re: consumer.poll() takes approx. 30 seconds - 0.9 new consumer api

2016-06-18 Thread Dana Powers
Is your test reusing a group name? And if so, are your consumer instances
gracefully leaving? This may cause subsequent 'rebalance' operations to
block until those old consumers check-in or the session timeout happens
(30secs)
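One way to keep a test run from inheriting that wait (a sketch; the group naming scheme and shorter session timeout are just illustrations):

import uuid
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='broker:9092',
                         group_id='perf-test-%s' % uuid.uuid4(),  # fresh group per run
                         session_timeout_ms=10000)                # shrinks the worst-case wait
# ... run the test ...
consumer.close()  # leave cleanly so the next rebalance doesn't wait on this member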

-Dana
On Jun 18, 2016 8:56 PM, "Rohit Sardesai" 
wrote:

> I am using the group management feature of Kafka 0.9 to handle partition
> assignment to consumer instances. I use the subscribe() API to subscribe to
> the topic I am interested in reading data from.  I have an environment
> where I have 3 Kafka brokers  with a couple of Zookeeper nodes . I created
> a topic with 9 partitions . The performance tests attempt to send 9
> parallel poll() requests to the Kafka brokers every second. The results
> show that each poll() operation takes around 30 seconds for the first time
> it polls and returns 0 records. Also , when I print the partition
> assignment to this consumer instance , I see no partitions assigned to it.
> The next poll() does return quickly ( ~ 10-20 ms) with data and some
> partitions assigned to it.
>
> With each consumer taking 30 seconds , the performance tests report very
> low throughput since I run the tests for around 1000 seconds out which I
> produce messages on the topic for the complete duration and I start the
> parallel consume requests after 400 seconds. So out of 400 seconds , with 9
> consumers taking 30 seconds each , around 270 seconds are spent in the
> first poll without any data. Is this because of the re-balance operation
> that the consumers are blocked on the poll() ? What is the best way to use
> poll()  if I have to serve many parallel requests per second ?  Should I
> prefer manual assignment of partitions in this case instead of relying on
> re-balance ?
>
>
> Regards,
>
> Rohit Sardesai
>
>


Re: Python kafka client benchmarks

2016-06-15 Thread Dana Powers
Very nice!

On Wed, Jun 15, 2016 at 6:40 PM, John Dennison  wrote:
> My team has published a post comparing python kafka clients. Might be of
> interest to python users.
>
> http://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/


Re: error: ... protocols are incompatible with those of existing members ??

2016-06-10 Thread Dana Powers
Barry - i believe the error refers to the consumer group "protocol" that is
used to decide which partitions get assigned to which consumers. The way it
works is that each consumer says it wants to join X group and it can
support protocols (1, 2, 3...). The broker looks at all consumers in group
X and picks a protocol that all can support. If there is no common protocol
you would see this error. Example protocols are 'roundrobin' and 'range' .

You should check the configuration of the group protocol for each consumer
and also check that you don't have extra consumers in the group, perhaps
because the group id is reused / common.
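If every member of the group happens to be kafka-python, one way to make the advertised protocols line up is to pin the assignor list explicitly on each consumer (a sketch; the module paths are kafka-python's and may differ across versions):

from kafka import KafkaConsumer
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

# every consumer joining 'my-group' must advertise at least one protocol
# in common with the existing members, or the join is rejected
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='broker:9092',
                         group_id='my-group',
                         partition_assignment_strategy=[RangePartitionAssignor,
                                                        RoundRobinPartitionAssignor])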

Hope this helps,

-Dana
On Jun 10, 2016 4:24 AM, "Barry Kaplan"  wrote:

I delete the group using kafka-consumer-groups.sh --delete and still I get
the error.


Re: Kafka Protocol API versions

2016-05-17 Thread Dana Powers
I'd also add that there is some discussion of the mixed-broker issue
on the KIP-35 wiki page as well:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version#KIP-35-Retrievingprotocolversion-Aclientdeveloperwantstoaddsupportforanewfeature

Of course, since 0.9 (and earlier) brokers do not support ApiVersion,
you're still in a bit of a pickle. I think like most shiny new stuff,
we'll probably have to wait and see how different clients approach the
problem and see what works best over time. The 2 non-java clients that
I'm aware of that are attempting to tackle broker
backwards-compatibility are librdkafka (Magnus) and kafka-python
(myself). Asish or others can correct me if I'm wrong here, but I
believe the java client is not attempting to do this yet outside of a
single request flow (SASL authentication).

FWIW, the approach we take in kafka-python is to pick a single
version-state for the cluster and use it for all brokers. At the
present that version-state is chosen during bootstrap by checking the
first contacted broker. That works great for most clusters, where all
brokers are the same version. A more robust approach might be to check
the versions for all brokers in the cluster, not rely on the first
contacted, and pick the most conservative / lowest version-state
found. That's not likely to be perfect, but would support clients that
bootstrap into a mixed cluster. But in any event, providing a manual
override for users to specifically configure the version-state and
skip the dynamic broker version checks is always a good fallback for
managing cluster upgrades.
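For example, pinning the version in kafka-python (a sketch; the tuple value is illustrative) skips the dynamic check entirely:

from kafka import KafkaConsumer

# speak only 0.9-era APIs until every broker in the cluster has been upgraded
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='broker1:9092',
                         api_version=(0, 9))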

HTH,

-Dana

On Tue, May 17, 2016 at 6:21 AM, Ismael Juma  wrote:
> Hi Oleksiy,
>
> Ashish has just created a PR to add some information about how to use
> KIP-35:
>
> https://github.com/apache/kafka/pull/1395
>
> Maybe you can have a look and leave a comment with areas that are not
> covered by the first draft.
>
> Ismael
>
> On Tue, May 17, 2016 at 1:34 PM, Oleksiy Krivoshey 
> wrote:
>
>> Given lots of protocol changes in 0.10 compared to 0.9, with many API
>> methods now having up to 3 possible versions, what is the suggested way for
>> the client library to coordinate with Kafka brokers? I can see there is
>> ApiVersions request which is great, but what happens if Kafka cluster
>> consists of mixed 0.9 and 0.10 brokers (for example during upgrade), should
>> the library keep and refresh the state of supported APIs for each broker
>> the same way it does for topic metadata?
>> Another example is the Metadata request (Api Key 3) which has two versions,
>> how would the client library in the first place know which version of the
>> request to send to each of the initial brokers?
>>
>> I can't see an easy way for the client library to support for example both
>> 0.9 and 0.10 without complicated state switches and error handling. I'm not
>> talking about official Java client, but about other language libraries.
>>
>> What would you suggest? Thanks!
>>


RE: Client Protocols

2016-05-06 Thread Dana Powers
If you're attempting to run a heterogeneous consumer group (different
clients joining the same group) then you must take care that the group
protocols have been implemented the same way. kafka-python attempts to mirror
the Java client exactly. I haven't looked at pykafka in a while, but it is
possible that they ship a different implementation.

Happy to follow up on github/dpkp/kafka-python if you want to open a ticket
there.

-Dana
On May 6, 2016 06:04, "Martin Gainty"  wrote:

>
>
>
> > To: users@kafka.apache.org
> > From: michael.to...@headforwards.com
> > Subject: Client Protocols
> > Date: Fri, 6 May 2016 09:26:59 +0100
> >
> >
> > Hi,
> >
> > I'm doing some testing with two different kafka clients: pykafka and
> > kafkapython
> >
> > When switching between client library, I get an error:
> > InconsistentGroupProtocol
> >
> > I thought I was using latest protocol in both cases, but seems not.
> >
> > What different client protocols are there and is there a way I can check
> > which is being used?
> MG>I *thought* the protocol need to be listed in GroupProtocols array
> during Join Group Request
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TheProtocol
> >
> >
> > Regards,
> >
> > Mike
>


Re: KafkaProducer block on send

2016-05-04 Thread Dana Powers
I think changes of this sort (design changes as opposed to bugs) typically
go through a KIP process before work is assigned. You might consider
starting a KIP discussion and see if there is interest in pursuing your
proposed changes.

-Dana
On May 4, 2016 7:58 AM, "Oleg Zhurakousky" <ozhurakou...@hortonworks.com>
wrote:

> Indeed it is.
>
> Oleg
> > On May 4, 2016, at 10:54 AM, Paolo Patierno <ppatie...@live.com> wrote:
> >
> > It's sad that after almost one month it's still "unassigned" :-(
> >
> > Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor
> > Twitter : @ppatierno
> > Linkedin : paolopatierno
> > Blog : DevExperience
> >
> >> Subject: Re: KafkaProducer block on send
> >> From: ozhurakou...@hortonworks.com
> >> To: users@kafka.apache.org
> >> Date: Wed, 4 May 2016 14:47:25 +
> >>
> >> Sure
> >>
> >> Here are both:
> >> https://issues.apache.org/jira/browse/KAFKA-3539
> >> https://issues.apache.org/jira/browse/KAFKA-3540
> >>
> >> On May 4, 2016, at 3:24 AM, Paolo Patierno <ppatie...@live.com ppatie...@live.com>> wrote:
> >>
> >> Hi Oleg,
> >>
> >> can you share the JIRA link here because I totally agree with you.
> >> For me the send() should be totally asynchronous and not blocking for
> the max.block.ms timeout.
> >>
> >> Currently I'm using the overload with callback that, of course, isn't
> called if the send() fails due to timeout.
> >> In order to catch this scenario I need to do the following :
> >>
> >> Future future = this.producer.send();
> >>
> >> if (future.isDone()) {
> >>   try {
> >>   future.get();
> >>   } catch (InterruptedException e) {
> >>   // TODO Auto-generated catch block
> >>   e.printStackTrace();
> >>   } catch (ExecutionException e) {
> >>   // TODO Auto-generated catch block
> >>   e.printStackTrace();
> >>   }
> >>   }
> >>
> >> I don't like it so much ...
> >>
> >> Thanks,
> >> Paolo.
> >>
> >> Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
> >> Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor
> >> Twitter : @ppatierno
> >> Linkedin : paolopatierno
> >> Blog : DevExperience
> >>
> >> Subject: Re: KafkaProducer block on send
> >> From: ozhurakou...@hortonworks.com<mailto:ozhurakou...@hortonworks.com>
> >> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> >> Date: Mon, 11 Apr 2016 19:42:17 +
> >>
> >> Dana
> >>
> >> Thanks for the explanation, but it sounds more like a workaround since
> everything you describe could be encapsulated within the Future itself.
> After all it "represents the result of an asynchronous computation"
> >>
> >> executor.submit(new Callable() {
> >>@Override
> >>public RecordMetadata call() throws Exception {
> >>// first make sure the metadata for the topic is available
> >>long waitedOnMetadataMs = waitOnMetadata(record.topic(),
> this.maxBlockTimeMs);
> >>. . .
> >>  }
> >> });
> >>
> >>
> >> The above would eliminate the confusion and keep user in control where
> even a legitimate blockage could be interrupted/canceled etc., based on
> various business/infrastructure requirements.
> >> Anyway, I’ll raise the issue in JIRA and reference this thread
> >>
> >> Cheers
> >> Oleg
> >>
> >> On Apr 8, 2016, at 10:31 AM, Dana Powers <dana.pow...@gmail.com dana.pow...@gmail.com><mailto:dana.pow...@gmail.com>> wrote:
> >>
> >> The prior discussion explained:
> >>
> >> (1) The code you point to blocks for a maximum of max.block.ms, which
> is
> >> user configurable. It does not block indefinitely with no user control
> as
> >> you suggest. You are free to configure this to 0 if you like at it will
> not
> >> block at all. Have you tried this like I suggested before?
> >>
> >> (2) Even if you convinced people to remove waitOnMetadata, the send
> method
> >> *still* blocks on memory back pressure (also configured by max.block.ms
> ).
> >> This is for goo

Re: 0.8.x consumer group protocol

2016-05-04 Thread Dana Powers
0.8 clients manage groups by connecting directly to zookeeper and
implementing shared group management code. There are no broker APIs used.

0.9 clients manage groups using new kafka broker APIs. These clients no
longer connect directly to zookeeper. JoinGroupRequest is an 0.9 api.

For an 0.8ish reimplementation, you might take a look at pykafka. There is
also the old Kafka Java code. I'm not aware of great docs, though. FWIW,
there are not many unofficial clients that have implemented group
management for 0.8 (unless they are just wrapping the official library
running on the jvm).

-Dana
On May 4, 2016 02:52, "Zaiming Shi"  wrote:

Hi there!

I'm investigating what it means to implement consumer group protocol for
0.8.
However all the documents I can find on line is for 0.9
e.g.
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Also, in kafka code base, JOIN_GROUP_REQUEST_V0 schema in 0.8 is different
from 0.9.
Why is the protocol upgraded without bumping the JoinGroupRequest API
version number?

Anyone have some tips on where to start?

Regards
-Zaiming


Re: Receiving ILLEGAL_GENERATION, but I can't find information on the exception.

2016-05-02 Thread Dana Powers
It means there was a consumer group rebalance that this consumer missed.
You may be spending too much time in msg processing between poll() calls.
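Two common mitigations, sketched with kafka-python (values are illustrative, and max_records support depends on your client version):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='broker:9092',
                         group_id='my-group',
                         session_timeout_ms=30000)   # more headroom for slow processing

while True:
    # ...or take fewer records per poll so processing finishes before the
    # coordinator expires this member and bumps the group generation
    records = consumer.poll(timeout_ms=1000, max_records=50)
    for tp, messages in records.items():
        for message in messages:
            handle(message)   # hypothetical processing function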

-Dana


Re: Reg slack channel

2016-04-26 Thread Dana Powers
There's also an irc channel on freenode #apache-kafka that hosts some
periodic user discussion.

On Tue, Apr 26, 2016 at 7:17 PM, Harsha  wrote:
> We use apache JIRA and mailing lists for any discussion.
> Thanks,
> Harsha
>
> On Tue, Apr 26, 2016, at 06:20 PM, Kanagha wrote:
>> Hi,
>>
>> Is there a slack channel for discussing issues related to Kafka? I would
>> like to get an invite. Thanks!
>>
>>
>> --
>> Kanagha


Re: kafka.common.QueueFullException

2016-04-25 Thread Dana Powers
This error is thrown by the old scala producer, correct? You might
also consider switching to the new java producer. It handles this a
bit differently by blocking during the send call until the internal
buffers can enqueue a new message. There are a few more configuration
options available as well, if I recall.

-Dana

On Mon, Apr 25, 2016 at 12:42 PM, Gwen Shapira  wrote:
> few more things you can do:
>
> * Increase "batch.size" - this will give you a larger queue and
> usually better throughput
> * More producers - very often the bottleneck is not in Kafka at all.
> Maybe its the producer? or the network?
> * Increate max.inflight.requests for the producer - it will allow
> sending more requests concurrently and perhaps increase throughput.
>
> The important bit is: Don't add more brokers if you don't have
> information that the broker is the bottleneck.
>
> Gwen
>
> On Mon, Apr 25, 2016 at 12:06 PM, Saurabh Kumar  wrote:
>> Thanks Alex and Sorry for the delayed response. We never could solv this
>> problem so am resurrecting the thread.  As i understand, from a
>> client/tenant which is producing messages to a kafka topic, there is not
>> much that can be controlled. I assume "Event queue is full of unsent
>> messages" signify that :
>> 1) We need to expand our cluster by adding more resources/brokers
>> 2) We need to add a blocking behaviour incase we see that the average
>> volume of messages is sustainable, and its just the spikes that are causing
>> problems.
>>
>> --Saurabh
>>
>> On Thu, Feb 18, 2016 at 11:51 PM, John Yost  wrote:
>>
>>> Hi Alex,
>>>
>>> Great info, thanks! I asked a related question this AM--is a full queue
>>> possibly a symptom of back pressure within Kafka?
>>>
>>> --John
>>>
>>> On Thu, Feb 18, 2016 at 12:38 PM, Alex Loddengaard 
>>> wrote:
>>>
>>> > Hi Saurabh,
>>> >
>>> > This is occurring because the produce message queue is full when a
>>> produce
>>> > request is made. The size of the queue is configured
>>> > via queue.buffering.max.messages. You can experiment with increasing this
>>> > (which will require more JVM heap space), or fiddling with
>>> > queue.enqueue.timeout.ms to control the blocking behavior when the queue
>>> > is
>>> > full. Both of these configuration options are explained here:
>>> >
>>> > https://kafka.apache.org/08/configuration.html
>>> >
>>> > I didn't quite follow your last paragraph, so I'm not sure if the
>>> following
>>> > advice is applicable to you or not. You may also experiment with adding
>>> > more producers (either on the same or different machines).
>>> >
>>> > I hope this helps.
>>> >
>>> > Alex
>>> >
>>> > On Thu, Feb 18, 2016 at 2:12 AM, Saurabh Kumar 
>>> > wrote:
>>> >
>>> > > Hi,
>>> > >
>>> > > We have a Kafka server deployment shared between multiple teams and i
>>> > have
>>> > > created a topic with multiple partitions on it for pushing some JSON
>>> > data.
>>> > >
>>> > > We have multiple such Kafka producers running from different machines
>>> > which
>>> > > produce/push data to a Kafka topic. A lot of times i see the following
>>> > > exception in the logs : "*Event queue is full of unsent messages, could
>>> > not
>>> > > send event"*
>>> > >
>>> > > Any idea how to solve this ? We can not synchronise the volume or
>>> timing
>>> > of
>>> > > different Kafka producers across machines and between multiple
>>> processes.
>>> > > There is a limit on maximum number of concurrent processes (kafka
>>> > producer)
>>> > >  that can run on a mchine but it is only going to increase with time as
>>> > we
>>> > > scale. What is the right way to solve this problem ?
>>> > >
>>> > > Thanks,
>>> > > Saurabh
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > *Alex Loddengaard | **Solutions Architect | Confluent*
>>> > *Download Apache Kafka and Confluent Platform: www.confluent.io/download
>>> > *
>>> >
>>>


Re: "Close the consumer, waiting indefinitely for any needed cleanup."

2016-04-11 Thread Dana Powers
If you pay me, I might write the code for you, too ;)

-Dana

On Mon, Apr 11, 2016 at 1:34 PM, Oleg Zhurakousky
<ozhurakou...@hortonworks.com> wrote:
> Dana, I am sorry, but I can’t accept that as an answer.
> Regardless, the API exposed to the end user must never “block indefinitely”. 
> And saying you have to move a few mountains to work around what most would 
> perceive to be a design issue is not the acceptable answer.
> I’ll raise the JIRA
>
> Cheers
> Oleg
>
>> On Apr 11, 2016, at 4:25 PM, Dana Powers <dana.pow...@gmail.com> wrote:
>>
>> If you wanted to implement a timeout, you'd need to wire it up in
>> commitOffsetsSync and plumb the timeout from Coordinator.close() and
>> Consumer.close(). That's your answer. Code changes required.
>>
>> -Dana
>>
>> On Mon, Apr 11, 2016 at 1:17 PM, Oleg Zhurakousky
>> <ozhurakou...@hortonworks.com> wrote:
>>> Dana
>>> Everything you are saying does not answer my question of how to interrupt
>>> a potential deadlock artificially forced upon users of the KafkaConsumer API.
>>> I may be OK with duplicate messages, I may be OK with data loss and I am OK
>>> with doing extra work to do all kinds of things. I am NOT OK with getting
>>> stuck on the close() call when I really want my system that uses KafkaConsumer
>>> to exit. So Consumer.close(timeout) is what I was really asking about.
>>> So, is there a way now to interrupt such block?
>>>
>>> Cheers
>>> Oleg
>>>
>>>> On Apr 11, 2016, at 4:08 PM, Dana Powers <dana.pow...@gmail.com> wrote:
>>>>
>>>> Not a typo. This happens because the consumer closes the coordinator,
>>>> and the coordinator attempts to commit any pending offsets
>>>> synchronously in order to avoid duplicate message delivery. The
>>>> Coordinator method commitOffsetsSync will retry indefinitely unless a
>>>> non-recoverable error is encountered. If you wanted to implement a
>>>> timeout, you'd need to wire it up in commitOffsetsSync and plumb the
>>>> timeout from Coordinator.close() and Consumer.close(). It doesn't look
>>>> terribly complicated, but you should check on the dev list for more
>>>> opinions.
>>>>
>>>> -Dana
>>>>
>>>> On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
>>>> <ozhurakou...@hortonworks.com> wrote:
>>>>> The subject line is from the javadoc of the new KafkaConsumer.
>>>>> Is this for real? I mean I am hoping the use of ‘indefinitely' is a typo.
>>>>> In any event if it is indeed true, how does one break out of indefinitely 
>>>>> blocking consumer.close() invocation?
>>>>>
>>>>> Cheers
>>>>> Oleg
>>>>
>>>
>>
>


Re: "Close the consumer, waiting indefinitely for any needed cleanup."

2016-04-11 Thread Dana Powers
If you wanted to implement a timeout, you'd need to wire it up in
commitOffsetsSync and plumb the timeout from Coordinator.close() and
Consumer.close(). That's your answer. Code changes required.

-Dana

On Mon, Apr 11, 2016 at 1:17 PM, Oleg Zhurakousky
<ozhurakou...@hortonworks.com> wrote:
> Dana
> Everything you are saying does not answer my question of how to interrupt a
> potential deadlock artificially forced upon users of the KafkaConsumer API.
> I may be OK with duplicate messages, I may be OK with data loss and I am OK
> with doing extra work to do all kinds of things. I am NOT OK with getting
> stuck on the close() call when I really want my system that uses KafkaConsumer to
> exit. So Consumer.close(timeout) is what I was really asking about.
> So, is there a way now to interrupt such block?
>
> Cheers
> Oleg
>
>> On Apr 11, 2016, at 4:08 PM, Dana Powers <dana.pow...@gmail.com> wrote:
>>
>> Not a typo. This happens because the consumer closes the coordinator,
>> and the coordinator attempts to commit any pending offsets
>> synchronously in order to avoid duplicate message delivery. The
>> Coordinator method commitOffsetsSync will retry indefinitely unless a
>> non-recoverable error is encountered. If you wanted to implement a
>> timeout, you'd need to wire it up in commitOffsetsSync and plumb the
>> timeout from Coordinator.close() and Consumer.close(). It doesn't look
>> terribly complicated, but you should check on the dev list for more
>> opinions.
>>
>> -Dana
>>
>> On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
>> <ozhurakou...@hortonworks.com> wrote:
>>> The subject line is from the javadoc of the new KafkaConsumer.
>>> Is this for real? I mean I am hoping the use of ‘indefinitely' is a typo.
>>> In any event if it is indeed true, how does one break out of indefinitely 
>>> blocking consumer.close() invocation?
>>>
>>> Cheers
>>> Oleg
>>
>


Re: "Close the consumer, waiting indefinitely for any needed cleanup."

2016-04-11 Thread Dana Powers
Not a typo. This happens because the consumer closes the coordinator,
and the coordinator attempts to commit any pending offsets
synchronously in order to avoid duplicate message delivery. The
Coordinator method commitOffsetsSync will retry indefinitely unless a
non-recoverable error is encountered. If you wanted to implement a
timeout, you'd need to wire it up in commitOffsetsSync and plumb the
timeout from Coordinator.close() and Consumer.close(). It doesn't look
terribly complicated, but you should check on the dev list for more
opinions.

-Dana

On Mon, Apr 11, 2016 at 12:45 PM, Oleg Zhurakousky
 wrote:
> The subject line is from the javadoc of the new KafkaConsumer.
> Is this for real? I mean I am hoping the use of ‘indefinitely' is a typo.
> In any event if it is indeed true, how does one break out of indefinitely 
> blocking consumer.close() invocation?
>
> Cheers
> Oleg


RE: KafkaProducer block on send

2016-04-08 Thread Dana Powers
The prior discussion explained:

(1) The code you point to blocks for a maximum of max.block.ms, which is
user configurable. It does not block indefinitely with no user control as
you suggest. You are free to configure this to 0 if you like at it will not
block at all. Have you tried this like I suggested before?

(2) Even if you convinced people to remove waitOnMetadata, the send method
*still* blocks on memory back pressure (also configured by max.block.ms).
This is for good reason:

while True:
  producer.send(msg)

This loop can quickly devour all of your local memory and crash your process if the
outflow rate decreases, say if brokers go down or a network partition occurs.
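For reference, the kafka-python spelling of those settings (values are illustrative): with max_block_ms=0 a send() that cannot get metadata or buffer space should fail fast rather than block.

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

producer = KafkaProducer(bootstrap_servers='broker:9092',
                         max_block_ms=0,           # never block inside send()
                         buffer_memory=33554432)   # cap on locally buffered bytes

try:
    producer.send('my-topic', b'payload')
except KafkaTimeoutError:
    pass  # metadata unavailable or buffer full -- the caller decides what to do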

-Dana
I totally agree with Oleg.

As documentation says the producers send data in an asynchronous way and it
is enforced by the send method signature with a Future returned.
It can't block indefinitely without returning to the caller.
I'm mean, you can decide that the code inside the send method blocks
indefinitely but in an "asynchronous way", it should first return a Future
to the caller that can handle it.

Paolo.

Paolo PatiernoSenior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoTMicrosoft Azure Advisor
Twitter : @ppatierno
Linkedin : paolopatierno
Blog : DevExperience

> Subject: KafkaProducer block on send
> From: ozhurakou...@hortonworks.com
> To: users@kafka.apache.org
> Date: Thu, 7 Apr 2016 13:04:49 +
>
> I know it’s been discussed before, but that conversation never really
concluded with any reasonable explanation, so I am bringing it up again as
I believe this is a bug that would need to be fixed in some future release.
> Can someone please explain the rational for the following code in
KafkaProducer:
>
> @Override
> public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback
> callback) {
> try {
> // first make sure the metadata for the topic is available
> long waitedOnMetadataMs = waitOnMetadata(record.topic(),
this.maxBlockTimeMs);
> . . .
> }
>
> By definition the method that returns Future implies that caller decides
how long to wait for the completion via Future.get(TIMETOWAIT). In this
case there is an explicit blocking call (waitOnMetadata), that can hang
infinitely (regardless of the reasons) which essentially results in user’s
code deadlock since the Future may never be returned in the first place.
>
> Thoughts?
>
> Oleg
>


Re: Reading consumer offsets in Kafka 0.9.0 and later

2016-03-31 Thread Dana Powers
Hi Marcos,

ConsumerMetadata* was renamed to GroupCoordinator* in 0.9 . the api
protocol is unchanged.

However, the new Java clients use non-blocking network channels. It looks
like the example code may reference the deprecated, or
soon-to-be-deprecated, Scala client.

Rather than roll your own monitoring, you might take a look at some of the
options already out there:

https://github.com/linkedin/Burrow
https://github.com/quantifind/KafkaOffsetMonitor

You might also consider monitoring JMX data (though for consumer lag this
will be limited to Java clients), described well here:

http://docs.confluent.io/2.0.1/kafka/monitoring.html

Hope this helps,

-Dana
We're building an application to monitor our Kafka consumers, and since the
offsets are now not stored in Zookeeper as before, I was trying to use the
following example in the Kafka website:

https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

However, it seems either the example is stale, or it doesn't apply for the
latest 0.9+ Kafka releases.  Specifically, the ConsumerMetadataRequest
doesn't seem to exist in the latest releases, so I can't even instantiate
the class.

Can you point us to a current example of how we can get consumer offsets
from Kafka?

Thanks,

Marcos Juarez


Re: KafkaProducer "send" blocks on first attempt with Kafka server offline

2016-03-29 Thread Dana Powers
Somewhat of an aside, but you might note that the kafka producer is
intended to block during send() as backpressure on memory allocation.
This is admittedly different than blocking on metadata, but it is worth
considering that the premise that send() should *never* block because
it returns a Future seems fundamentally at odds with the current design.

In any event, there is a configuration that you can tweak to set the max
time the producer will spend blocking in send(): max.block.ms

-Dana


On Tue, Mar 29, 2016 at 7:26 PM, Steven Wu  wrote:

> I also agree that returning a Future should never block. I have brought
> this up when 0.8.2 was first released for new Java  producer.
>
> As Oleg said, KafkaProducer can also block if metadata is not fetched. This
> is probably more often than offline broker, because metadata is loaded
> lazily when there is a first send attempt for the topic. In another word,
> first send to a topic will always block until metadata is fetched or timed
> out.
>
> We had to handle this in our wrapper code. Basically, we check if metadata
> not available, we put msg into a queue and drain the queue from a diff
> thread.
>
> Thanks,
> Steven
>
>
> On Tue, Mar 29, 2016 at 4:59 AM, Oleg Zhurakousky <
> ozhurakou...@hortonworks.com> wrote:
>
> > I agree and considering that send(..) method returns Future one would
> > argue it must never block, otherwise what’s the point of returning Future
> > if you remove user’s ability to control how long are they willing to wait
> > and what to do when certain types of exception arise. Nevertheless it
> does
> > and it happens in the very first line of code:
> > // first make sure the metadata for the topic is available
> > waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
> > So I am curious as well as to what is the motivation for it as we’ve seen
> > the same symptoms multiple times.
> > Cheers
> > Oleg
> > On Mar 29, 2016, at 4:31 AM, Paolo Patierno  ppatie...@live.com>> wrote:
> >
> > Hello,
> > as documentation says, the KafkaProducer.send() method is asynchronous and
> > it just returns immediately. I found out that it's not so true when the
> > Kafka server it's trying to connect to isn't online. Of course it happens
> > only on the first send() method invocation. It means that if the Kafka server
> > isn't reachable when my application starts for the first time, the send()
> > method isn't so asynchronous but it blocks. I know that it's trying to
> > connect to the Kafka server but why doesn't it save the message into the
> > buffer and return immediately? Is it a behavior or a bug?
> > Thanks,
> > Paolo
> >
> > Paolo PatiernoSenior Software Engineer
> >
> >
> > Windows Embedded & IoTMicrosoft Azure Advisor
> > Twitter : @ppatierno
> > Linkedin : paolopatierno
> > Blog : DevExperienceBlog : Embedded101
> >
> >
> >
>


Re: Documentation

2016-03-29 Thread Dana Powers
I also found the documentation difficult to parse when it came time to
implement group APIs. I ended up just reading the client source code and
trying api calls until it made sense.

My general description from off the top of my head:

(1) have all consumers submit a shared protocol_name string* and
bytes-serialized protocol_metadata, which includes the topic(s) for
subscription.
(2) All consumers should be prepared to parse a JoinResponse and identify
whether they have been selected as the group leader.
(3) The non-leaders go straight to SyncGroupRequest, with group_assignment
empty, and will wait to get their partition assignments via the
SyncGroupResponse.
(4) Unlike the others, the consumer-leader will get the agreed
protocol_name and all member's metadata in its JoinGroupResponse.
(5) The consumer-leader must apply the protocol_name assignment strategy to
the member list + metadata (plus the cluster metadata), generating a set of
member assignments (member_id -> topic partitions).
(6)The consumer-leader then submits that member_assignment data in its
SyncGroupRequest.
(7) The broker will take the group member_assignment data and split it out
into each member's SyncGroupResponse (including the consumer-leader).

At this point all members have their assignments and can start chugging
along.
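To make step (5) concrete, here is a minimal leader-side sketch (plain Python, not any particular client's API) of a round-robin style assignment over the decoded member subscriptions; the partition counts would come from cluster metadata:

from collections import defaultdict
from itertools import cycle

def assign_round_robin(member_subscriptions, partitions_per_topic):
    # member_subscriptions: {member_id: [topic, ...]} decoded from the
    #   JoinGroupResponse member metadata
    # partitions_per_topic: {topic: partition_count} from cluster metadata
    # returns {member_id: [(topic, partition), ...]} to encode into SyncGroupRequest
    assignment = defaultdict(list)
    topics = sorted({t for subs in member_subscriptions.values() for t in subs})
    for topic in topics:
        # rotate over the members that actually subscribed to this topic
        subscribers = cycle(sorted(m for m, subs in member_subscriptions.items()
                                   if topic in subs))
        for partition in range(partitions_per_topic[topic]):
            assignment[next(subscribers)].append((topic, partition))
    return dict(assignment)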

I've tried to encapsulate the group request / response protocol info
cleanly into kafka-python, if that helps you:

https://github.com/dpkp/kafka-python/blob/1.0.2/kafka/protocol/group.py

I'd be willing to help improve the docs if someone points me at the "source
of truth". Or feel free to ping on irc #apache-kafka

-Dana

*For the java client these are 'range' or 'roundrobin', but they are opaque and
can be anything if you aren't interested in joining groups that have
heterogeneous clients. Relatedly, the protocol_metadata and member_metadata
are also opaque and technically can be anything you like. But sticking with
the documented metadata specs should in theory allow heterogeneous clients
to cooperate.

On Tue, Mar 29, 2016 at 8:21 AM, Heath Ivie  wrote:

> Does anyone have better documentation around the group membership APIs?
>
> The information about the APIs is great at the beginning but gets
> progressively sparser toward the end.
>
> I am not finding enough information about the values of the request fields
> to join / sync the group.
>
> Can anyone help me or send me some additional documentation?
>
> Heath Ivie
> Solutions Architect
>
>
>


Re: Unexpected response on SyncGroup call

2016-03-26 Thread Dana Powers
The MemberAssignment bytes returned in SyncResponse should be the bytes
that your leader sent in its SyncRequest. <<0, 0, 0, 0>> is simply an empty
Bytes array (32 bit length of 0). The broker does not alter those bytes as
far as I know, so despite the protocol doc describing what a
MemberAssignment struct *should* look like, it really is up to your client
to encode and decode that struct. Can you check what bytes your leader code
is sending?

I assume you have this covered, but for completeness: each consumer should
check its JoinResponse to determine whether it has been selected as the
group leader. If the consumer is the leader, it needs to do the assignments
and send those back in its SyncRequest. All other consumers just send an
empty SyncRequest. SyncResponse for all consumers then includes the
assignments sent by the leader.
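
For reference, a small Python sketch of packing the documented MemberAssignment
layout (int16 version, an array of topic/partition entries, then a
length-prefixed user_data blob) -- illustrative only, since as noted the broker
treats the whole thing as opaque bytes:

import struct

def encode_member_assignment(version, assignment, user_data=b''):
    # assignment: dict mapping topic name -> list of partition ids
    buf = struct.pack('>h', version)
    buf += struct.pack('>i', len(assignment))
    for topic, partitions in assignment.items():
        topic_bytes = topic.encode('utf-8')
        buf += struct.pack('>h', len(topic_bytes)) + topic_bytes
        buf += struct.pack('>i', len(partitions))
        for partition in partitions:
            buf += struct.pack('>i', partition)
    # trailing user_data is a length-prefixed bytes field
    buf += struct.pack('>i', len(user_data)) + user_data
    return buf

# An "empty" assignment on the wire is just a 4-byte length prefix of 0 --
# the <<0, 0, 0, 0>> from the question above.
empty_assignment_bytes = struct.pack('>i', 0)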

-Dana

On Sat, Mar 26, 2016 at 2:39 PM, Cees de Groot  wrote:

> I'm helping out on adding 0.9 stuff to the Elixir Kafka driver (
> https://github.com/kafkaex/kafka_ex), and I'm getting an unexpected
> response on an integration test that makes a simple SyncGroup call.
>
> In Elixir terms, I'm getting <<0, 0, 0, 3, 0, 0, 0, 0, 0, 0>> back. My
> interpretation:
>
> 32 bits correlation id = 0,0,0,3
> 16 bits error code = 0,0
>
> There's now 32 bits left. However, I'm expecting (following
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-SyncGroupResponse
> )
> a MemberAssignment structure here, which is
>
> 16 bits of version (0)
> 32 bits of length (it's a simple integration test, so for now i'm happy to
> accept the answer "0 assignments for you, dude!" ;-)).
>
> Which clearly is more data than left in the response.
>
> Am I misinterpreting the documentation here?
>


Re: Using lz4 compression

2016-03-21 Thread Dana Powers
The LZ4 implementation "works" but has a framing bug that can make
third-party client use difficult. See KAFKA-3160. If you only plan to use the
official Java client then that issue shouldn't be a problem.
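
If you do want to try it from Python, it's just a producer config switch -- a
sketch assuming a kafka-python release with lz4 support and the python lz4
bindings installed:

from kafka import KafkaProducer

# Enable lz4 message compression on the producer side; topic and broker
# settings are left at their defaults.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         compression_type='lz4')
producer.send('my-topic', b'some message bytes')
producer.flush()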

-Dana
On Mar 21, 2016 12:26 PM, "Pete Wright"  wrote:

>
>
> On 03/17/2016 04:03 PM, Virendra Pratap Singh wrote:
>
>> More like getting a feel from the community about using lz4 for
>> compression. Has anyone used it in their kafka setup?
>> I am aware that gzip and snappy are older implementations and better
>> regression-tested. Given that lz4 has better compression/decompression cycles
>> (though a slightly lower compression ratio), I was thinking of leveraging the
>> same.
>> Regards,
>> Virendra
>>
>>
> i use the lz4 compression algorithm quite extensively in conjunction with
> ZFS (ZFS can configure a filesystem to do transparent compression) and have
> not had any issues with it under load.  i've also found that it does a
> better job than snappy with negligible overhead that i've been able to
> observe.
>
> i tend to avoid gzip in production as i have measured more overhead using
> this algorithm, and generally speaking i've found lz4 to compact data
> better.
>
> i am not super familiar with the lz4 code as implemented in kafka, but i
> would assume the java implementation is pretty solid.
>
> hope this helps,
> -pete
>
> --
> Pete Wright
> Lead Systems Architect
> Rubicon Project
> pwri...@rubiconproject.com
> 310.309.9298
>


Re: kafka-python consumer not receiving messages

2016-02-05 Thread Dana Powers
Hi karthik - I usually address kafka-python specific questions via github.
Can you file an issue at github.com/dpkp/kafka-python and I will follow up
there?

My initial reaction is you should leave group_id=None if you want to
duplicate behavior of the console consumer.

-Dana
Hello,

I am having trouble getting KafkaConsumer to read from the beginning,
or from any other explicit offset.

Running the command line tools for the consumer for the same topic , I do
see messages with the `--from-beginning` option and it hangs otherwise

$ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic
{topic_name} --from-beginning

If I run it through python, it hangs, which I suspect to be caused by
incorrect consumer configs

consumer = KafkaConsumer(topic_name,
 bootstrap_servers=['localhost:9092'],
 group_id="test-consumer-group",
 auto_commit_enable=False,
 auto_offset_reset='smallest')

print "Consuming messages from the given topic"
for message in consumer:
print "Message", message
if message is not None:
print message.offset, message.value

print "Quit"

Output:
--
Consuming messages from the given topic


I am using kafka-python 0.9.5 and the broker runs kafka 0.8.2. Not sure what
the exact problem is. Any help would be appreciated.

Thanks,
Karthik


Re: Detecting broker version programmatically

2016-02-04 Thread Dana Powers
I think it is possible to infer a version based on a remote broker's
response to various newer api methods. I wrote some probes in python that
check ListGroups for 0.9, GroupCoordinatorRequest for 0.8.2,
OffsetFetchRequest for 0.8.1, and MetadataRequest for 0.8.0. It seems to
work well enough.
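
The probe logic is roughly the following (sketch only -- try_request is a
hypothetical helper standing in for your client's request plumbing, not a real
kafka-python call):

def guess_broker_version(try_request):
    # try_request(api_name) -> True if the broker answers that request,
    # False if it errors or drops the connection.
    if try_request('ListGroups'):
        return '0.9'
    if try_request('GroupCoordinatorRequest'):
        return '0.8.2'
    if try_request('OffsetFetchRequest'):
        return '0.8.1'
    if try_request('MetadataRequest'):
        return '0.8.0'
    return 'unknown'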

Also, this is related to KIP 35:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version

-Dana
On Feb 4, 2016 8:28 PM, "Manikumar Reddy"  wrote:

> Currently it is available through JMX Mbean. It is not available on wire
> protocol/requests.
>
> Pending JIRAs related to this:
> https://issues.apache.org/jira/browse/KAFKA-2061
>
> On Fri, Feb 5, 2016 at 4:31 AM,  wrote:
>
> > Is there a way to detect the broker version (even at a high level 0.8 vs
> > 0.9) using the kafka-clients Java library?
> >
> > --
> > Best regards,
> > Marko
> > www.kafkatool.com
> >
> >
>


Re: Protocol Question

2016-02-03 Thread Dana Powers
Hi Heath, a few comments:

(1) you should be looping on brokerCount
(2) you are missing the topic array count (4 bytes), and loop
(3) topic error code is int16, so only 2 bytes not 4
(4) you are missing the partition metadata array count (4 bytes), and loop
(5) you are missing the replicas and isr parsing.

-Dana

On Wed, Feb 3, 2016 at 6:01 PM, Heath Ivie  wrote:

> Hi,
>
> I am trying to navigate through the protocol and I am seeing some
> inconsistencies with the data.
>
> I am trying to parse out the MetadataResponse and I am seeing bytes in
> between where they shouldn't be.
>
> I know they are extra, because if I up the offset the data after is
> correct.
>
> Here is the part of the response that I am trying to parse:
> MetadataResponse => [Broker][TopicMetadata]
>   Broker => NodeId Host Port  (any number of brokers may be returned)
> NodeId => int32
> Host => string
> Port => int32
>   TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
> TopicErrorCode => int16
>   PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
> PartitionErrorCode => int16
> PartitionId => int32
> Leader => int32
> Replicas => [int32]
> Isr => [int32]
>
>
> I am seeing extra bytes after the topic error.
>
> I am not sure if this padding is expected or not?
>
> It may not be very readable, but here is my code (be nice :)).
>
> int offset =0;
> int correlationId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
> int brokerCount =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
> int nodeId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
> int hostNameLength =
> BitConverter.ToInt16(ReverseBytes(bytes.Skip(offset).Take(2).ToArray()), 0);
> offset += 2;
>
> string hostName = Encoding.ASCII.GetString(bytes,
> offset, hostNameLength);
> offset += hostNameLength;
>
> int port =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
> int topicErrorCode =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
> int topicNameLength =
> BitConverter.ToInt16(ReverseBytes(bytes.Skip(offset).Take(2).ToArray()), 0);
> offset += 2;
>
> string topicName = Encoding.ASCII.GetString(bytes,
> offset, topicNameLength);
> offset += topicNameLength;
>
> int partitionErrorCode =
> BitConverter.ToInt16(ReverseBytes(bytes.Skip(offset).Take(2).ToArray()), 0);
> offset += 2;
>
> int partitionId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
> int leaderId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
>
> Any help would be cool, thanks
>
> Heath Ivie
> Solutions Architect
>
>
>


Re: Protocol Question

2016-02-03 Thread Dana Powers
Some comments based on your code snippet:

(1) you aren't looping on brokerCount -- you should be decoding broker
metadata for each count
(2) you are missing the topic metadata array count (and loop) -- 4 bytes
(3) topic errorcode is an Int16, so you should be reading 2 bytes, not 4
(4) you are missing the partition array count (and loop) -- 4 bytes
(5) you are missing replicas and isr array parsing.
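
Putting those five points together, a rough Python sketch of the full v0
MetadataResponse parse (big-endian struct unpacking over the response body,
starting at the correlation id; illustrative only, no error handling):

import struct

def parse_metadata_response_v0(buf):
    offset = 0

    def read(fmt):
        nonlocal offset
        value = struct.unpack_from(fmt, buf, offset)[0]
        offset += struct.calcsize(fmt)
        return value

    def read_string():
        nonlocal offset
        length = read('>h')                      # int16 length prefix
        value = buf[offset:offset + length].decode('utf-8')
        offset += length
        return value

    correlation_id = read('>i')

    brokers = []
    for _ in range(read('>i')):                  # broker array count
        brokers.append((read('>i'), read_string(), read('>i')))

    topics = []
    for _ in range(read('>i')):                  # topic metadata array count
        topic_error = read('>h')                 # int16, not int32
        topic_name = read_string()
        partitions = []
        for _ in range(read('>i')):              # partition metadata array count
            p_error, p_id, leader = read('>h'), read('>i'), read('>i')
            replicas = [read('>i') for _ in range(read('>i'))]
            isr = [read('>i') for _ in range(read('>i'))]
            partitions.append((p_error, p_id, leader, replicas, isr))
        topics.append((topic_error, topic_name, partitions))

    return correlation_id, brokers, topics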

hope this helps,

-Dana

On Wed, Feb 3, 2016 at 6:01 PM, Heath Ivie  wrote:

> Hi,
>
> I am trying to navigate through the protocol and I am seeing some
> inconsistencies with the data.
>
> I am trying to parse out the MetadataResponse and I am seeing bytes in
> between where they shouldn't be.
>
> I know they are extra, because if I up the offset the data after is
> correct.
>
> Here is the part of the response that I am trying to parse:
> MetadataResponse => [Broker][TopicMetadata]
>   Broker => NodeId Host Port  (any number of brokers may be returned)
> NodeId => int32
> Host => string
> Port => int32
>   TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
> TopicErrorCode => int16
>   PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
> PartitionErrorCode => int16
> PartitionId => int32
> Leader => int32
> Replicas => [int32]
> Isr => [int32]
>
>
> I am seeing extra bytes after the topic error.
>
> I am not sure if this padding is expected or not?
>
> It may not be very readable, but here is my code (be nice :)).
>
> int offset =0;
> int correlationId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
> int brokerCount =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
> int nodeId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
> int hostNameLength =
> BitConverter.ToInt16(ReverseBytes(bytes.Skip(offset).Take(2).ToArray()), 0);
> offset += 2;
>
> string hostName = Encoding.ASCII.GetString(bytes,
> offset, hostNameLength);
> offset += hostNameLength;
>
> int port =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
> int topicErrorCode =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
> int topicNameLength =
> BitConverter.ToInt16(ReverseBytes(bytes.Skip(offset).Take(2).ToArray()), 0);
> offset += 2;
>
> string topicName = Encoding.ASCII.GetString(bytes,
> offset, topicNameLength);
> offset += topicNameLength;
>
> int partitionErrorCode =
> BitConverter.ToInt16(ReverseBytes(bytes.Skip(offset).Take(2).ToArray()), 0);
> offset += 2;
>
> int partitionId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
> int leaderId =
> BitConverter.ToInt32(ReverseBytes(bytes.Skip(offset).Take(4).ToArray()), 0);
> offset += 4;
>
>
> Any help would be cool, thanks
>
> Heath Ivie
> Solutions Architect
>
>
>


Re: Kafka Committed Offset Behavior off by 1

2016-02-01 Thread Dana Powers
The committed offset is actually the next message to consume, not the last
message consumed. So that sounds like expected behavior to me. The consumer
code handles this internally, but if you write code to commit offsets
manually, it can be a gotcha.
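
A sketch of what manual commits look like from kafka-python (exact import paths
and the OffsetAndMetadata signature vary by release; the +1 is the point):

from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092',
                         group_id='my-group', enable_auto_commit=False)
for message in consumer:
    print(message.offset, message.value)      # stand-in for real processing
    tp = TopicPartition(message.topic, message.partition)
    # Commit the *next* offset to consume, not the one just processed;
    # otherwise a restarted consumer re-reads the last message.
    consumer.commit({tp: OffsetAndMetadata(message.offset + 1, '')})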

-Dana

On Mon, Feb 1, 2016 at 1:35 PM, Adam Kunicki  wrote:

> Hi,
>
> I've been noticing that a restarted consumer in 0.9 will start consuming
> from the last committed offset (inclusive). This means that any restarted
> consumer will get the last read (and committed) message causing a duplicate
> each time the consumer is restarted from the same position if there have
> been no new messages.
>
> Per:
>
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> this seems like that is the intended behavior.
>
> Can anyone confirm this? If this is the case how are we expected to handle
> these duplicated messages?
>
> -Adam
>


Re: Documentation

2016-01-31 Thread Dana Powers
You can find protocol documentation here (including a list of api key #s):

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol


-Dana

On Sun, Jan 31, 2016 at 5:46 PM, Heath Ivie  wrote:

> To piggy-back, where can I find the api key values?
>
> Sent from Outlook Mobile
>
>
>
>
> On Sun, Jan 31, 2016 at 9:25 AM -0800, "Heath Ivie"  wrote:
>
> Hi Folks,
>
> I am working through the protocols to build a c# rest api.
>
> I am seeing inconsistencies in the way the document says it works with how
> it actually works, specifically the fetch messages.
>
> Could someone point me to the current documentation?
>
> Thanks Heath
>
> Sent from Outlook Mobile
>
>
>


Re: unable to set consumer group

2016-01-20 Thread Dana Powers
version 0.9.5 of kafka-python does not support coordinated consumer groups.
You can get this feature in the master branch on github (
https://github.com/dpkp/kafka-python) using kafka 0.9.0.0 brokers. I expect
to release the updates to pypi soon, but for now you'll have to install
from source.

Other python alternatives: assign partitions statically to each instance
via chef/ansible/etc; or try pykafka's BalancedConsumer implementation,
which uses Zookeeper to coordinate assignment.
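
For the static-assignment route, the idea is just to give each instance a
fixed, non-overlapping slice of the partitions. A sketch using the newer
kafka-python assign() API (with 0.9.5 you'd use set_topic_partitions instead);
the instance and partition counts below are placeholders:

from kafka import KafkaConsumer, TopicPartition

INSTANCE_ID = 0        # placeholder: deploy a different id to each instance
NUM_INSTANCES = 3      # placeholder
NUM_PARTITIONS = 10    # placeholder: partition count of the topic

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
mine = [TopicPartition('some-topic', p)
        for p in range(NUM_PARTITIONS)
        if p % NUM_INSTANCES == INSTANCE_ID]
consumer.assign(mine)

for msg in consumer:
    print(msg)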

-Dana

On Wed, Jan 20, 2016 at 2:35 AM, Ilja Golshtein  wrote:

> Hello.
>
> I am trying to create consumer using kafka_python-0.9.5.
>
> I expect that several instances of the script
>
> ==
> consumer = KafkaConsumer('some-topic',
>  auto_offset_reset='largest',
>  metadata_broker_list=['localhost:9092'],
>  group_id='vasya_group',
>  auto_commit_enable=True)
> consumer.set_topic_partitions('some-topic')
> while True:
>     msg = consumer.next()
>     print(msg)
> ==
> would receive different messages, while in reality every instance receives
> all messages.
>
> What can be done to achieve
> "Consumers label themselves with a consumer group name, and each message
> published to a topic is delivered to one consumer instance within each
> subscribing consumer group" as advertised in Kafka Documentation?
>
> I am 100% sure that messages are distributed among several partitions (namely
> 10).
>
> Thanks.
>
> --
> Best regards
> Ilja Golshtein
>


Re: Can't save Kafka offset in Zookeeper

2016-01-19 Thread Dana Powers
0.9 brokers should be backwards compatible, yes. But as with everything --
you should verify in your own environment.

-Dana

On Tue, Jan 19, 2016 at 9:55 AM, Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Thank you Dana!
> I see...
> The pity is that Hortonworks claims in their release notes of HDP 2.3.2,
> that:
>  5.9. Kafka
>
> HDP 2.3.2 provides Kafka 0.8.2, with no additional Apache patches.
> (
>
> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_HDP_RelNotes/content/patch_kafka.html
>  )
>
> So I assumed that Kafka would come in a stable release...
>
> So you say that upgrading to HDP 2.3.4 would help? I see in the release notes
> that it is going to upgrade Kafka to 0.9.0.
> I'm afraid of this upgrade as I don't know whether Spark Streaming
> (spark-streaming-kafka) will support Kafka 0.9.
>
> What do you think? Is Kafka 0.9 completely backward compatible? I.e. will
> clients (both producers & consumers) using libraries for 0.8.2 (both
> "kafka-clients" as well as straight "kafka") connecting to it work
> after the upgrade?
>
> Thanks for your answer,
> Krzysztof
>
>
>
>
> wt., 19.01.2016 o 18:39 użytkownik Dana Powers <dana.pow...@gmail.com>
> napisał:
>
> > Sadly HDP 2.3.2 shipped with a broken OffsetCommit api (the 0.8.2-beta
> > version). You should use the apache releases, or upgrade to HDP 2.3.4.0
> or
> > later.
> >
> > -Dana
> >
> > On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki <
> k.zarzy...@gmail.com
> > >
> > wrote:
> >
> > > Hi Kafka users,
> > > I have an issue with saving Kafka offsets to Zookeeper through
> > > OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
> > > kindly borrow the description:
> > >
> > >
> >
> http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper
> > >
> > > "I've installed Zookeeper and Kafka from Ambari, on CentoS 7.
> > >
> > > Ambari version: 2.1.2.1
> > > Zookeeper version: 3.4.6.2.3
> > > Kafka version: 0.8.2.2.3
> > > Java Kafka client:kafka_2.10, 0.8.2.2
> > >
> > > I'm trying to save the Kafka offset, using the following code:
> > >
> > > SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
> > > soTimeout, bufferSize, clientId);
> > > TopicAndPartition topicAndPartition =
> > > new TopicAndPartition(topicName, partitionId);
> > > Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
> > > requestInfo.put(topicAndPartition,
> > > new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
> > > OffsetCommitRequest offsetCommitRequest =
> > > new OffsetCommitRequest(groupName, requestInfo, correlationId,
> > > clientName, (short) 0);
> > > simpleConsumer.commitOffsets(offsetCommitRequest);
> > > simpleConsumer.close();
> > >
> > > But when I run this, I get the following error in my client:
> > >
> > > java.io.EOFException: Received -1 when reading from channel, socket
> > > has likely been closed.
> > >
> > > Also in the Kafka logs I have the following error:
> > >
> > > [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1
> > > because of error (kafka.network.Processor)
> > > java.nio.BufferUnderflowException
> > > at java.nio.Buffer.nextGetIndex(Buffer.java:498)
> > > at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
> > > at
> > >
> >
> kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
> > > at
> > >
> >
> kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
> > > at
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > at
> > >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> > > at scala.collection.immutable.Range.foreach(Range.scala:141)
> > > at
> > > scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> > > at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> > > at
> > >
> >
> kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
> > > at
> > >
> >
> kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
> > > at
> > >
> >
> scala.co

Re: Can't save Kafka offset in Zookeeper

2016-01-19 Thread Dana Powers
Sadly HDP 2.3.2 shipped with a broken OffsetCommit api (the 0.8.2-beta
version). You should use the apache releases, or upgrade to HDP 2.3.4.0 or
later.

-Dana

On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki 
wrote:

> Hi Kafka users,
> I have an issue with saving Kafka offsets to Zookeeper through
> OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
> kindly borrow the description:
>
> http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper
>
> "I've installed Zookeeper and Kafka from Ambari, on CentoS 7.
>
> Ambari version: 2.1.2.1
> Zookeeper version: 3.4.6.2.3
> Kafka version: 0.8.2.2.3
> Java Kafka client:kafka_2.10, 0.8.2.2
>
> I'm trying to save the Kafka offset, using the following code:
>
> SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
> soTimeout, bufferSize, clientId);
> TopicAndPartition topicAndPartition =
> new TopicAndPartition(topicName, partitionId);
> Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
> requestInfo.put(topicAndPartition,
> new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
> OffsetCommitRequest offsetCommitRequest =
> new OffsetCommitRequest(groupName, requestInfo, correlationId,
> clientName, (short) 0);
> simpleConsumer.commitOffsets(offsetCommitRequest);
> simpleConsumer.close();
>
> But when I run this, I get the following error in my client:
>
> java.io.EOFException: Received -1 when reading from channel, socket
> has likely been closed.
>
> Also in the Kafka logs I have the following error:
>
> [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1
> because of error (kafka.network.Processor)
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:498)
> at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
> at
> kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
> at
> kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.Range.foreach(Range.scala:141)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
> at
> kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.immutable.Range.foreach(Range.scala:141)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
> kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
> at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
> at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
> at kafka.network.RequestChannel$Request.(RequestChannel.scala:55)
> at kafka.network.Processor.read(SocketServer.scala:547)
> at kafka.network.Processor.run(SocketServer.scala:405)
> at java.lang.Thread.run(Thread.java:745)
>
> Now I've also downloaded and installed the official Kafka 0.8.2.2 version
> from
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
> and
> it works ok; you can save the Kafka offset without any error.
>
> Can anybody give me some direction on why the Ambari Kafka is failing to
> save the offset?
>
> P.S: I know that if versionId is 0 (in OffsetCommitRequest), then the
> offset is actually saved in Zookeeper.
> "
> My only difference (IMHO, irrelevant) is that I'm using HDP in version
> 2.3.2, but other than that versions are the same.
>
> Do you guys have any hints on what could be wrong? Is that something wrong
> with my use of offset committing? Or conflict of versions?
> Any hints would be highly appreciated :)
> Cheers,
> Krzysztof
>


Re: Possible Error Code for Kafka API Calls

2016-01-18 Thread Dana Powers
Not exactly - there is some documentation in the source code, but I agree
that a wiki on this would be extremely useful.

Can anyone create a wiki page? If so, I'm happy to get something started.
It is really the missing piece for folks writing custom clients / anything
at the api layer.

-Dana
On Jan 18, 2016 04:07, "Muqtafi Akhmad"  wrote:

> dear all,
>
> I found a guide in Kafka protocol (Kafka Protocol Guide
> <
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
> >),
> the guide describes API calls complete with their request and response
> formats, and a list of error codes with their descriptions. My question is, is
> there documentation of the possible error code mapping for each API request?
>
> Thank you,
>
> --
> Muqtafi Akhmad
> Software Engineer
> Traveloka
>


Re: Possible Error Code for Kafka API Calls

2016-01-18 Thread Dana Powers
Awesome! Fwiw, my understanding of the possible produce errors is:

LeaderNotAvailable
NotLeaderForPartition
UnknownTopicOrPartition
InvalidMessage
MessageSizeTooLarge
RecordListTooLarge
InvalidTopic
NotEnoughReplicas
NotEnoughReplicasAfterAppend
InvalidRequiredAcks
TopicAuthorizationFailed


The other bits of relevant info for each error code are:

(1) whether it is retriable (after backoff)
(2) whether it should trigger a metadata refresh

[In the java code these are identified via inheritance]
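
So a client's produce error handling ends up looking something like the sketch
below (the sets are illustrative, not an authoritative classification -- check
the java error classes for the real mapping; the callbacks are hypothetical
stand-ins for the surrounding producer code):

RETRIABLE = {'LeaderNotAvailable', 'NotLeaderForPartition',
             'UnknownTopicOrPartition', 'NotEnoughReplicas',
             'NotEnoughReplicasAfterAppend'}
REFRESH_METADATA = {'LeaderNotAvailable', 'NotLeaderForPartition',
                    'UnknownTopicOrPartition'}

def handle_produce_error(error_name, refresh_metadata, backoff_and_retry, fail):
    # refresh_metadata / backoff_and_retry / fail are callbacks supplied by
    # the caller.
    if error_name in REFRESH_METADATA:
        refresh_metadata()
    if error_name in RETRIABLE:
        backoff_and_retry()
    else:
        fail(error_name)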

-Dana


On Mon, Jan 18, 2016 at 9:34 AM, Gwen Shapira <g...@confluent.io> wrote:

> I added what I found in the code comments to the wiki. Note that there are
> some gaps. For example if anyone can fill in the producer error codes, it
> will be awesome :)
>
> On Mon, Jan 18, 2016 at 9:17 AM, Gwen Shapira <g...@confluent.io> wrote:
>
> > I'm wondering if the protocol docs can be auto-generated from our code to
> > a large extent. Or if we can enhance our protocol definition classes a
> bit
> > to make them self-documenting (the way we did for configuration).
> >
> > Regarding Dana's suggestion: I think you need special wiki-edit
> > privileges. If you don't see the "edit" button, I'd ask on the "dev" list for
> > that privilege (sorry, looks like I'm not a wiki admin, so I can't add
> > you).
> >
> > On Mon, Jan 18, 2016 at 7:46 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> >> In addition to this, it would be great to move the protocol docs to the
> >> docs folder of the git repo:
> >>
> >> https://github.com/apache/kafka/tree/trunk/docs
> >>
> >> This way, we can ensure that the protocol docs are updated at the same
> >> time
> >> as the protocol code.
> >>
> >> Ismael
> >>
> >> On Mon, Jan 18, 2016 at 3:36 PM, Dana Powers <dana.pow...@gmail.com>
> >> wrote:
> >>
> >> > Not exactly - there is some documentation in the source code, but I
> >> agree
> >> > that a wiki on this would be extremely useful.
> >> >
> >> > Can anyone create a wiki page? If so, I'm happy to get something
> >> started.
> >> > It is really the missing piece for folks writing custom clients /
> >> anything
> >> > at the api layer.
> >> >
> >> > -Dana
> >> > On Jan 18, 2016 04:07, "Muqtafi Akhmad" <muqt...@traveloka.com>
> wrote:
> >> >
> >> > > dear all,
> >> > >
> >> > > I found a guide in Kafka protocol (Kafka Protocol Guide
> >> > > <
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetRequest
> >> > > >),
> >> > > the guide describes API calls complete with their request and
> response
> >> > > format, and list of error codes with their description . My question
> >> is,
> >> > is
> >> > > there documentation of possible error code mapping for each API
> >> request?
> >> > >
> >> > > Thank you,
> >> > >
> >> > > --
> >> > > Muqtafi Akhmad
> >> > > Software Engineer
> >> > > Traveloka
> >> > >
> >> >
> >>
> >
> >
>


RE: trouble sending produce requests to 0.9.0.0 broker cluster

2016-01-11 Thread Dana Powers
Looks like you aren't setting the request client-id, and server is crashing
on it. I'm not sure whether server api is expected to work w/o client-id,
but you can probably fix by sending one. Fwiw, kafka-python sends
'kafka-python' unless user specifies something else.

-Dana
On Jan 11, 2016 8:41 AM,  wrote:

> I forgot to include some information about this problem.  When I sent
> the produce request, the following appeared in server.log:
>
> [2016-01-10 23:03:44,957] ERROR [KafkaApi-3] error when handling
> request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
> null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
> [topic_1,3] -> 37 (kafka.server.KafkaApis)
> java.lang.NullPointerException
>at
> org.apache.kafka.common.metrics.JmxReporter.getMBeanName(JmxReporter.java:127)
>at
> org.apache.kafka.common.metrics.JmxReporter.addAttribute(JmxReporter.java:106)
>at
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76)
>at
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>at
> kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
>at
> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
>at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
>at
> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>at java.lang.Thread.run(Thread.java:745)
>
> Later when I tried sending another produce request I got a somewhat
> similar error:
>
> [2016-01-11 08:15:05,153] ERROR [KafkaApi-3] error when handling
> request Name: ProducerRequest; Version: 0; CorrelationId: 1; ClientId:
> null; RequiredAcks: 1; AckTimeoutMs: 1 ms; TopicAndPartition:
> [topic_1,3] -> 37 (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: A metric named 'MetricName
> [name=throttle-time, group=Produce, description=Tracking average
> throttle-time per client, tags={client-id=null}]' already exists, can't
> register another one.
>at
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>at
> kafka.server.ClientQuotaManager.getOrCreateQuotaSensors(ClientQuotaManager.scala:209)
>at
> kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:111)
>at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$2(KafkaApis.scala:353)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.KafkaApis$$anonfun$handleProducerRequest$1.apply(KafkaApis.scala:371)
>at
> kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:348)
>at
> kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
>at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
>at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>at java.lang.Thread.run(Thread.java:745)
>
> In both cases I was running kafka-console-consumer.sh to consume
> messages from the topic I was sending to, and the consumer did see the
> message I sent.
>
> Dave
>
>
> -Original Message-
> From: d...@dspeterson.com
> Sent: Monday, January 11, 2016 12:32am
> To: users@kafka.apache.org
> Subject: trouble sending produce requests to 0.9.0.0 broker cluster
>
> Hi,
>
> I'm having trouble sending produce requests to a Kafka 0.9.0.0 broker
> cluster consisting of 4 brokers with IDs 0, 1, 2, and 3.  All 4
> brokers are running locally on my CentOS 7 development box, listening
> on ports 9092, 9093, 9094, and 9095 respectively.  I am running my
> own producer code (Bruce, see https://github.com/ifwe/bruce), which
> works without problems with Kafka 0.8, but has problems with 0.9.0.0.
> When I send a produce request consisting of a single message, I often
> get a response consisting of error ACK -1 (Unknown, unexpected server
> error) although I have also seen other errors such as 6
> (NotLeaderForPartition).  During one observed instance of this
> behavior I saw the 

Re: best python library to use?

2016-01-09 Thread Dana Powers
pykafka uses a custom zookeeper implementation for consumer groups.
kafka-python uses the 0.9.0.0 server apis to accomplish the same.

-Dana
On Jan 8, 2016 18:32, "chengxin Cai" <ia...@outlook.com> wrote:

> Hi
>
> I heard that Pykafka can create a balanced consumer.
>
> And there should be no other big difference.
>
>
> Best Regards
>
> > On Jan 9, 2016, at 08:58, Dana Powers <dana.pow...@rd.io> wrote:
> >
> > Hi Doug,
> >
> > The differences are fairly subtle. kafka-python is a community-backed
> > project that aims to be consistent w/ the official java client; pykafka
> is
> > sponsored by parse.ly and aims to provide a pythonic interface.
> whichever
> > you go with, I would love to hear your specific feedback on kafka-python.
> >
> > -Dana (kafka-python maintainer)
> >
> >> On Fri, Jan 8, 2016 at 4:32 PM, Doug Tomm <dct...@gmail.com> wrote:
> >>
> >> we're using kafka-python, weighing pykafka, and wondering if there's
> >> another that is better to use.  does confluent endorse or recommend a
> >> particular python package (psorry for the alliteration)?
> >>
> >> doug
> >>
> >>
>


Re: best python library to use?

2016-01-08 Thread Dana Powers
Hi Doug,

The differences are fairly subtle. kafka-python is a community-backed
project that aims to be consistent w/ the official java client; pykafka is
sponsored by parse.ly and aims to provide a pythonic interface. whichever
you go with, I would love to hear your specific feedback on kafka-python.

-Dana (kafka-python maintainer)

On Fri, Jan 8, 2016 at 4:32 PM, Doug Tomm  wrote:

> we're using kafka-python, weighing pykafka, and wondering if there's
> another that is better to use.  does confluent endorse or recommend a
> particular python package (psorry for the alliteration)?
>
> doug
>
>


Re: Leader not available (initially)

2016-01-08 Thread Dana Powers
That is expected when a topic is first created. Once the topic and all
partitions are initialized, the leader(s) will be available and you can
produce messages.

Note that if you disable topic auto creation, you should get an
UnknownTopicOrPartitionError, and you would need to create the topic
manually.

-Dana
On Jan 8, 2016 03:20, "Cosmin Marginean"  wrote:

> Hi guys
>
> I’ve been seeing this in our logs a few times, when initially
> bootstrapping the system.
>
> WARN  2016-01-08 10:59:24,232 [pool-3-thread-11] [none]
> o.a.kafka.clients.NetworkClient: Error while fetching metadata with
> correlation id 3 : {mytopic=LEADER_NOT_AVAILABLE}
>
> It doesn’t seem to break anything, and it seems to be related to the fact
> that we do processing on the topic right after creating it. Below is our
> workflow (roughly)
> * Create topic (using AdminUtils)
> * Register some tens of consumers
> * Send some messages with a producer
>
> I’d like to know if this is something to worry about (the warning that is)
> or if it’s something we should investigate further.
>
> Thank you
> Cosmin
>
>


Re: [ANN] kinsky: clojure 0.9.0 client

2016-01-07 Thread Dana Powers
Very nice!
On Jan 7, 2016 04:41, "Pierre-Yves Ritschard"  wrote:

> Hi list,
>
> While the 0.9.0.0 client lib is great to work with, I extracted some of
> the facade code I use internally into a library which smooths some
> aspects of interacting with Kafka from Clojure.
>
> The library provides a simple way to build rebalance listeners,
> serializers and deserializers. It also has data representation of all
> Kafka classes. A core.async facade is also available, since production
> and consumption of messages fits well in the channel abstraction that
> core.async provides.
>
> https://github.com/pyr/kinsky, full API documentation is at
> http://pyr.github.io/kinsky.
>
> Cheers,
>   - pyr
>


Re: EOF Warning

2015-12-30 Thread Dana Powers
Do you have access to the server logs? Any error is likely recorded there
with a stack trace. You also might check what server version you are
connecting to.

-Dana
On Dec 30, 2015 3:49 AM, "Birendra Kumar Singh"  wrote:

> I keep getting such warnings intermittently in my application. The
> application connects to a kafka server and pushes messages. None of my
> messages have failed, however.
>
> The application is a spring application and it uses kafka-clients to
> establish a connection and send messages to kafka.
> The kafka-clients dependency used is as below:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.8.2.0</version>
> </dependency>
>
>
> ! java.io.EOFException: null! at
>
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> ~[publisher2-0.0.1-SNAPSHOT.jar:na]! at
> org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> ~[publisher2-0.0.1-SNAPSHOT.jar:na]! at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> [publisher2-0.0.1-SNAPSHOT.jar:na]! at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> [publisher2-0.0.1-SNAPSHOT.jar:na]! at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> [publisher2-0.0.1-SNAPSHOT.jar:na]! at
> java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]
>


Re: consuming 0 records

2015-12-30 Thread Dana Powers
A few thoughts from a non-expert:

connections are also processed asynchronously in the poll loop. If you are
not enabling any timeout, you may be seeing a few initial iterations spent
on setting up the channel connections. Also you probably need a few loop
iterations to get through an initial metadata request / response.

also, if I recall, records should be returned in batches per
topic-partition; not one-by-one. So if/when records are ready, you would
get as many as were received via completed FetchRequests -- depends on
message size and fetch configs max.partition.fetch.bytes, fetch.min.bytes,
and fetch.max.wait.ms. So you shouldn't expect to poll 500x.

I'd suggest using a small, but non-zero timeout when polling. 100ms is used
in the docs quite a bit.
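
In kafka-python terms, that loop would look something like the sketch below
(the Java consumer's poll(100) is the direct equivalent):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092'])

while True:
    # Block up to 100ms; returns {TopicPartition: [records]} for whatever
    # fetches completed in that window -- possibly empty on early iterations
    # while connections and metadata are still being established.
    batches = consumer.poll(timeout_ms=100)
    for tp, records in batches.items():
        for record in records:
            print(tp, record.offset, record.value)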

-Dana

On Wed, Dec 30, 2015 at 10:03 AM, Franco Giacosa  wrote:

> Hi,
>
> I am running kafka 0.9.0 locally.
>
> I am having a particular situation in the following scenario.
>
> (1) 1 producer inserts 500 records (approx. 300 bytes each) to 1 topic,
> partition 0 (or 1 as you prefer)
> (2) After the producer finishes inserting the 500 records, 1 consumer reads
> in a loop from this topic with consumer.poll(0)
> and max.partition.fetch.bytes=500; sometimes that call brings back records and
> sometimes the loop has to go around a few times until it brings something.
> Can someone explain to me why it doesn't fetch a record each time that it
> polls? Can a poll operation affect another poll operation?
> Why, if I've inserted 500 records, do I have to poll more than 500 times?
>
> I have tried using poll(0), because in the documentation it says, "if 0,
> returns with any records that are available now".
>
> Thanks
>


Re: EOF Warning

2015-12-30 Thread Dana Powers
I was thinking kafka logs, but KAFKA-2078 suggests it may be a deeper
issue. Sorry, I don't have any better suggestions / ideas right now than
you found in that JIRA ticket.

-Dana

On Wed, Dec 30, 2015 at 10:10 AM, Birendra Kumar Singh <singh...@gmail.com>
wrote:

> Looks like there is an open issue related to the same:
> https://issues.apache.org/jira/browse/KAFKA-2078
>
> @Dana
> Which server logs do you want me to check, Zookeeper or Kafka? I didn't
> find any stack trace over there though.
> It's only in my application logs that I see them. And it comes as a WARN
> rather than an ERROR.
>
> On Wed, Dec 30, 2015 at 9:51 PM, Dana Powers <dana.pow...@gmail.com>
> wrote:
>
> > Do you have access to the server logs? Any error is likely recorded there
> > with a stack trace. You also might check what server version you are
> > connecting to.
> >
> > -Dana
> > On Dec 30, 2015 3:49 AM, "Birendra Kumar Singh" <singh...@gmail.com>
> > wrote:
> >
> > > I keep getting such warnings intermittently in my application. The
> > > application connects to a kafka server and pushes messages. None of my
> > > messages have failed, however.
> > >
> > > The application is a spring application and it uses kafka-clients to
> > > establish a connection and send messages to kafka.
> > > The kafka-clients dependency used is as below:
> > >
> > > <dependency>
> > >   <groupId>org.apache.kafka</groupId>
> > >   <artifactId>kafka-clients</artifactId>
> > >   <version>0.8.2.0</version>
> > > </dependency>
> > >
> > > ! java.io.EOFException: null! at
> > >
> > >
> >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> > > ~[publisher2-0.0.1-SNAPSHOT.jar:na]! at
> > > org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> > > ~[publisher2-0.0.1-SNAPSHOT.jar:na]! at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> > > [publisher2-0.0.1-SNAPSHOT.jar:na]! at
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> > > [publisher2-0.0.1-SNAPSHOT.jar:na]! at
> > > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> > > [publisher2-0.0.1-SNAPSHOT.jar:na]! at
> > > java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]
> > >
> >
>


Re: The JIRA Awakens [KAFKA-1841]

2015-12-24 Thread Dana Powers
Awesome ! Looks like a great distribution. I'd love to add HDP releases to
the kafka-python integration test suite.

-Dana
On Dec 24, 2015 9:33 AM, "Harsha" <ka...@harsha.io> wrote:

> Hi Dana,
> I worked on that release. Yes, HDP-2.3.0 has a lot of
> additional patches on top of 0.8.2.1, mainly the kerberos
> patches.
> We did miss KAFKA-1841, which was fixed in a later maintenance release. We
> already notified everyone to upgrade to HDP-2.3.4; this is apache kafka
> 0.9.0 + additional patches. We are making sure on our side not to miss
> any compatibility patches like these for 3rd party developers and to have
> tests to ensure that.
>
> Thanks,
> Harsha
> On Wed, Dec 23, 2015, at 04:11 PM, Dana Powers wrote:
> > Hi all,
> >
> > I've been helping debug an issue filed against kafka-python related to
> > compatibility w/ Hortonworks 2.3.0.0 kafka release. As I understand it,
> > HDP
> > is currently based on snapshots of apache/kafka trunk, merged with some
> > custom patches from HDP itself.
> >
> > In this case, HDP's 2.3.0.0 kafka release missed a compatibility patch
> > that
> > I believe is critical for third-party library support. Unfortunately the
> > patch -- KAFKA-1841 -- was initially only applied to the 0.8.2 branch (it
> > was merged to trunk several months later in KAFKA-2068). Because it
> > wasn't
> > on trunk, it didn't get included in the HDP kafka releases.
> >
> > If you recall, KAFKA-1841 was needed to maintain backwards and forwards
> > compatibility wrt the change from zookeeper to kafka-backed offset
> > storage.
> > Not having this patch is fine if you only ever use the clients /
> > libraries
> > distributed in the that release -- and I imagine that is probably most
> > folks that are using it. But if you remember the thread on this issue
> > back
> > in the 0.8.2-beta review, the API incompatibility made third-party
> > clients
> > hard to develop and maintain if the goal is to support multiple broker
> > versions w/ the same client code [this is the goal of kafka-python].
> > Anyways, I'm really glad that the fix made it into the apache release,
> > but
> > now I'm sad that it didn't make it into HDP's release.
> >
> > Anyways, I think there's a couple takeaways here:
> >
> > (1) I'd recommend anyone using HDP who intends to use third-party kafka
> > consumers should upgrade to 2.3.4.0 or later. That version appears to
> > include the compatibility patch (KAFKA-2068). Of course if anyone is on
> > list from HDP, they may be able to provide better help on this.
> >
> > (2) I think more care should probably be taken to help vendors or anyone
> > tracking changes on trunk wrt released versions. Is there a list of all
> > KAFKA- patches that are released but not merged into trunk ?
> > KAFKA-1841
> > is obviously near and dear to my heart, but I wonder if there are other
> > patches like it?
> >
> > Happy holidays to all, and may the force be with you
> >
> > -Dana
>


The JIRA Awakens [KAFKA-1841]

2015-12-23 Thread Dana Powers
Hi all,

I've been helping debug an issue filed against kafka-python related to
compatibility w/ Hortonworks 2.3.0.0 kafka release. As I understand it, HDP
is currently based on snapshots of apache/kafka trunk, merged with some
custom patches from HDP itself.

In this case, HDP's 2.3.0.0 kafka release missed a compatibility patch that
I believe is critical for third-party library support. Unfortunately the
patch -- KAFKA-1841 -- was initially only applied to the 0.8.2 branch (it
was merged to trunk several months later in KAFKA-2068). Because it wasn't
on trunk, it didn't get included in the HDP kafka releases.

If you recall, KAFKA-1841 was needed to maintain backwards and forwards
compatibility wrt the change from zookeeper to kafka-backed offset storage.
Not having this patch is fine if you only ever use the clients / libraries
distributed in that release -- and I imagine that is probably most
folks that are using it. But if you remember the thread on this issue back
in the 0.8.2-beta review, the API incompatibility made third-party clients
hard to develop and maintain if the goal is to support multiple broker
versions w/ the same client code [this is the goal of kafka-python].
Anyways, I'm really glad that the fix made it into the apache release, but
now I'm sad that it didn't make it into HDP's release.

Anyways, I think there's a couple takeaways here:

(1) I'd recommend anyone using HDP who intends to use third-party kafka
consumers should upgrade to 2.3.4.0 or later. That version appears to
include the compatibility patch (KAFKA-2068). Of course if anyone is on
list from HDP, they may be able to provide better help on this.

(2) I think more care should probably be taken to help vendors or anyone
tracking changes on trunk wrt released versions. Is there a list of all
KAFKA- patches that are released but not merged into trunk ? KAFKA-1841
is obviously near and dear to my heart, but I wonder if there are other
patches like it?

Happy holidays to all, and may the force be with you

-Dana


Re: how to reset kafka offset in zookeeper

2015-12-18 Thread Dana Powers
If you don't like messing w/ ZK directly, another alternative is to
manually seek to offset 0 on all relevant topic-partitions (via
OffsetCommitRequest or your favorite client api) and change the
auto-offset-reset policy on your consumer to earliest/smallest. Bonus is
that this should also work for consumers that use kafka-backed offset
commit storage.
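
For the kafka-backed case, a sketch of that with kafka-python (import paths and
the OffsetAndMetadata signature vary a bit between releases, so treat this as
illustrative):

from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

# Commit offset 0 for every partition of the topic on behalf of the group, so
# the next consumer start reads from the beginning (or falls back to its
# auto-offset-reset policy if offset 0 has already aged out of retention).
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         group_id='my-group', enable_auto_commit=False)
partitions = [TopicPartition('my-topic', p)
              for p in consumer.partitions_for_topic('my-topic')]
consumer.commit({tp: OffsetAndMetadata(0, '') for tp in partitions})
consumer.close()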

-Dana

On Fri, Dec 18, 2015 at 9:38 AM, Marko Bonaći 
wrote:

> Hmm, I guess you're right Todd :)
> Just to confirm, you meant that, while you're changing the exported file it
> might happen that one of the segment files becomes eligible for cleanup by
> retention, which would then make the imported offsets out of range?
>
> Marko Bonaći
> Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> Solr & Elasticsearch Support
> Sematext  | Contact
> 
>
> On Fri, Dec 18, 2015 at 6:29 PM, Todd Palino  wrote:
>
> > That works if you want to set to an arbitrary offset, Marko. However in
> the
> > case the OP described, wanting to reset to smallest, it is better to just
> > delete the consumer group and start the consumer with auto.offset.reset
> set
> > to smallest. The reason is that while you can pull the current smallest
> > offsets from the brokers and set them in Zookeeper for the consumer, by
> the
> > time you do that the smallest offset is likely no longer valid. This
> means
> > you’re going to resort to the offset reset logic anyways.
> >
> > -Todd
> >
> >
> > On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći  >
> > wrote:
> >
> > > You can also do this:
> > > 1. stop consumers
> > > 2. export offsets from ZK
> > > 3. make changes to the exported file
> > > 4. import offsets to ZK
> > > 5. start consumers
> > >
> > > e.g.
> > > bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group group-name
> > > --output-file /tmp/zk-offsets --zkconnect localhost:2181
> > > bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
> > > /tmp/zk-offsets --zkconnect localhost:2181
> > >
> > > Marko Bonaći
> > > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > > Solr & Elasticsearch Support
> > > Sematext  | Contact
> > > 
> > >
> > > On Fri, Dec 18, 2015 at 4:06 PM, Jens Rantil 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I noticed that a consumer in the new consumer API supports setting
> the
> > > > offset for a partition to beginning. I assume doing so also would
> > update
> > > > the offset in Zookeeper eventually.
> > > >
> > > > Cheers,
> > > > Jens
> > > >
> > > > On Friday, December 18, 2015, Akhilesh Pathodia <
> > > > pathodia.akhil...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I want to reset the kafka offset in zookeeper so that the consumer
> > will
> > > > > start reading messages from first offset. I am using flume as a
> > > consumer
> > > > to
> > > > > kafka. I have set the kafka property kafka.auto.offset.reset to
> > > > "smallest",
> > > > > but it does not reset the offset in zookeeper and that's why flume
> > will
> > > > not
> > > > > read messages from first offset.
> > > > >
> > > > > Is there any way to reset kafka offset in zookeeper?
> > > > >
> > > > > Thanks,
> > > > > Akhilesh
> > > > >
> > > >
> > > >
> > > > --
> > > > Jens Rantil
> > > > Backend engineer
> > > > Tink AB
> > > >
> > > > Email: jens.ran...@tink.se
> > > > Phone: +46 708 84 18 32
> > > > Web: www.tink.se
> > > >
> > > > Facebook  Linkedin  Twitter
> > > >
> > >
> >
> >
> >
> > --
> > *—-*
> > *Todd Palino*
> > Staff Site Reliability Engineer
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
> >
>


Re: failed with LeaderNotAvailableError -

2015-12-17 Thread Dana Powers
Hi Ben and Marko -- great suggestions re: connection failures and docker.

The specific error here is: LeaderNotAvailableError:
TopicMetadata(topic='topic-test-production', error=5, partitions=[])

That is an error code (5) returned from a MetadataRequest. In this context
it means that the topic did not exist and so the request triggered an
auto-create initialization (i.e., the connection was fine). Topic
initialization tends to take a few seconds to complete, but only needs to
happen once per topic. A retry here is generally fine. This retry should
probably be handled under the covers by the client code. So in this case I
would treat it as a simple kafka-python issue (#488).
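
Until then, a bounded retry around the first send is a reasonable workaround --
a sketch (send_fn is a stand-in for whatever zero-argument produce call you
wrap it around):

import time

def send_with_retry(send_fn, retries=5, backoff_secs=1.0):
    # Retry a handful of times to ride out topic auto-creation and leader
    # election, then give up and re-raise the last error.
    for attempt in range(retries):
        try:
            return send_fn()
        except Exception:
            if attempt == retries - 1:
                raise
            time.sleep(backoff_secs)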

-Dana

On Thu, Dec 17, 2015 at 4:58 AM, Ben Davison 
wrote:

> I probably should of mentioned that this was using Amazon ECS.
>
> On Thu, Dec 17, 2015 at 12:18 PM, Marko Bonaći 
> wrote:
>
> > It doesn't have to be FQDN.
> >
> > Here's how I run Kafka in a container:
> > docker run --name st-kafka -p 2181:2181 -p 9092:9092 -e
> > ADVERTISED_HOST=`docker-machine ip dev-st` -e ADVERTISED_PORT=9092 -d
> > spotify/kafka
> >
> > And then you have access to Kafka on the docker host VM from any other
> > machine.
> > BTW I use Spotify's image since it contains both ZK and Kafka, but I
> think
> > the latest version they built is 0.8.2.1, so you might have to build the
> > new image yourself if you need 0.9, but that's trivial to do.
> >
> > Marko Bonaći
> > Monitoring | Alerting | Anomaly Detection | Centralized Log Management
> > Solr & Elasticsearch Support
> > Sematext  | Contact
> > 
> >
> > On Thu, Dec 17, 2015 at 11:33 AM, Ben Davison 
> > wrote:
> >
> > > Hi David,
> > >
> > > Are you running in docker? Are you trying to connect from a remote
> > > box?
> > > We found we could connect locally but couldn't connect from another
> > remote
> > > host.
> > >
> > > (I've just started using kafka also)
> > >
> > > We had the same issue and found out: host.name=<%=@ipaddress%> needed
> to
> > > be
> > > the FQDN of the box.
> > >
> > > Thanks,
> > >
> > > Ben
> > >
> > > On Thu, Dec 17, 2015 at 5:40 AM, David Montgomery <
> > > davidmontgom...@gmail.com
> > > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am very concerned about using kafka in production given the below
> > > > errors:
> > > >
> > > > Now issues with my zookeeper.  Other services use ZK.  Only kafka
> > > > fails.
> > > > I have 2 kafka servers using 0.8.x.  How do I resolve?  I tried
> > restarting
> > > > services for kafka.  Below is my kafka server.properties file
> > > >
> > > > 'Traceback (most recent call last):
> > > >   File
> > > >
> > > >
> > >
> >
> "/usr/local/lib/python2.7/dist-packages/gevent-1.1b6-py2.7-linux-x86_64.egg/gevent/greenlet.py",
> > > > line 523, in run
> > > > result = self._run(*self.args, **self.kwargs)
> > > >   File "/var/feed-server/ad-server/pixel-server.py", line 145, in
> > > > send_kafka_message
> > > > res = producer.send_messages(topic, message)
> > > >   File "build/bdist.linux-x86_64/egg/kafka/producer/simple.py", line
> > 52,
> > > in
> > > > send_messages
> > > > partition = self._next_partition(topic)
> > > >   File "build/bdist.linux-x86_64/egg/kafka/producer/simple.py", line
> > 36,
> > > in
> > > > _next_partition
> > > > self.client.load_metadata_for_topics(topic)
> > > >   File "build/bdist.linux-x86_64/egg/kafka/client.py", line 383, in
> > > > load_metadata_for_topics
> > > > kafka.common.check_error(topic_metadata)
> > > >   File "build/bdist.linux-x86_64/egg/kafka/common.py", line 233, in
> > > > check_error
> > > > raise error_class(response)
> > > > LeaderNotAvailableError: TopicMetadata(topic='topic-test-production',
> > > > error=5, partitions=[])
> > > >  > send_kafka_message('topic-test-production',
> > > > '{"adfadfadf)> failed with LeaderNotAvailableError
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > # limitations under the License.
> > > > # see kafka.server.KafkaConfig for additional details and defaults
> > > >
> > > > # Server Basics
> > #
> > > >
> > > > # The id of the broker. This must be set to a unique integer for each
> > > > broker.
> > > > broker.id=<%=@broker_id%>
> > > > advertised.host.name=<%=@ipaddress%>
> > > > advertised.port=9092
> > > > # Socket Server Settings
> > > > #
> > > >
> > > > # The port the socket server listens on
> > > > port=9092
> > > >
> > > > # Hostname the broker will bind to and advertise to producers and
> > > > consumers.
> > > > # If not set, the server will bind to all interfaces and advertise
> the
> > > > value returned from
> > > > # from java.net.InetAddress.getCanonicalHostName().
> > > > host.name=<%=@ipaddress%>
> > > >
> > > > # The number of threads handling network 

Re: Fallout from upgrading to kafka 0.9 from 0.8.2.3

2015-12-17 Thread Dana Powers
I don't have much to add on this, but q: what is version 0.8.2.3? I thought
the latest in 0.8 series was 0.8.2.2?

-Dana
On Dec 17, 2015 5:56 PM, "Rajiv Kurian"  wrote:

> Yes we are in the process of upgrading to the new producers. But the
> problem seems deeper than a compatibility issue. We have one environment
> where the old producers work with the new 0.9 broker. Further when we
> reverted our messed up 0.9 environment to 0.8.2.3 the problem with those
> topics didn't go away.
>
> Didn't see any ZK issues on the brokers. There were other topics on the
> very same brokers that didn't seem to be affected.
>
> On Thu, Dec 17, 2015 at 5:46 PM, Jun Rao  wrote:
>
> > Yes, the new java producer is available in 0.8.2.x and we recommend
> people
> > use that.
> >
> > Also, when those producers had the issue, were there any other things
> weird
> > in the broker (e.g., broker's ZK session expires)?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Dec 17, 2015 at 2:37 PM, Rajiv Kurian 
> wrote:
> >
> > > I can't think of anything special about the topics besides the clients
> > > being very old (Java wrappers over Scala).
> > >
> > > I do think it was using ack=0. But my guess is that the logging was
> done
> > by
> > > the Kafka producer thread. My application itself was not getting
> > exceptions
> > > from Kafka.
> > >
> > > On Thu, Dec 17, 2015 at 2:31 PM, Jun Rao  wrote:
> > >
> > > > Hmm, anything special with those 3 topics? Also, the broker log shows
> > > that
> > > > the producer uses ack=0, which means the producer shouldn't get
> errors
> > > like
> > > > leader not found. Could you clarify on the ack used by the producer?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Dec 17, 2015 at 12:41 PM, Rajiv Kurian 
> > > wrote:
> > > >
> > > > > The topic which stopped working had clients that were only using
> the
> > > old
> > > > > Java producer that is a wrapper over the Scala producer. Again it
> > > seemed
> > > > to
> > > > > work perfectly in another of our realms where we have the same
> > topics,
> > > > same
> > > > > producers/consumers etc but with less traffic.
> > > > >
> > > > > On Thu, Dec 17, 2015 at 12:23 PM, Jun Rao 
> wrote:
> > > > >
> > > > > > Are you using the new java producer?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Thu, Dec 17, 2015 at 9:58 AM, Rajiv Kurian <
> ra...@signalfx.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > > Answers inline:
> > > > > > >
> > > > > > > On Thu, Dec 17, 2015 at 9:41 AM, Jun Rao 
> > wrote:
> > > > > > >
> > > > > > > > Rajiv,
> > > > > > > >
> > > > > > > > Thanks for reporting this.
> > > > > > > >
> > > > > > > > 1. How did you verify that 3 of the topics are corrupted? Did
> > you
> > > > use
> > > > > > > > DumpLogSegments tool? Also, is there a simple way to
> reproduce
> > > the
> > > > > > > > corruption?
> > > > > > > >
> > > > > > > No I did not. The only reason I had to believe that was that no
> > writers
> > > > > could
> > > > > > > write to the topic. I have actually no idea what the problem
> > was. I
> > > > saw
> > > > > > > very frequent (much more than usual) messages of the form:
> > > > > > > INFO  [kafka-request-handler-2]
> > [kafka.server.KafkaApis
> > > > > > >   ]: [KafkaApi-6] Close connection due to error handling
> > > produce
> > > > > > > request with correlation id 294218 from client id  with ack=0
> > > > > > > and also message of the form:
> > > > > > > INFO  [kafka-network-thread-9092-0]
> > > [kafka.network.Processor
> > > > > > >   ]: Closing socket connection to /some ip
> > > > > > > The cluster was actually a critical one so I had no recourse
> but
> > to
> > > > > > revert
> > > > > > > the change (which as noted didn't fix things). I didn't have
> > > enough
> > > > > > time
> > > > > > > to debug further. The only way I could fix it with my limited
> > Kafka
> > > > > > > knowledge was (after reverting) deleting the topic and
> recreating
> > > it.
> > > > > > > I had updated a low priority cluster before that worked just
> > fine.
> > > > That
> > > > > > > gave me the confidence to upgrade this higher priority cluster
> > > which
> > > > > did
> > > > > > > NOT work out. So the only way for me to try to reproduce it is
> to
> > > try
> > > > > > this
> > > > > > > on our larger clusters again. But it is critical that we don't
> > mess
> > > > up
> > > > > > this
> > > > > > > high priority cluster so I am afraid to try again.
> > > > > > >
> > > > > > > > 2. As Lance mentioned, if you are using snappy, make sure
> that
> > > you
> > > > > > > include
> > > > > > > > the right snappy jar (1.1.1.7).
> > > > > > > >
> > > > > > > Wonder why I don't see Lance's email in this thread. Either way
> > we
> > > > are
> > > > > > not
> > > > > > > using compression of any 

Re: failed with LeaderNotAvailableError -

2015-12-17 Thread Dana Powers
No - sorry if I wasn't clear. The "error" is not related to your servers.
Just make sure that you create the topic before sending messages to it. If
you do not create the topic beforehand, the server will auto-create, but it
does take a few seconds to initialize. If you plan to rely on auto-creation
in production, I recommend either wrapping producer.send_messages in a try
/ except and retrying on LeaderNotAvailableError, or alternatively calling
client.ensure_topic_exists(topic) to block until initialization is done.
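
A minimal sketch of both options, assuming the kafka-python classes mentioned
above (broker address, payload, and retry settings here are all made up, and
this is untested):

```
import time

from kafka import KafkaClient, SimpleProducer
from kafka.common import LeaderNotAvailableError

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client)

# Option 1: block until the auto-created topic has finished initializing.
client.ensure_topic_exists('topic-test-production')

# Option 2: retry the send while auto-creation completes.
def send_with_retry(topic, message, retries=5, backoff=1.0):
    for _ in range(retries):
        try:
            return producer.send_messages(topic, message)
        except LeaderNotAvailableError:
            time.sleep(backoff)  # metadata not ready yet; wait and try again
    raise LeaderNotAvailableError(topic)

send_with_retry('topic-test-production', b'{"example": "payload"}')
```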

Also I'll try to get out a new release of kafka-python soon that handles
the retry internally.

-Dana
On Dec 17, 2015 6:17 PM, "David Montgomery" <davidmontgom...@gmail.com>
wrote:

> So what do I do?  Kill my production servers and rebuild?  Restarting all
> services does not work.  This seems kinda extreme.  At this point I feel I
> have to kill all servers and rebuild.
>
> Thanks
>
> On Fri, Dec 18, 2015 at 2:28 AM, Dana Powers <dana.pow...@gmail.com>
> wrote:
>
> > Hi Ben and Marko -- great suggestions re: connection failures and docker.
> >
> > The specific error here is: LeaderNotAvailableError:
> > TopicMetadata(topic='topic-test-production', error=5, partitions=[])
> >
> > That is an error code (5) returned from a MetadataRequest. In this
> context
> > it means that the topic did not exist and so the request triggered an
> > auto-create initialization (i.e., the connection was fine). Topic
> > initialization tends to take a few seconds to complete, but only needs to
> > happen once per topic. A retry here is generally fine. This retry should
> > probably be handled under the covers by the client code. So in this case
> I
> > would treat it as a simple kafka-python issue (#488).
> >
> > -Dana
> >
> > On Thu, Dec 17, 2015 at 4:58 AM, Ben Davison <ben.davi...@7digital.com>
> > wrote:
> >
> > > I probably should have mentioned that this was using Amazon ECS.
> > >
> > > On Thu, Dec 17, 2015 at 12:18 PM, Marko Bonaći <
> > marko.bon...@sematext.com>
> > > wrote:
> > >
> > > > It doesn't have to be FQDN.
> > > >
> > > > Here's how I run Kafka in a container:
> > > > docker run --name st-kafka -p 2181:2181 -p 9092:9092 -e
> > > > ADVERTISED_HOST=`docker-machine ip dev-st` -e ADVERTISED_PORT=9092 -d
> > > > spotify/kafka
> > > >
> > > > And then you have access to Kafka on the docker host VM from any
> other
> > > > machine.
> > > > BTW I use Spotify's image since it contains both ZK and Kafka, but I
> > > think
> > > > the latest version they built is 0.8.2.1, so you might have to build
> > the
> > > > new image yourself if you need 0.9, but that's trivial to do.
> > > >
> > > > Marko Bonaći
> > > > Monitoring | Alerting | Anomaly Detection | Centralized Log
> Management
> > > > Solr & Elasticsearch Support
> > > > Sematext <http://sematext.com/> | Contact
> > > > <http://sematext.com/about/contact.html>
> > > >
> > > > On Thu, Dec 17, 2015 at 11:33 AM, Ben Davison <
> > ben.davi...@7digital.com>
> > > > wrote:
> > > >
> > > > > Hi David,
> > > > >
> > > > > Are you running in docker? Are you trying to connect from a
> remote
> > > > box?
> > > > > We found we could connect locally but couldn't connect from another
> > > > remote
> > > > > host.
> > > > >
> > > > > (I've just started using kafka also)
> > > > >
> > > > > We had the same issue and found out: host.name=<%=@ipaddress%>
> > needed
> > > to
> > > > > be
> > > > > the FQDN of the box.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Ben
> > > > >
> > > > > On Thu, Dec 17, 2015 at 5:40 AM, David Montgomery <
> > > > > davidmontgom...@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I am very concerned about using kafka in production given the
> below
> > > > > > errors:
> > > > > >
> > > > > > Now issues with my zookeeper.  Other services use ZK.  Only
> kafka
> > > > fails.
> > > > > > I have 2 kafka servers using 8.x.  How do I resolve?  I tried

Re: kafka-python question

2015-08-13 Thread Dana Powers
Hi AL,

kafka deals in blobs, so you generally have to manage serialization /
deserialization at the producer + consumer level. kafka-python's
SimpleProducer and SimpleConsumer classes are fairly naive and operate
exclusively on bytes, so if you use those you will have to serialize before
producing, and deserialize after consuming. `dumps` and `loads` from json or
msgpack or equivalent should work fine.

Note that the new KafkaConsumer class exposes the `deserializer_class`
configuration option, which operates the same as the official java client
library. Unfortunately, there is no such class for the producer side in
kafka-python (though an unfinished WIP exists at
https://github.com/mumrah/kafka-python/pull/333). So `KafkaConsumer(...,
deserializer_class=json.loads)` would be sufficient to consume directly
into python objects if you've produced as json.
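
For illustration, a minimal sketch along those lines (topic name and broker
address are placeholders, and this is untested against any particular
kafka-python release):

```
import json

from kafka import KafkaClient, KafkaConsumer, SimpleProducer

client = KafkaClient('localhost:9092')
producer = SimpleProducer(client)

record = {'sensor_id': 'ff0057', 'period': 60, 'current': 0.12}

# Serialize to bytes before producing -- SimpleProducer only handles blobs.
producer.send_messages('sensor-readings', json.dumps(record).encode('utf-8'))

# Deserialize on the consumer side; deserializer_class is applied to each raw value.
consumer = KafkaConsumer('sensor-readings',
                         group_id='example-group',
                         bootstrap_servers=['localhost:9092'],
                         deserializer_class=json.loads)
for message in consumer:
    print(message.value)  # already a python dict
```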

HTH

-Dana
(kafka-python maintainer)

On Thu, Aug 13, 2015 at 3:14 PM, Sa Li sal...@gmail.com wrote:

 Hi, All

 I have a question about the kafka-python producer; here is the record I have:

 id (uuid)  | sensor_id (character)  |  timestamp | period (int)  | current
 (numeric)  |  date_received |  factor (bigint)
 "75da661c-bd5c-40e3-8691-9034f34262e3"  |  "ff0057"  |  "2013-03-21
 11:44:00-07"  |  60  |  0.1200  |  "2013-03-26 14:40:51.829-07"  |
 7485985

 I am getting data from a database and publishing it to kafka, and I am hitting
 an error with timestamp / decimal serialization; I can't just publish each
 record as a list. I am thinking of converting each record to a JSON object.
 Before I do that, does anyone know a more straightforward way to publish a
 list directly (can the kafka consumer read each record as a list or dictionary)?

 thanks

 AL


Re: kafka-python message offset?

2015-07-29 Thread Dana Powers
 and awaiting new
 messages. If I run through python, it also hangs, which is why I suspect it
 is insistently reading from the end.  See below:
 
  
   print "Begin constructing SimpleConsumer"
   client = KafkaClient(servers)
   s_consumer = SimpleConsumer(client,
                               topic,
                               group_id,
                               partitions=[0],     # Not sure I need this
                               auto_commit=False,  # Tried with and without, doesn't fix the problem
                               auto_offset_reset='smallest')  # Tried with and without, doesn't fix the problem
   print "End constructing SimpleConsumer\n"
  
   print "Begin reading messages"
   try:
       for message in s_consumer:
           print "  New message"
           print "    " + message.topic
           print "    " + message.partition
           print "    " + message.offset
           print "    " + message.key
           print "    " + message.value
   except Exception as e:
       print "Exception: ", e
   print "End reading messages\n"
  
   print "End all"
  
 
  Output:
 
  Begin all
 
  Begin constructing SimpleConsumer
  End constructing SimpleConsumer
 
  Begin reading messages
 
  
  It just hangs after that.  I also tried with a KafkaConsumer instead of
 a SimpleConsumer and it does exactly the same thing.  I'm not sure what to
 do.
 
 
 
  Keith Wiley
  Senior Software Engineer, Atigeo
  keith.wi...@atigeo.com
 
 
 
 
 
  
  From: Dana Powers dana.pow...@rd.io
  Sent: Tuesday, July 28, 2015 09:58 PM
  To: users@kafka.apache.org
  Subject: Re: kafka-python message offset?
 
  Hi Keith,
 
  you can use the `auto_offset_reset` parameter to kafka-python's
  KafkaConsumer. It behaves the same as the java consumer configuration of
  the same name. See
 
 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.configure
  for more details on how to configure a KafkaConsumer instance.
 
  For fine-grained control wrt configuring topic/partition offsets, use
  KafkaConsumer.set_topic_partitions() . For the most control, pass a
  dictionary of {(topic, partition): offset, ...} .
  see
 
 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.kafka.KafkaConsumer.set_topic_partitions
 
  -Dana
 
  On Tue, Jul 28, 2015 at 8:47 PM, Keith Wiley keith.wi...@atigeo.com
 wrote:
 
   I haven't found a way to specify that a consumer should read from the
   beginning, or from any other explicit offset, or that the offset
  should be
   "reset" in any way.  The command-line shell scripts (which I believe
 simply
   wrap the Scala tools) have flags for this sort of thing.  Is there any
 way
   to do this through the python library?
  
   Thanks.



Re: kafka-python message offset?

2015-07-28 Thread Dana Powers
Hi Keith,

you can use the `auto_offset_reset` parameter to kafka-python's
KafkaConsumer. It behaves the same as the java consumer configuration of
the same name. See
http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.configure
for more details on how to configure a KafkaConsumer instance.

For fine-grained control wrt configuring topic/partition offsets, use
KafkaConsumer.set_topic_partitions() . For the most control, pass a
dictionary of {(topic, partition): offset, ...} .
see
http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.kafka.KafkaConsumer.set_topic_partitions
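
A minimal sketch of both approaches (topic, group, broker address, and the
offsets below are placeholders, untested):

```
from kafka import KafkaConsumer

# Start from the earliest available offset when no committed offset exists.
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='smallest')

# Fine-grained control: pin (topic, partition) pairs to explicit offsets.
consumer.set_topic_partitions({('my-topic', 0): 0,      # partition 0 from the beginning
                               ('my-topic', 1): 1000})  # partition 1 from offset 1000
```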

-Dana

On Tue, Jul 28, 2015 at 8:47 PM, Keith Wiley keith.wi...@atigeo.com wrote:

 I haven’t found a way to specify that a consumer should read from the
 beginning, or from any other explicit offset, or that the offset should be
 “reset” in any way.  The command-line shell scripts (which I believe simply
 wrap the Scala tools) have flags for this sort of thing.  Is there any way
 to do this through the python library?

 Thanks.


Re: KafkaConfigurationError: No topics or partitions configured

2015-07-28 Thread Dana Powers
Hi Keith,

kafka-python raises FailedPayloadsError on unspecified server failures.
Typically this is caused by a server exception that results in a 0 byte
response. Have you checked your server logs?

-Dana


On Tue, Jul 28, 2015 at 2:01 PM, JIEFU GONG jg...@berkeley.edu wrote:

 This won't be very helpful as I am not too experienced with Python or your
 use case, but Java Consumers do indeed have to create a String out of the
 byte array returned from a successful consumption like:

 String actualmsg = new String(messageAndMetadata.message())

 Consult something like this, as it seems you'll need to decode the bytes
 you are receiving:
 http://stackoverflow.com/questions/606191/convert-bytes-to-a-python-string
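
 By way of illustration only (topic and broker address are made up, and UTF-8
 is an assumption about how the producer encoded its byte arrays), the
 python-side equivalent is just a decode of the consumed value:

```
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='example-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    print(message.value.decode('utf-8'))  # raw bytes -> python string
```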


 On Tue, Jul 28, 2015 at 1:41 PM, Keith Wiley keith.wi...@atigeo.com
 wrote:

  Thank you. It looks like I had the 'topic' slightly wrong.  I didn't
  realize it was case-sensitive.  I got past that error, but now I'm
 bumping
  up against another error:
 
  Traceback (most recent call last):
  ...
    File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py",
  line 59, in __init__
      self.set_topic_partitions(*topics)
    File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py",
  line 242, in set_topic_partitions
      self._get_commit_offsets()
    File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py",
  line 618, in _get_commit_offsets
      check_error(resp)
    File "/usr/local/lib/python2.7/dist-packages/kafka/common.py", line 230,
  in check_error
      raise response
  kafka.common.FailedPayloadsError
 
  I've been told the producer is generating byte array data, not string
  data.  I'm unsure whether that is the cause or what to do about it.
 
  Keith Wiley
  Senior Software Engineer, Atigeo
  keith.wi...@atigeo.com
 
 
 
 
 
  
  From: JIEFU GONG jg...@berkeley.edu
  Sent: Tuesday, July 28, 2015 01:31 PM
  To: users@kafka.apache.org
  Subject: Re: KafkaConfigurationError: No topics or partitions configured
 
  Can you confirm that there are indeed messages in the topic that you
  published to?
 
  bin/kafka-console-consumer.sh --zookeeper [details] --topic [topic]
  --from-beginning
 
  That should be the right command, and you can use that to first verify
 that
  messages have indeed been published to the topic in question.
 
  On Tue, Jul 28, 2015 at 11:33 AM, Keith Wiley keith.wi...@atigeo.com
  wrote:
 
   I'm trying to get a basic consumer off the ground.  I can create the
   consumer but I can't do anything at the message level:
  
  
    consumer = KafkaConsumer(topic,
                             group_id=group_id,
                             bootstrap_servers=[ip + ":" + port])
   
    for m in consumer:
        print "x"
  
   Note that I'm not even trying to use the message, I'm just trying loop
   over the consumer.  I'm getting an exception there somehow:
  
  
    Exception: No topics or partitions configured
    Traceback (most recent call last):
      File "<timed exec>", line 3, in <module>
      File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py",
    line 290, in next
        return six.next(self._get_message_iterator())
      File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/kafka.py",
    line 324, in fetch_messages
        raise KafkaConfigurationError('No topics or partitions configured')
    KafkaConfigurationError: No topics or partitions configured
  
  
   Any ideas?  I've been assured (although perhaps incorrectly) that the
   producer is configured, up and running.  Thanks.
  
  
   Keith Wiley
   Senior Software Engineer, Atigeo
   keith.wi...@atigeo.com
  
  
  
 
 
  --
 
  Jiefu Gong
  University of California, Berkeley | Class of 2017
  B.A Computer Science | College of Letters and Sciences
 
  jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427
 



 --

 Jiefu Gong
 University of California, Berkeley | Class of 2017
 B.A Computer Science | College of Letters and Sciences

 jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427



Re: Load Balancing Kafka

2015-07-16 Thread Dana Powers
I think the answer here is that the Kafka protocol includes a broker
metadata api. The client uses the broker host(s) you provide to discover
the full list of brokers in the cluster (and the topics+partitions each
manages/leads). The java client has a similar interface via
metadata.brokers.list / bootstrap.servers.
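
As a small illustration (broker host names are placeholders, and this is a
sketch rather than a recommendation for any particular client version),
listing more than one broker only hardens the bootstrap step -- the client
discovers the full cluster from the metadata response either way:

```
from kafka import KafkaClient, SimpleProducer

# Any one reachable host is enough to bootstrap; the extra hosts just guard
# against that single broker being down at connect time.
client = KafkaClient('broker1:9092,broker2:9092,broker3:9092')
producer = SimpleProducer(client)
producer.send_messages('my-topic', b'payload')
```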

-Dana
Ah… It seems you are more focusing on producer side workload balance… If
that is the case, please ignore my previous comments.

Jiangjie (Becket) Qin

On 7/15/15, 6:01 PM, Jiangjie Qin j...@linkedin.com wrote:

If you have pretty balanced traffic on each partition and have set
auto.leader.rebalance.enabled to true or false, you might not need to do
further workload balance.

However, in most cases you probably still need to do some sort of load
balancing based on the traffic and disk utilization of each broker. You
might want to do leader migration and/or partition reassignment.

Leader migration is a cheaper rebalance and mostly addresses CPU and
Network unbalance. Partition reassignment is a much more expensive
operation as it moves actual data, this can help with disk utilization in
addition to CPU and network.

Thanks,

Jiangjie (Becket) Qin

On 7/15/15, 5:19 PM, Sandy Waters sandy.watermell...@gmail.com wrote:

Hi all,

Do I need to load balance against the brokers?  I am using the python
driver and it seems to only want a single kafka broker host.  However, in
a
situation where I have 10 brokers, is it still fine to just give it one
host?  Do zookeeper and kafka handle the load balancing and redirect my
push somewhere else?

Would it hurt if I load balanced with Nginx and had it do round robin to
the brokers?

Much thanks for any help.

-Sandy



[ANNOUNCE] kafka-python 0.9.4

2015-06-12 Thread Dana Powers
Pleased to announce that the kafka-python community released v0.9.4 to pypi
last night. This release is focused on stability enhancements, cleanups,
and bugfixes. We had a record 13 different contributors w/ merged PRs, and
even more folks from the community supporting w/ issues, comments, and
discussion. Thanks to everyone that contributed!

https://pypi.python.org/pypi/kafka-python

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
Hi Gwen, I am using/writing kafka-python to construct api requests and have
not dug too deeply into the server source code.  But I believe it is
kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
used to decode the wire protocol.

-Dana
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
OffsetCommitRequest(String groupId, int generationId, String
consumerId, Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp
field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
 We have been working on adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer
is
 a
pretty straightforward choice right now. However, in the light of
the
upcoming consumer changes for 0.8.2 and 0.9, I have a few
questions:
   
1) With respect to the consumer redesign for 0.9, what is the
future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of
the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? it
will continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an
early
implementation of it, but since this a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference
and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
0.8.2 - please correct me if I am wrong), and whether if it is
   recommended
for use in production in any form

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
ok, opened KAFKA-1841.  KAFKA-1634 is also related.

-Dana

On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
 part of the Map changed, which will modify the wire protocol.

 This is actually not handled in the Java client either. It will send
 the timestamp no matter which version is used.

 This looks like a bug and I'd even mark it as blocker for 0.8.2 since
 it may prevent rolling upgrades.

 Are you opening the JIRA?

 Gwen

 On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io wrote:
  specifically comparing 0.8.1 --
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
  ```
  (1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
  metadata))
  })
  ```
 
  totrunk --
 
 
 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
  ```
  (1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val timestamp = {
  val given = buffer.getLong
  if (given == -1L) now else given
}
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
  metadata, timestamp))
  })
  ```
 
  should the `timestamp` buffer read be wrapped in an api version check?
 
 
  Dana Powers
  Rdio, Inc.
  dana.pow...@rd.io
  rdio.com/people/dpkp/
 
  On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Ah, I see :)
 
  The readFrom function basically tries to read two extra fields if you
  are on version 1:
 
  if (versionId == 1) {
groupGenerationId = buffer.getInt
consumerId = readShortString(buffer)
  }
 
  The rest looks identical in version 0 and 1, and still no timestamp in
  sight...
 
  Gwen
 
  On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
   Hi Gwen, I am using/writing kafka-python to construct api requests and
  have
   not dug too deeply into the server source code.  But I believe it is
   kafka/api/OffsetCommitRequest.scala and specifically the readFrom
 method
   used to decode the wire protocol.
  
   -Dana
   OffsetCommitRequest has two constructors now:
  
   For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
  
   And version 1:
   OffsetCommitRequest(String groupId, int generationId, String
   consumerId, Map<TopicPartition, PartitionData> offsetData)
  
   None of them seem to require timestamps... so I'm not sure where you
   see that this is required. Can you share an example?
  
   Gwen
  
   On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io
 wrote:
   Hi Joel,
  
   I'm looking more closely at the OffsetCommitRequest wire protocol
 change
   you mentioned below, and I cannot figure out how to explicitly
  construct a
   request with the earlier version.  Should the api version be
 different
  for
   requests that do not include it and/or servers that do not support
 the
   timestamp field?  It looks like 0.8.1.1 did not include the timestamp
   field
   and used api version 0.  But 0.8.2-beta seems to now require
 timestamps
   even when I explicitly encode OffsetCommitRequest api version 0
 (server
   logs a BufferUnderflowException).
  
   Is this the expected server behavior?  Can you provide any tips on
 how
   third-party clients should manage the wire-protocol change for this
 api
   method (I'm working on kafka-python)?
  
   Thanks,
  
   -Dana
  
   On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
   Yes it should be backwards compatible. So for e.g., you should be
 able
   to use an 0.8.1 client with an 0.8.2 broker. In general, you should
   not upgrade your clients until after the brokers have been upgraded.
   However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
   protocol change I'm aware of is the OffsetCommitRequest.  There is a
   change in the OffsetCommitRequest format (KAFKA-1634) although you
 can
   explicitly construct an OffsetCommitRequest with the earlier
 version.
  
   Thanks,
  
   Joel
  
   On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
Hi Joel,
   
Thanks for all the clarifications!  Just another question on this:
  will
0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with
 0.8?
Generally speaking, would there be any concerns with using the
 0.8.2
consumer with a 0.8.1 broker, for instance?
   
Marius
   
On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
   wrote:
   
 Inline..

 On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici
 wrote:
  Hello everyone,
 
  I have a few questions about the current status and future

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
specifically comparing 0.8.1 --

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
metadata))
})
```

totrunk --

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val timestamp = {
val given = buffer.getLong
if (given == -1L) now else given
  }
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
metadata, timestamp))
})
```

should the `timestamp` buffer read be wrapped in an api version check?


Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Ah, I see :)

 The readFrom function basically tries to read two extra fields if you
 are on version 1:

 if (versionId == 1) {
   groupGenerationId = buffer.getInt
   consumerId = readShortString(buffer)
 }

 The rest looks identical in version 0 and 1, and still no timestamp in
 sight...

 Gwen

 On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
  Hi Gwen, I am using/writing kafka-python to construct api requests and
 have
  not dug too deeply into the server source code.  But I believe it is
  kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
  used to decode the wire protocol.
 
  -Dana
  OffsetCommitRequest has two constructors now:
 
  For version 0:
   OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
 
  And version 1:
  OffsetCommitRequest(String groupId, int generationId, String
  consumerId, Map<TopicPartition, PartitionData> offsetData)
 
  None of them seem to require timestamps... so I'm not sure where you
  see that this is required. Can you share an example?
 
  Gwen
 
  On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
  Hi Joel,
 
  I'm looking more closely at the OffsetCommitRequest wire protocol change
  you mentioned below, and I cannot figure out how to explicitly
 construct a
  request with the earlier version.  Should the api version be different
 for
  requests that do not include it and/or servers that do not support the
  timestamp field?  It looks like 0.8.1.1 did not include the timestamp
  field
  and used api version 0.  But 0.8.2-beta seems to now require timestamps
  even when I explicitly encode OffsetCommitRequest api version 0 (server
  logs a BufferUnderflowException).
 
  Is this the expected server behavior?  Can you provide any tips on how
  third-party clients should manage the wire-protocol change for this api
  method (I'm working on kafka-python)?
 
  Thanks,
 
  -Dana
 
  On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
  Yes it should be backwards compatible. So for e.g., you should be able
  to use an 0.8.1 client with an 0.8.2 broker. In general, you should
  not upgrade your clients until after the brokers have been upgraded.
  However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
  protocol change I'm aware of is the OffsetCommitRequest.  There is a
  change in the OffsetCommitRequest format (KAFKA-1634) although you can
  explicitly construct an OffsetCommitRequest with the earlier version.
 
  Thanks,
 
  Joel
 
  On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
   Hi Joel,
  
   Thanks for all the clarifications!  Just another question on this:
 will
   0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
   Generally speaking, would there be any concerns with using the 0.8.2
   consumer with a 0.8.1 broker, for instance?
  
   Marius
  
   On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
Inline..
   
On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
 Hello everyone,

 I have a few questions about the current status and future of the
  Kafka
 consumers.

  We have been working on adding Kafka support in Spring XD [1],
  currently
 using the high level consumer via Spring Integration Kafka [2].
 We
  are
 working on adding features such as:
 - the ability to control offsets/replay topics;
 - the ability to control partition allocation across multiple
  consumers;

 We are currently at version 0.8.1.1, so using the simple consumer
  is
  a
 pretty straightforward choice right now. However, in the light of
  the
 upcoming consumer changes for 0.8.2 and 0.9, I have a few
  questions:

 1) With respect to the consumer redesign for 0.9, what is the
  future
  of
the
 Simple Consumer and High Level Consumer? To my best
 understanding

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-04 Thread Dana Powers
Hi Joel,

I'm looking more closely at the OffsetCommitRequest wire protocol change
you mentioned below, and I cannot figure out how to explicitly construct a
request with the earlier version.  Should the api version be different for
requests that do not include it and/or servers that do not support the
timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
and used api version 0.  But 0.8.2-beta seems to now require timestamps
even when I explicitly encode OffsetCommitRequest api version 0 (server
logs a BufferUnderflowException).

Is this the expected server behavior?  Can you provide any tips on how
third-party clients should manage the wire-protocol change for this api
method (I'm working on kafka-python)?

Thanks,

-Dana

On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
 We have been working on adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer is
 a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
   
1) With respect to the consumer redesign for 0.9, what is the future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? it
will continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an early
implementation of it, but since this a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
0.8.2 - please correct me if I am wrong), and whether if it is
   recommended
for use in production in any form (with the caveats that accompany
 the
   use
of ZooKeeper).
  
   In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
   as the offsets storage mechanism (not zookeeper). High-level Java
   consumers can choose to store offsets in ZooKeeper instead by setting
   offsets.storage=zookeeper
  
   However, if you are using the simple consumer and wish to store
   offsets in ZooKeeper you will need to commit to ZooKeeper directly.
   You can use ZkUtils in the kafka.utils package for this.
  
   If you wish to move to Kafka-based offsets we will be adding a new
   OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
   This is currently not listed as a blocker for 0.8.2 but I think we
   should include it. I will update 

step by step guide for writing a non-java 0.9 client

2014-08-23 Thread Dana Powers
The 0.9 Consumer re-write design document refers to a step by step guide
for writing a non-java 0.9 client but the link doesnt work -- it appears
to be an internal LinkedIn document?  Is this something that could be
published more broadly?

This is the link:

https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/How+to+write+a+0.9+consumer

From here:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-ConsumerHowTo


Thanks,

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/


Re: Improving the Kafka client ecosystem

2014-08-19 Thread Dana Powers
I created kafka-clie...@groups.google.com

https://groups.google.com/forum/m/#!forum/kafka-clients

No members and no guidelines yet, but it's a start.  Would love to get this
going.

Dana
 On Aug 19, 2014 9:03 AM, Mark Roberts wiz...@gmail.com wrote:

 Did this mailing list ever get created? Was there consensus that it did or
 didn't need to be created?

 -Mark

  On Jul 18, 2014, at 14:34, Jay Kreps jay.kr...@gmail.com wrote:
 
  A question was asked in another thread about what was an effective way
  to contribute to the Kafka project for people who weren't very
  enthusiastic about writing Java/Scala code.
 
  I wanted to kind of advocate for an area I think is really important
  and not as good as it could be--the client ecosystem. I think our goal
  is to make Kafka effective as a general purpose, centralized, data
  subscription system. This vision only really works if all your
  applications, are able to integrate easily, whatever language they are
  in.
 
  We have a number of pretty good non-java producers. We have been
  lacking the features on the server-side to make writing non-java
  consumers easy. We are fixing that right now as part of the consumer
  work going on right now (which moves a lot of the functionality in the
  java consumer to the server side).
 
  But apart from this I think there may be a lot more we can do to make
  the client ecosystem better.
 
  Here are some concrete ideas. If anyone has additional ideas please
  reply to this thread and share them. If you are interested in picking
  any of these up, please do.
 
  1. The most obvious way to improve the ecosystem is to help work on
  clients. This doesn't necessarily mean writing new clients, since in
  many cases we already have a client in a given language. I think any
  way we can incentivize fewer, better clients rather than many
  half-working clients we should do. However we are working now on the
  server-side consumer co-ordination so it should now be possible to
  write much simpler consumers.
 
  2. It would be great if someone put together a mailing list just for
  client developers to share tips, tricks, problems, and so on. We can
  make sure all the main contributors on this too. I think this could be
  a forum for kind of directing improvements in this area.
 
  3. Help improve the documentation on how to implement a client. We
  have tried to make the protocol spec not just a dry document but also
  have it share best practices, rationale, and intentions. I think this
  could potentially be even better as there is really a range of options
  from a very simple quick implementation to a more complex highly
  optimized version. It would be good to really document some of the
  options and tradeoffs.
 
  4. Come up with a standard way of documenting the features of clients.
  In an ideal world it would be possible to get the same information
  (author, language, feature set, download link, source code, etc) for
  all clients. It would be great to standardize the documentation for
  the client as well. For example having one or two basic examples that
  are repeated for every client in a standardized way. This would let
  someone come to the Kafka site who is not a java developer, and click
  on the link for their language and view examples of interacting with
  Kafka in the language they know using the client they would eventually
  use.
 
  5. Build a Kafka Client Compatibility Kit (KCCK) :-) The idea is this:
  anyone who wants to implement a client would implement a simple
  command line program with a set of standardized options. The
  compatibility kit would be a standard set of scripts that ran their
  client using this command line driver and validate its behavior. E.g.
  for a producer it would test that it correctly can send messages, that
  the ordering is retained, that the client correctly handles
  reconnection and metadata refresh, and compression. The output would
  be a list of features that passed are certified, and perhaps basic
  performance information. This would be an easy way to help client
  developers write correct clients, as well as having a standardized
  comparison for the clients that says that they work correctly.
 
  -Jay



Re: Using Kafka Metrics

2014-03-21 Thread Dana Powers
Be aware that JMX metrics changed between 0.7 and 0.8.  If you use chef,
you might also check out https://github.com/bflad/chef-jmxtrans which has
recipes for both 0.7 and 0.8 kafka metrics -> graphite.

Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/


On Thu, Mar 20, 2014 at 7:46 AM, Andrew Otto o...@wikimedia.org wrote:

 I'm using jmxtrans to do this for Ganglia, but it should work the same for
 Graphite:

 http://www.jmxtrans.org/

 Here's an example Kafka jmxtrans json file.

 https://github.com/wikimedia/puppet-kafka/blob/master/kafka-jmxtrans.json.md

 You can change the output writers to use Graphite instead of Ganglia.


 On Mar 20, 2014, at 2:26 AM, Sanjay Mengani smeng...@mashery.com wrote:

  Please, I need help sending metrics to Graphite. Can anyone help me
 in resolving this.
 
  Thank You.
  Regards,
  Sanjay Mengani
  Extn : 4060
  Mobile : +91-9985267763