Re: Kafka wiki Documentation conventions - looking for feedback
I've tested my examples with the new (4/30) release and they work, so I've updated the documentation. Thanks, Chris On Mon, Apr 29, 2013 at 6:18 PM, Jun Rao jun...@gmail.com wrote: Thanks. I also updated your producer example to reflect a recent config change (broker.list = metadata.broker.list). Jun On Mon, Apr 29, 2013 at 11:19 AM, Chris Curtin curtin.ch...@gmail.com wrote: Thanks, I missed that the addition of consumers can cause a re-balance. I thought it was only on Leader changes. I've updated the wording in the example. I'll pull down the beta and test my application, then change the names on the properties. Thanks, Chris On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote: Basically, every time a consumer joins a group, every consumer in the group gets a ZK notification and each of them tries to own a subset of the total number of partitions. A given partition is only assigned to one of the consumers in the same group. Once the ownership is determined, each consumer consumes messages coming from its partitions and manages the offsets of those partitions. Since at any given point in time a partition is only owned by one consumer, there won't be conflicts on updating the offsets. More details are described in the consumer rebalancing algorithm section of http://kafka.apache.org/07/design.html Thanks, Jun On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Jun, can you explain this a little better? I thought when using Consumer Groups that on startup Kafka connects to ZooKeeper and finds the last read offset for every partition in the topic being requested for the group. That is then the starting point for the consumer threads. If a second process starts while the first one is running with the same Consumer Group, won't the second one read the last offsets consumed by the already running process and start processing from there?
Then as the first process syncs consumed offsets, won't the 2nd process's next update overwrite them? Thanks, Chris On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote: Chris, Thanks for the writeup. Looks great overall. A couple of comments. 1. At the beginning, it sounds like one can't run multiple processes of consumers in the same group. This is actually not true. We can create multiple instances of consumers for the same group in the same JVM or in different JVMs. The consumers will auto-balance among themselves. 2. We have changed the names of some config properties. auto.commit.interval.ms is correct. However, zk.connect, zk.session.timeout.ms and zk.sync.time.ms are changed to zookeeper.connect, zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively. I will add a link to your wiki on our website. Thanks again. Jun On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Jun, I finished and published it this morning: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example One question: when documenting the ConsumerConfig parameters I couldn't find a description for the 'auto.commit.interval.ms' setting. I found one for 'autocommit.interval.ms' (no '.' between auto and commit) in the Google Cache only. Which spelling is it? Also, is my description of it correct? I'll take a look at custom encoders later this week. Today and Tuesday are going to be pretty busy. Please let me know if there are changes needed to the High Level Consumer page. Thanks, Chris On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com wrote: Chris, Any update on the high level consumer example? Also, in the Producer example, it would be useful to describe how to write a customized encoder. One subtle thing is that the encoder needs a constructor that takes a single VerifiableProperties argument ( https://issues.apache.org/jira/browse/KAFKA-869). Thanks, Jun
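The renamed consumer properties Jun mentions can be collected into a single Properties object. A minimal sketch of a high-level consumer config using the new 0.8 names (the host, port, timeout, and group values are placeholders, not recommendations):

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds a high-level consumer config using the 0.8 property names.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // was zk.connect
        props.put("zookeeper.session.timeout.ms", "400");   // was zk.session.timeout.ms
        props.put("zookeeper.sync.time.ms", "200");         // was zk.sync.time.ms
        props.put("auto.commit.interval.ms", "1000");       // name unchanged
        props.put("group.id", "example-group");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("zookeeper.connect"));
    }
}
```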
Re: Kafka wiki Documentation conventions - looking for feedback
Chris, Thanks. This is very helpful. I linked your wiki pages to our website. A few more comments: 1. Producer: The details of the meaning of request.required.acks are described in http://kafka.apache.org/08/configuration.html. It would be great if you could add a link to that description in your wiki. 2. High level consumer: Could you add the proper way of stopping the consumer? One just needs to call consumer.shutdown(). After this is called, the hasNext() call on the Kafka stream iterator will return false. 3. SimpleConsumer: We have the following api that returns the offset of the last message exposed to the consumer. The difference between the high watermark and the offset of the last consumed message tells you how many messages the consumer is behind the broker. highWatermark(topic: String, partition: Int) Finally, it would be great if you could extend the wiki with a customized encoder (Producer) and decoder (Consumer) at some point. Thanks, Jun
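Jun's point about stopping the consumer, that shutdown() causes the stream iterator's hasNext() to return false so the consume loop can exit cleanly, can be modelled in plain Java without any Kafka classes. This is only a stand-in illustrating the contract (a poison-pill queue), not Kafka's actual implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ShutdownModel {
    private static final String SHUTDOWN = "__shutdown__"; // poison pill
    private final BlockingQueue<String> stream = new ArrayBlockingQueue<>(16);

    // Mimics ConsumerConnector.shutdown(): after this, the consume loop ends.
    public void shutdown() throws InterruptedException { stream.put(SHUTDOWN); }

    public void deliver(String msg) throws InterruptedException { stream.put(msg); }

    // Mimics the while (it.hasNext()) loop in the wiki example.
    public int consumeAll() throws InterruptedException {
        int consumed = 0;
        while (true) {
            String msg = stream.take();
            if (msg == SHUTDOWN) break; // the "hasNext() returns false" moment
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownModel m = new ShutdownModel();
        m.deliver("a");
        m.deliver("b");
        m.shutdown();
        System.out.println(m.consumeAll());
    }
}
```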
Re: Kafka wiki Documentation conventions - looking for feedback
Hi Jun, I've added #1 and #2. I'll need to think about where to put #3, maybe even adding a 'tips and tricks' section? I've not had to write any encoders/decoders. Can anyone else offer some example code I can incorporate into an example? Thanks, Chris
Re: Kafka wiki Documentation conventions - looking for feedback
The following are sample encoder/decoder classes in Java:

class StringEncoder implements Encoder<String> {
    private final String encoding;

    public StringEncoder(VerifiableProperties props) {
        if (props == null)
            encoding = "UTF8";
        else
            encoding = props.getString("serializer.encoding", "UTF8");
    }

    public byte[] toBytes(String s) {
        if (s == null)
            return null;
        try {
            return s.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}

class StringDecoder implements Decoder<String> {
    private final String encoding;

    public StringDecoder(VerifiableProperties props) {
        if (props == null)
            encoding = "UTF8";
        else
            encoding = props.getString("serializer.encoding", "UTF8");
    }

    public String fromBytes(byte[] bytes) {
        try {
            return new String(bytes, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
}

Thanks, Jun
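The encoder/decoder pair above just performs a charset round trip, so the core behaviour can be checked with plain Java and no Kafka classes:

```java
import java.io.UnsupportedEncodingException;

public class RoundTrip {
    public static void main(String[] args) throws UnsupportedEncodingException {
        String original = "hello, kafka";
        byte[] bytes = original.getBytes("UTF8");   // what StringEncoder.toBytes does
        String decoded = new String(bytes, "UTF8"); // what StringDecoder.fromBytes does
        System.out.println(decoded.equals(original));
    }
}
```

Note that "UTF8" is a legal Java alias for the UTF-8 charset, which is why Kafka's default value works with String.getBytes.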
RE: Producer / Consumer - connection management
Hi Neha, For the connection at creation time, I had the issue with the sync producer only; I haven't observed this with the async producer, but I haven't tested it yet, so I guess I would get similar issues. I didn't keep the stacktrace as it happened some time ago, but basically, calling new Producer() resulted in an exception because the connection to ZK wasn't working (I'm using zk.connect). In the setup I was testing, I have a zk cluster spanning 2 sites; in the second site, zk is in observer mode only. From time to time, the observer loses its sync with the leader, and for a short period of time you can see "zookeeper server not running" messages in the log (during sync with the leader). If the producer is created (application started) at that time, it will fail. Again, I assume the same would happen if ZK wasn't running at all. The solution I have so far is to wrap the producer to handle this, and try to create it again when ZK comes back up. As I understand it, once created, the producer / consumer reconnects automatically; it would be nice to extend this behavior and support starting in a disconnected state. Kindly, Nicolas Berthet -Original Message- From: Neha Narkhede [mailto:neha.narkh...@gmail.com] Sent: Tuesday, April 30, 2013 10:53 PM To: users@kafka.apache.org Subject: Re: Producer / Consumer - connection management Basically, who, and how, is managing network disconnection / outage? The short answer is that the producer is responsible for retrying connection establishment, and it automatically does so when the network connection to the broker fails. There is a connection retry interval that you can use; once that is reached it gives up trying to reconnect. 1) If the sync producer has been created successfully, will it reconnect whenever it loses its connection to the broker or zk? I could understand it not happening, as we expect the connection to be immediately available when using the sync producer. The ZK client library manages connection loss and reconnects to ZK. But be aware that ZK being unavailable is never an option. 2) How about the async producer? Does it expect a connection at creation time? Will it reconnect in case of failure? Can you describe the connection issue you see at creation time? Is it just when you use the zk.connect option? 3) Finally, how about the high level consumer? Afaik, it reconnects automatically. It does, and so does the producer.
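The wrapper Nicolas describes, retrying producer creation until ZK is reachable, can be sketched generically. The factory below is a hypothetical stand-in for new Producer(config); the names, attempt count, and backoff are illustrative, not part of the Kafka API:

```java
import java.util.concurrent.Callable;

public class RetryingFactory {
    // Retry creation until it succeeds or attempts are exhausted.
    public static <T> T createWithRetry(Callable<T> factory, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return factory.call();
            } catch (Exception e) {
                last = e;           // e.g. ZK not reachable yet
                Thread.sleep(backoffMs);
            }
        }
        throw last;                 // give up after maxAttempts
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // A fake factory that fails twice, then "creates the producer".
        String producer = createWithRetry(() -> {
            if (++calls[0] < 3) throw new RuntimeException("ZK not available");
            return "producer";
        }, 5, 10);
        System.out.println(producer + " after " + calls[0] + " attempts");
    }
}
```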
[0.8] producing a msg to two brokers...
Producing a message to two brokers with topic auto-creation (and no previous topic) and a replication factor of 2 results in 4 partitions. They are numbered uniquely and sequentially, but there are two leaders? Is it so? Should we only write to one broker? Does it have to be the leader, or will the producer get flipped by the zk? thanks, rob
slow organic migration to 0.8
So, we have lots of apps producing messages to our kafka 0.7.2 instances (and multiple consumers of the data). We are not going to be able to follow the suggested migration path, where we first migrate all data, then move all producers to use 0.8, etc. Instead, many apps are on their own release cycle, and we need to allow them to upgrade their kafka libraries as part of their regular release schedule. Is there a procedure I'm not seeing, or am I right in thinking I'll need to maintain duplicate kafka clusters (and consumers) for a time? Or can we have a real-time data migration consumer running continuously against the 0.7.2 kafka store, and have all the data ultimately end up in 0.8? Eventually, the data going to 0.7.2 will dwindle to nothing, but it could take a while. So, I'm thinking I'll just need to maintain dual sets of kafka servers for a while. Since this won't result in any increase in load/disk space, etc., I was thinking of allowing instances to remain multi-tenant with each other (e.g. kafka 0.7.2 and kafka 0.8 on the same box, using separate ports, separate log storage directories (but shared disks)). Is this ok, or a terrible idea? I expect the transition to take several weeks. Thanks, Jason
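The multi-tenant layout described above, a 0.7.2 broker and a 0.8 broker sharing a host, might look roughly like this in the two server.properties files. The ports, paths, and ZK chroots are illustrative values, not prescribed ones; note that several property names changed between the versions:

```properties
# server.properties for the existing 0.7.2 broker
brokerid=0
port=9092
log.dir=/data/kafka-0.7/logs
zk.connect=localhost:2181/kafka07

# server.properties for the new 0.8 broker on the same host
broker.id=0
port=9093
log.dirs=/data/kafka-0.8/logs
zookeeper.connect=localhost:2181/kafka08
```

Separate ZK chroots keep the two clusters' metadata from colliding even when they share a ZooKeeper ensemble.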
Re: slow organic migration to 0.8
Jason, During the migration, the only thing to watch out for is that the producers of a particular topic don't upgrade to 0.8 before the consumers do. You can let applications upgrade when they can, as long as they respect the above requirement. If there are only a few applications producing to and consuming from a particular topic, you can group those together and push them at roughly the same time. Thanks, Neha
Re: [0.8] producing a msg to two brokers...
There is one leader per partition. For details on how to use the 0.8 api, please see http://kafka.apache.org/08/api.html Thanks, Jun
Re: consuming only half the messages produced
Partitions are different from replicas. A topic can have one or more partitions, and each partition can have one or more replicas. A consumer consumes data at the partition level. In other words, a consumer gets the same data no matter how many replicas there are. When you say the consumer only gets half of the messages, do you mean that it gets half of the messages that are produced? You may want to take a look at the consumer example in http://kafka.apache.org/08/api.html Thanks, Jun On Wed, May 1, 2013 at 7:14 PM, Rob Withers reefed...@gmail.com wrote: Running a consumer group (createStreams()), pointing to the zookeeper and with the topic and 1 consumer thread, results in only half the messages being consumed. The topic was auto-created, with a replication factor of 2, but the producer was configured to produce to 2 brokers, and so 4 partitions resulted. Are half getting sent to one leader, in one broker, and the other half getting sent to another leader, in the other broker, but the consumer stream is only reading from one leader from the zk? Shouldn't there only be one leader? thanks, rob
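The ownership rule behind this answer, each partition owned by exactly one consumer in the group, comes from the rebalance assigning disjoint partition ranges to consumers. A simplified model of that range-style assignment (an illustration, not Kafka's actual code):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignment {
    // Consumer i owns a contiguous block of partitions; blocks never overlap,
    // and the first (numPartitions % numConsumers) consumers get one extra.
    public static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int k = 0; k < count; k++) mine.add(next++);
            owned.add(mine);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 4 partitions, 2 consumers: each owns 2 partitions, no overlap.
        System.out.println(assign(4, 2));
    }
}
```

With one consumer thread in the group, the same model gives that single consumer all 4 partitions, which is why a lone consumer should still see every message.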
Re: code name for technology changes in 0.8?
We don't have an official code-name. You may want to try something like kafka-ng (next generation). Thanks, Jun On Wed, May 1, 2013 at 8:36 PM, Jason Rosenberg j...@squareup.com wrote: Hi, I'm wondering if there's a code-name or useful descriptor for the technology changes introduced in 0.8. Since I'll need to deploy a separate set of servers/consumers while we transition our running apps to use the new library for producing, I am looking for a descriptive name to call the servers. I don't expect we'll have to do a migration like this with every release, so calling it something like 'kafkaserver-v08' doesn't quite work. Is there some sort of moniker that will still be applicable if we upgrade to 0.9 and so on? Thanks, Jason
Re: slow organic migration to 0.8
Yeah, well we have many producers and only a few consumers. I don't expect the producers of a given topic to uniformly migrate at the same time, so we'll have duplicate consumer versions. I'll know the migration is complete when the old consumer version stops receiving any new messages...