Re: custom kafka consumer - strangeness
If you look at the example simple consumer: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example you'll see:

    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }

and a comment in the 'Reading the Data' part: "Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again." This is probably what is happening to you.

Chris

On Thu, Jan 9, 2014 at 4:00 PM, Gerrit Jansen van Vuuren gerrit...@gmail.com wrote: Hi, I'm writing a custom consumer for kafka 0.8. Everything works except for the following:
a. connect, send fetch, read all results
b. send fetch
c. send fetch
d. send fetch
e. via the console publisher, publish 2 messages
f. send fetch :corr-id 1
g. read 2 messages published :offsets [10 11] :corr-id 1
h. send fetch :corr-id 2
i. read 2 messages published :offsets [10 11] :corr-id 2
j. send fetch ...
The problem is I get the messages sent twice as a response to two separate fetch requests. The correlation id is distinct, so it cannot be that I read the response twice. The offsets of the 2 messages are the same, so they are duplicates, and it's not the producer sending the messages twice. Note: the same connection is kept open the whole time, and I send, block on receive, then send again; after the first 2 messages are read, the offsets are incremented and the next fetch asks Kafka for messages from the new offsets. Any ideas why Kafka would send the messages again on the second fetch request? Regards, Gerrit
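For reference, here is how that check sits in a fetch loop modeled on the SimpleConsumer example linked above; the host, port, topic, partition and starting offset are placeholders rather than values from Gerrit's setup:

    import java.nio.ByteBuffer;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class CompressedBlockSkip {
        public static void main(String[] args) throws Exception {
            // assumption: a broker on localhost:9092 leads partition 0 of "mytopic"
            SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "example-client");
            String topic = "mytopic";
            int partition = 0;
            long readOffset = 10L; // the offset we actually asked for

            FetchRequest req = new FetchRequestBuilder()
                    .clientId("example-client")
                    .addFetch(topic, partition, readOffset, 100000)
                    .build();
            FetchResponse response = consumer.fetch(req);

            for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
                long currentOffset = messageAndOffset.offset();
                // a compressed block can start before readOffset, so already-seen
                // messages may come back in the response; skip them instead of reprocessing
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(currentOffset + ": " + new String(bytes, "UTF-8"));
            }
            consumer.close();
        }
    }

Because readOffset is only advanced for messages at or beyond the requested offset, redelivered messages from the front of a compressed block are skipped rather than processed twice.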
Re: Special Bay Area HUG: Tajo and Samza
Hi Jay, Do they record these meetups? Thanks, Chris

On Thu, Oct 17, 2013 at 5:03 PM, Jay Kreps jay.kr...@gmail.com wrote: FYI. -- Forwarded message -- From: Jakob Homan jgho...@gmail.com Date: Thu, Oct 17, 2013 at 11:08 AM Subject: Special Bay Area HUG: Tajo and Samza To: d...@samza.incubator.apache.org Hey everybody- Join us at LinkedIn Nov. 5 for a special HUG dedicated to two new awesome Incubator projects: Tajo, a low-latency SQL query engine atop YARN, and Samza. http://www.meetup.com/hadoop/events/146077932/ -Jakob
Re: High Level Consumer error handling and clean exit
Thanks, Ian. Is your consumer multi-threaded? If so, can you share how you coordinated each of the threads so you knew it was 'okay' to commit across all the threads? I'm stuck on how to do this without really complicating the consumer. Thanks, Chris

On Tue, Jul 9, 2013 at 5:51 PM, Ian Friedman i...@flurry.com wrote: Hey Chris, The way I handled this in my application using the High Level Consumer was to turn off auto-commit and commit manually after finishing a batch of messages (obviously you could do it after every message, but for my purposes it was better to have batches) -- Ian Friedman
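A minimal sketch of Ian's batch approach using the 0.8 high level consumer, assuming auto-commit is disabled through the auto.commit.enable property; the topic, group id and batch size below are made up for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class BatchCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumption: local ZooKeeper
            props.put("group.id", "batch-commit-group");      // hypothetical group
            props.put("auto.commit.enable", "false");         // commit manually instead
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("mytopic", 1); // hypothetical topic, one stream/thread
            KafkaStream<byte[], byte[]> stream =
                    connector.createMessageStreams(topicCountMap).get("mytopic").get(0);

            int batchSize = 100;
            int processed = 0;
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                // ... process the message; if processing fails, exit before committing
                processed++;
                if (processed % batchSize == 0) {
                    // commits the offsets of everything consumed so far,
                    // across all streams owned by this connector
                    connector.commitOffsets();
                }
            }
            connector.shutdown();
        }
    }

Since commitOffsets() covers every stream the connector owns, a multi-threaded consumer still needs the cross-thread coordination asked about above before it is safe to call.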
High Level Consumer error handling and clean exit
Hi, I'm working through a production-level High Level Consumer app and have a couple of error/shutdown questions to understand how the offset storage is handled.

Test case - simulate an error writing to the destination application, for example a database, so the offset is 'lost'.

Scenario:
- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- change the code so that every 10th read from the 'hasNext()' ConsumerIterator breaks out of the loop and returns from the thread, simulating a hard error. I write the offset to System.out to see what was provided
- start up again and look to see what offset was first emitted for a partition

Issue: Kafka treats the offset for the message read that caused me to break out of the loop as processed (as expected), but I really failed. How do I tell Kafka that I didn't really consume that offset?

Here is the example code in the 'business logic':

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (counter == 10) {
                System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition()
                        + ": Offset: " + msg.offset() + ": " + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition()
                    + ": Offset: " + msg.offset() + ": " + new String(msg.message()));
            counter++;
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may leave the offsets in ZooKeeper incorrect, but I'm trying to understand what happens in a clean shutdown where Kafka and the Consumer are behaving correctly but I can't process what I read. This also feels like I'm blurring SimpleConsumer theory into this, but except for the exception/shutdown case the High Level Consumer does everything I want. Thanks, Chris
Re: High Level Consumer error handling and clean exit
Hi Philip, Correct, I don't want to explicitly control the offset committing. The ConsumerConnector handles that well enough except for when I want to shut down and NOT have Kafka think I consumed that last message for a stream. This isn't the crash case; it is a case where the logic consuming the message detects an error and wants to cleanly exit until that issue can be resolved, but not lose the message it was trying to process when the problem is resolved. My understanding is that the commitOffsets() call is across all threads, not just for the stream my thread is reading from. So knowing it is okay to call this requires coordination across all my threads, which makes a High Level Consumer a lot harder to write correctly. Thinking about it, what I'd like to happen is: my code hands the message back to the KafkaStream (or whatever level knows about the consumed offsets) and says
- set the next start offset for this topic/partition to this message in ZooKeeper
- cleanly shutdown the stream from the broker(s)
- don't force a rebalance on the consumer, since something is wrong with processing of the data in the message, not the message itself
- if I try to use the stream again I should get an exception
- I don't think I would want this to cause a complete shutdown of the ConsumerConnector, in case other threads are still processing. If all threads have the same issue they will all fail soon enough and do the same logic. But if only one thread fails, our Operations teams will need to resolve the issue then do a clean restart to recover.
I think this logic would only happen when the downstream system was having issues, since the iterator would be drained correctly when the 'shutdown' call to ConsumerConnector is made. Thanks, Chris

On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole phi...@loggly.com wrote: It seems like you're not explicitly controlling the offsets. Is that correct? If so, the moment you pull a message from the stream, the client framework considers it processed. So if your app subsequently crashes before the message is fully processed, and auto-commit updates the offsets in Zookeeper, you will drop that message. The solution to this is to call commitOffsets() explicitly. Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, I'm working through a production-level High Level Consumer app and have a couple of error/shutdown questions to understand how the offset storage is handled. Test case - simulate an error writing to the destination application, for example a database, so the offset is 'lost'. Scenario - write 500 messages for each topic/partition - use the example High Level Consumer code I wrote for the Wiki - change the code so that every 10th read from the 'hasNext()' ConsumerIterator breaks out of the loop and returns from the thread, simulating a hard error. I write the offset to System.out to see what was provided - start up again and look to see what offset was first emitted for a partition Issue: Kafka treats the offset for the message read that caused me to break out of the loop as processed (as expected), but I really failed. How do I tell Kafka that I didn't really consume that offset?
Here is the example code in the 'business logic':

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (counter == 10) {
                System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition()
                        + ": Offset: " + msg.offset() + ": " + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition()
                    + ": Offset: " + msg.offset() + ": " + new String(msg.message()));
            counter++;
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }

I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may leave the offsets in ZooKeeper incorrect, but I'm trying to understand what happens in a clean shutdown where Kafka and the Consumer are behaving correctly but I can't process what I read. This also feels like I'm blurring SimpleConsumer theory into this, but except for the exception/shutdown case the High Level Consumer does everything I want. Thanks, Chris
Re: High Level Consumer error handling and clean exit
Thanks. I know I can write a SimpleConsumer to do this, but it feels like the High Level consumer is _so_ close to being robust enough to handle what I'd think people want to do in most applications. I'm going to submit an enhancement request. I'm trying to understand the level of data loss in this situation, so I looked deeper into the KafkaStream logic: it looks like a KafkaStream includes a BlockingQueue for transferring the messages to my code from Kafka. If I call shutdown() when I detect the problem, are the messages already in the BlockingQueue considered 'read' by Kafka, or does the shutdown peek into the Queue to see what is still there before updating ZooKeeper? My concern is that if that queue is not empty I'll be losing more than the one message that led to the failure. I'm also curious how others are handling this situation. Do you assume the message that is causing problems is lost, or somehow know to go get it later? I'd think others would have this problem too. Thanks, Chris

On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole phi...@loggly.com wrote: OK. It sounds like you're requesting functionality that the high-level consumer simply doesn't have. As I am sure you know, there is no API call that supports handing back a message. I might be missing something, but if you need this kind of control, I think you need to code your application differently. You could try creating a ConsumerConnector per partition (your clients will then need to know the number of partitions out there). That way commitOffsets() will actually only apply to that partition. Auto-commit the same way. It might give you the level of control you need. Philip

On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Philip, Correct, I don't want to explicitly control the offset committing. The ConsumerConnector handles that well enough except for when I want to shut down and NOT have Kafka think I consumed that last message for a stream. This isn't the crash case; it is a case where the logic consuming the message detects an error and wants to cleanly exit until that issue can be resolved, but not lose the message it was trying to process when the problem is resolved. My understanding is that the commitOffsets() call is across all threads, not just for the stream my thread is reading from. So knowing it is okay to call this requires coordination across all my threads, which makes a High Level Consumer a lot harder to write correctly. Thinking about what I'd like to happen is: my code hands the message back to the KafkaStream (or whatever level knows about the consumed offsets) and says - set the next start offset for this topic/partition to this message in ZooKeeper - cleanly shutdown the stream from the broker(s) - don't force a rebalance on the consumer since something is wrong with processing of the data in the message, not the message - if I try to use the stream again I should get an exception - I don't think I would want this to cause a complete shutdown of the ConsumerConnector, in case other threads are still processing. If all threads have the same issue they will all fail soon enough and do the same logic. But if only one thread fails, our Operations teams will need to resolve the issue then do a clean restart to recover.
Thanks, Chris

On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole phi...@loggly.com wrote: It seems like you're not explicitly controlling the offsets. Is that correct? If so, the moment you pull a message from the stream, the client framework considers it processed. So if your app subsequently crashes before the message is fully processed, and auto-commit updates the offsets in Zookeeper, you will drop that message. The solution to this is to call commitOffsets() explicitly. Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, I'm working through a production-level High Level Consumer app and have a couple of error/shutdown questions to understand how the offset storage is handled. Test case - simulate an error writing to destination application, for example a database, offset is 'lost' Scenario - write 500 messages for each topic/partition - use the example High Level Consumer code I wrote for the Wiki - change the code so that every 10th read from the 'hasNext()' ConsumerIterator breaks out of the loop and returns from the thread, simulating a hard error. I write the offset to System.out to see what was provided - start up again and look to see what offset was first emitted for a partition Issue: Kafka treats the offset
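A rough sketch of Philip's connector-per-partition suggestion, assuming the client knows the partition count and that rebalancing ends up assigning roughly one partition to each connector; all names and values below are illustrative:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConnectorPerPartition {
        public static void main(String[] args) {
            int numPartitions = 4;        // assumption: partition count known to the client
            String topic = "mytopic";     // hypothetical topic

            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumption: local ZooKeeper
            props.put("group.id", "per-partition-group");     // hypothetical group
            props.put("auto.commit.enable", "false");         // commit per connector instead

            List<ConsumerConnector> connectors = new ArrayList<ConsumerConnector>();
            for (int i = 0; i < numPartitions; i++) {
                ConsumerConnector c = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(topic, 1);
                KafkaStream<byte[], byte[]> stream =
                        c.createMessageStreams(topicCountMap).get(topic).get(0);
                connectors.add(c);
                // hand (c, stream) to a worker thread; because each connector should own
                // only one partition, c.commitOffsets() affects just that partition
            }
        }
    }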
Re: High Level Consumer error handling and clean exit
Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966

On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin curtin.ch...@gmail.com wrote: Thanks. I know I can write a SimpleConsumer to do this, but it feels like the High Level consumer is _so_ close to being robust enough to handle what I'd think people want to do in most applications. I'm going to submit an enhancement request. I'm trying to understand the level of data loss in this situation, so I looked deeper into the KafkaStream logic: it looks like a KafkaStream includes a BlockingQueue for transferring the messages to my code from Kafka. If I call shutdown() when I detect the problem, are the messages already in the BlockingQueue considered 'read' by Kafka, or does the shutdown peek into the Queue to see what is still there before updating ZooKeeper? My concern is if that queue is not empty I'll be losing more than the one message that led to the failure. I'm also curious how others are handling this situation. Do you assume the message that is causing problems is lost or somehow know to go get it later? I'd think others would have this problem too. Thanks, Chris

On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole phi...@loggly.com wrote: OK. It sounds like you're requesting functionality that the high-level consumer simply doesn't have. As I am sure you know, there is no API call that supports handing back a message. I might be missing something, but if you need this kind of control, I think you need to code your application differently. You could try creating a ConsumerConnector per partition (your clients will then need to know the number of partitions out there). That way commitOffsets() will actually only apply to that partition. Auto-commit the same way. It might give you the level of control you need. Philip

On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Philip, Correct, I don't want to explicitly control the offset committing. The ConsumerConnector handles that well enough except for when I want to shut down and NOT have Kafka think I consumed that last message for a stream. This isn't the crash case, it is a case where the logic consuming the message detects an error and wants to cleanly exit until that issue can be resolved, but not lose the message it was trying to process when the problem is resolved. My understanding is that the commitOffsets() call is across all threads, not just for the stream my thread is reading from. So knowing it is okay to call this requires coordination across all my threads, which makes a High Level Consumer a lot harder to write correctly. Thinking about what I'd like to happen is: my code hands the message back to the KafkaStream (or whatever level knows about the consumed offsets) and says - set the next start offset for this topic/partition to this message in ZooKeeper - cleanly shutdown the stream from the broker(s) - don't force a rebalance on the consumer since something is wrong with processing of the data in the message, not the message - if I try to use the stream again I should get an exception - I don't think I would want this to cause a complete shutdown of the ConsumerConnector, in case other threads are still processing. If all threads have the same issue they will all fail soon enough and do the same logic. But if only one thread fails, our Operations teams will need to resolve the issue then do a clean restart to recover.
I think this logic would only happen when the downstream system was having issues since the iterator would be drained correctly when the 'shutdown' call to ConsumerConnector is made. Thanks, Chris

On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole phi...@loggly.com wrote: It seems like you're not explicitly controlling the offsets. Is that correct? If so, the moment you pull a message from the stream, the client framework considers it processed. So if your app subsequently crashes before the message is fully processed, and auto-commit updates the offsets in Zookeeper, you will drop that message. The solution to this is to call commitOffsets() explicitly. Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, I'm working through a production-level High Level Consumer app and have a couple of error/shutdown questions to understand how the offset storage is handled. Test case - simulate an error writing to destination application, for example a database, offset is 'lost' Scenario - write 500 messages for each topic/partition - use the example High Level Consumer code I wrote for the Wiki - change the code so that every 10th read from the 'hasNext()' ConsumerIterator breaks out of the loop and returns from the thread, simulating a hard error. I write the offset
Re: one consumerConnector or many?
I'd look at a variation of #2. Can your messages be grouped into a 'class' (for lack of a better term) that is consumed together? For example a 'class' of 'auditing events' or 'sensor events'. The idea would be to then have a topic per 'class'. A couple of benefits to this:
- you can size the consumption resources for a 'class' by its value. So the 'audit' topic may only get a 2-threaded consumer while the 'sensor' class gets a 10-threaded consumer.
- you can stop processing a 'class' of messages if you need to without taking all the consumers offline (assuming you have different processors or a way while running to alter your number of threads per topic).
Since it sounds like you may be frequently adding new message types, this approach also allows you to decide if you want to shut down only a part of your processing to add the new code to handle the message. Finally, why the concern about socket use? A well configured Windows or Linux machine can have thousands of open sockets without problems. Since 0.8.0 only connects to the Broker with the topic/partition, you end up with 1 socket per topic/partition and consumer. Hope this helps, Chris

On Wed, May 29, 2013 at 9:13 AM, Rob Withers reefed...@gmail.com wrote: In thinking about the design of consumption, we have in mind a generic consumer server which would consume from more than one message type. The handling of each type of message would be different. I suppose we could have upwards of say 50 different message types, eventually, maybe 100+ different types. Which of the following designs would be best and why would the other options be bad? 1) Have all message types go through one topic and use a dispatcher pattern to select the correct handler. Use one consumerConnector. 2) Use a different topic for each message type, but still use one consumerConnector and a dispatcher pattern.
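As a sketch of the 'topic per class' idea, the per-topic thread count can be set in the map handed to createMessageStreams; the topic names, group id and counts below are made up for illustration:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class PerClassConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumption: local ZooKeeper
            props.put("group.id", "per-class-group");         // hypothetical group
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // one topic per message 'class', each with its own thread count
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("audit-events", 2);   // hypothetical low-value class
            topicCountMap.put("sensor-events", 10); // hypothetical high-value class
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(topicCountMap);

            ExecutorService pool = Executors.newFixedThreadPool(12);
            for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry : streams.entrySet()) {
                for (final KafkaStream<byte[], byte[]> stream : entry.getValue()) {
                    pool.submit(new Runnable() {
                        public void run() {
                            ConsumerIterator<byte[], byte[]> it = stream.iterator();
                            while (it.hasNext()) {
                                byte[] message = it.next().message();
                                // dispatch to the handler for this 'class' of message
                            }
                        }
                    });
                }
            }
        }
    }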
Re: one consumerConnector or many?
That's a good question about # of sockets when a single consumer is connecting. I'll let someone from LinkedIn comment on whether each consumer has a socket per topic/partition or if it is per Broker, since I'm not familiar with that part of the code.

On Wed, May 29, 2013 at 9:53 AM, Withers, Robert robert.with...@dish.com wrote: Thanks for the info. Are you saying that even with a single connector, with say 3 topics and 3 threads per topic and 3 brokers with 3 partitions for all 3 topics on all 3 brokers, a consumer box would have 9 sockets open? What if there are 6 partitions per topic, would that be 18 open sockets? I have read somewhere that a high partition number per topic is desirable, to scale out the consumers and to be prepared to dynamically scale out consumption during a traffic spike. Is it so? 100 topics, with 16 brokers and 200 partitions per topic with 1 consumer connector (just hypothetically so) would be 1600 sockets or 2 sockets? For sure these boxes have plenty of ports. I am just thinking through possible failures and what flexibility we have in configuration of producers/consumers to topics. Really the question is best practices in this area. A producer server handling 100+ msg types could also connect quite a bit. So, perhaps it is best to restrict producer and consumer servers to process a restricted class of types. Certainly if the producer is also hosting a web server, but perhaps not as dire on the consumer side. thanks, rob

From: Chris Curtin [curtin.ch...@gmail.com] Sent: Wednesday, May 29, 2013 7:36 AM To: users Subject: Re: one consumerConnector or many? I'd look at a variation of #2. Can your messages be grouped into a 'class' (for lack of a better term) that is consumed together? For example a 'class' of 'auditing events' or 'sensor events'. The idea would be to then have a topic per 'class'. A couple of benefits to this: - you can size the consumption resources for a 'class' by its value. So the 'audit' topic may only get a 2-threaded consumer while the 'sensor' class gets a 10-threaded consumer. - you can stop processing a 'class' of messages if you need to without taking all the consumers offline (assuming you have different processors or a way while running to alter your number of threads per topic). Since it sounds like you may be frequently adding new message types, this approach also allows you to decide if you want to shut down only a part of your processing to add the new code to handle the message. Finally, why the concern about socket use? A well configured Windows or Linux machine can have thousands of open sockets without problems. Since 0.8.0 only connects to the Broker with the topic/partition, you end up with 1 socket per topic/partition and consumer. Hope this helps, Chris

On Wed, May 29, 2013 at 9:13 AM, Rob Withers reefed...@gmail.com wrote: In thinking about the design of consumption, we have in mind a generic consumer server which would consume from more than one message type. The handling of each type of message would be different. I suppose we could have upwards of say 50 different message types, eventually, maybe 100+ different types. Which of the following designs would be best and why would the other options be bad? 1) Have all message types go through one topic and use a dispatcher pattern to select the correct handler. Use one consumerConnector. 2) Use a different topic for each message type, but still use one consumerConnector and a dispatcher pattern.
3) Use a different topic for each message type and have a separate consumerConnector for each topic. I am struggling with whether my assumptions are correct. It seems that a single connector for a topic would establish one socket to each broker, as rebalancing assigns various partitions to that thread. Option 2 would pull messages from more than one topic through a single socket to a particular broker, is it so? Would option 3 be reasonable, establishing upwards of 100 sockets per broker? I am guesstimating that option 2 is the right way forward, to bound socket use, and we'll need to figure out a way to parameterize stream consumption with the right handlers for a particular msg type. If we add a topic, do you think we should create a new connector or restart the original connector with the new topic in the map? Thanks, rob
Re: Partitioning and scale
Hi Tim,

On Wed, May 22, 2013 at 3:25 PM, Timothy Chen tnac...@gmail.com wrote: Hi, I'm currently trying to understand how Kafka (0.8) can scale with our usage pattern and how to set up the partitioning. We want to route messages belonging to the same id to the same queue, so its consumer will be able to consume all the messages of that id. My questions: - From my understanding, in Kafka we would need to have a custom partitioner that routes the same messages to the same partition, right? I'm trying to find examples of writing this partitioner logic, but I can't find any. Can someone point me to an example?

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example The partitioner here does a simple mod on the IP address and the # of partitions. You'd need to define your own logic, but this is a start.

- I see that Kafka server.properties allows one to specify the number of partitions it supports. However, when we want to scale, I wonder if we add # of partitions or # of brokers, will the same partitioner start distributing the messages to different partitions? And if it does, how can that same consumer continue to read off the messages of those ids if it was interrupted in the middle?

I'll let someone else answer this.

- I'd like to create a consumer per partition, and for each one to subscribe to the changes of that one. How can this be done in kafka?

Two ways: Simple Consumer or Consumer Groups. It depends on the level of control you want over the code processing a specific partition vs. getting one assigned to it (and the level of control over offset management). https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks, Tim
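A hedged sketch of such a partitioner for 0.8, keyed on the id. It assumes messages are sent with the id as the KeyedMessage key and that partitioner.class points at this class; in some 0.8 builds the Partitioner interface is not generic, so the signature may need adjusting:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    // routes every message with the same id (used as the message key) to the same partition
    public class IdPartitioner implements Partitioner<String> {

        // the producer instantiates the partitioner reflectively through this constructor
        public IdPartitioner(VerifiableProperties props) {
        }

        public int partition(String key, int numPartitions) {
            // mask the sign bit so the result is always a valid partition number
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

Usage would look like props.put("partitioner.class", "example.IdPartitioner") on the producer and producer.send(new KeyedMessage<String, String>(topic, id, message)), where example.IdPartitioner is whatever package and class name you choose.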
Re: Is there a limitation on the number of simultaneous consumer connections to the same topic
Yes. However, be aware that starting and stopping processes will cause a rebalance of the consumers, so your code may find itself suddenly receiving events from a different partition (so don't assume the partition you are reading isn't going to change!). Also, as things are starting up you may find a process receives many partitions at first, but as the other processes are started the partitions get reassigned. Finally, running more processes than partitions will mean those extra processes are idle.

On Mon, May 13, 2013 at 8:49 AM, Ming Li lm0...@gmail.com wrote: Hi Andrea, Thanks for your reply~~ You mean there is no difference between having N threads share the same ConsumerConnector created by Consumer.createJavaConsumerConnector, and having N consumer processes, each with its own ConsumerConnector? Best Regards, Li Ming

On Mon, May 13, 2013 at 6:43 PM, Andrea Gazzarini andrea.gazzar...@gmail.com wrote: It shouldn't. Creating several listener / consumer processes belonging to the same group means you are working with a point-to-point message channel, so incoming messages will be delivered to only one consumer. Maybe I'm wrong, but I believe in that scenario there's no difference (from the broker's perspective) between threads and processes. Regards, Andrea

On 05/13/2013 12:15 PM, Ming Li wrote: Hi, Does Kafka have a limitation on the simultaneous connections (created with Consumer.createJavaConsumerConnector) for the same topic within the same group? My scenario is I need to consume a topic from different processes (not threads), so I need to create lots of high level consumers. Best Regards, Li Ming
Re: a few questions from high level consumer documentation.
I'll try to answer some; the Kafka team will need to answer the others:

On Wed, May 8, 2013 at 12:17 PM, Yu, Libo libo...@citi.com wrote: Hi, I read this link https://cwiki.apache.org/KAFKA/consumer-group-example.html and have a few questions (if not too many).

1. When you say the iterator may block, do you mean hasNext() may block?

Yes.

2. "Remember, you can only use a single process per Consumer Group." Do you mean we can only use a single process on one node of the cluster for a consumer group? Or that there can be only one process on the whole cluster for a consumer group? Please clarify.

Bug. I'll change it. When I wrote this I misunderstood the re-balancing step. I missed this reference but fixed the others. Sorry.

3. Why save the offset to ZooKeeper? Is it easier to save it to a local file?

4. "When client exits/crashes or leader for a partition is changed, duplicate messages may be replayed. To help avoid this (replayed duplicate messages), make sure you provide a clean way for your client to exit instead of assuming it can be 'kill -9'd."

a. For client exit, if the client is receiving data at the time, how to do a clean exit? How can the client tell the consumer to write the offset to ZooKeeper before exiting?

If you call the shutdown() method on the Consumer it will cleanly stop, releasing any blocked iterators. In the example it goes to sleep for a few seconds then cleanly shuts down.

b. For a client crash, what can the client do to avoid duplicate messages when restarted? What I can think of is to read the last message from the log file and ignore the first few received duplicate messages until receiving the last read message. But is it possible for the client to read the log file directly?

If you can't tolerate the possibility of duplicates you need to look at the Simple Consumer example. There you control the offset storage.

c. For the change of the partition leader, is there anything that clients can do to avoid duplicates?

Thanks. Libo
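A small sketch of that clean-exit sequence (question 4a), assuming the streams are being drained by threads in an ExecutorService; the timeout value is arbitrary:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerShutdown {
        // connector: the ConsumerConnector behind the KafkaStreams
        // pool: the ExecutorService running the per-stream consume loops
        public static void shutdown(ConsumerConnector connector, ExecutorService pool) {
            connector.shutdown();   // blocked hasNext() calls return false after this
            pool.shutdown();
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to exit");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

With auto-commit enabled, the connector should record the consumed offsets as part of shutdown(), which is why a clean exit avoids most of the duplicate replay described in question 4.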
Re: Kafka wiki Documentation conventions - looking for feedback
I've tested my examples with the new (4/30) release and they work, so I've updated the documentation. Thanks, Chris On Mon, Apr 29, 2013 at 6:18 PM, Jun Rao jun...@gmail.com wrote: Thanks. I also updated your producer example to reflect a recent config change (broker.list = metadata.broker.list). Jun On Mon, Apr 29, 2013 at 11:19 AM, Chris Curtin curtin.ch...@gmail.com wrote: Thanks, I missed that the addition of consumers can cause a re-balance. Thought it was only on Leader changes. I've updated the wording in the example. I'll pull down the beta and test my application then change the names on the properties. Thanks, Chris On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote: Basically, every time a consumer joins a group, every consumer in the groups gets a ZK notification and each of them tries to own a subset of the total number of partitions. A given partition is only assigned to one of the consumers in the same group. Once the ownership is determined, each consumer consumes messages coming from its partitions and manages the offset of those partitions. Since at any given point of time, a partition is only owned by one consumer, there won't be conflicts on updating the offsets. More details are described in the consumer rebalancing algorithm section of http://kafka.apache.org/07/design.html Thanks, Jun On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Jun, can you explain this a little better? I thought when using Consumer Groups that on startup Kafka connects to ZooKeeper and finds the last read offset for every partition in the topic being requested for the group. That is then the starting point for the consumer threads. If a second process starts while the first one is running with the same Consumer Group, won't the second one read the last offsets consumed by the already running process and start processing from there? Then as the first process syncs consumed offsets, won't the 2nd process's next update overwrite them? Thanks, Chris On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote: Chris, Thanks for the writeup. Looks great overall. A couple of comments. 1. At the beginning, it sounds like that one can't run multiple processes of consumers in the same group. This is actually not true. We can create multiple instances of consumers for the same group in the same JVM or different JVMs. The consumers will auto-balance among themselves. 2. We have changed the name of some config properties. auto.commit.interval.ms is correct. However, zk.connect, zk.session.timeout.ms and zk.sync.time.ms are changed to zookeeper.connect, zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively. I will add a link to your wiki in our website. Thanks again. Jun On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Jun, I finished and published it this morning: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example One question: when documenting the ConsumerConfig parameters I couldn't find a description for the 'auto.commit.interval.ms' setting. I found one for 'autocommit.interval.ms' (no '.' between auto and commit) in the Google Cache only. Which spelling is it? Also is my description of it correct? I'll take a look at custom encoders later this week. Today and Tuesday are going to be pretty busy. Please let me know if there are changes needed to the High Level Consumer page. 
Thanks, Chris

On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com wrote: Chris, Any update on the high level consumer example? Also, in the Producer example, it would be useful to describe how to write a customized encoder. One subtle thing is that the encoder needs a constructor that takes a single VerifiableProperties argument ( https://issues.apache.org/jira/browse/KAFKA-869). Thanks, Jun
Re: Kafka wiki Documentation conventions - looking for feedback
Hi Jun I've added #1 and #2. I'll need to think about where to put #3, maybe even adding a 'tips and tricks' section? I've not had to do any encoder/decoders. Can anyone else offer some example code I can incorporate into an example? Thanks, Chris On Wed, May 1, 2013 at 11:45 AM, Jun Rao jun...@gmail.com wrote: Chris, Thanks. This is very helpful. I linked your wiki pages to our website. A few more comments: 1. Producer: The details of the meaning of request.required.acks are described in http://kafka.apache.org/08/configuration.html. It would be great if you can add a link to the description in your wiki. 2. High level consumer: Could you add the proper way of stopping the consumer? One just need to call consumer.shutdown(). After this is called, hasNext() call in the Kafka stream iterator will return false. 3. SimpleConsumer: We have the following api that returns the offset of the last message exposed to the consumer. The difference btw high watermark and the offset of the last consumed message tells you how many messages the consumer is behind the broker. highWatermark(topic: String, partition: Int) Finally, it would be great if you can extend the wiki with customized encoder (Producer) and decoder (Consumer) at some point. Thanks, Jun On Wed, May 1, 2013 at 6:44 AM, Chris Curtin curtin.ch...@gmail.com wrote: I've tested my examples with the new (4/30) release and they work, so I've updated the documentation. Thanks, Chris On Mon, Apr 29, 2013 at 6:18 PM, Jun Rao jun...@gmail.com wrote: Thanks. I also updated your producer example to reflect a recent config change (broker.list = metadata.broker.list). Jun On Mon, Apr 29, 2013 at 11:19 AM, Chris Curtin curtin.ch...@gmail.com wrote: Thanks, I missed that the addition of consumers can cause a re-balance. Thought it was only on Leader changes. I've updated the wording in the example. I'll pull down the beta and test my application then change the names on the properties. Thanks, Chris On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote: Basically, every time a consumer joins a group, every consumer in the groups gets a ZK notification and each of them tries to own a subset of the total number of partitions. A given partition is only assigned to one of the consumers in the same group. Once the ownership is determined, each consumer consumes messages coming from its partitions and manages the offset of those partitions. Since at any given point of time, a partition is only owned by one consumer, there won't be conflicts on updating the offsets. More details are described in the consumer rebalancing algorithm section of http://kafka.apache.org/07/design.html Thanks, Jun On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Jun, can you explain this a little better? I thought when using Consumer Groups that on startup Kafka connects to ZooKeeper and finds the last read offset for every partition in the topic being requested for the group. That is then the starting point for the consumer threads. If a second process starts while the first one is running with the same Consumer Group, won't the second one read the last offsets consumed by the already running process and start processing from there? Then as the first process syncs consumed offsets, won't the 2nd process's next update overwrite them? Thanks, Chris On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote: Chris, Thanks for the writeup. Looks great overall. A couple of comments. 1. 
At the beginning, it sounds like one can't run multiple processes of consumers in the same group. This is actually not true. We can create multiple instances of consumers for the same group in the same JVM or different JVMs. The consumers will auto-balance among themselves. 2. We have changed the name of some config properties. auto.commit.interval.ms is correct. However, zk.connect, zk.session.timeout.ms and zk.sync.time.ms are changed to zookeeper.connect, zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively. I will add a link to your wiki on our website. Thanks again. Jun

On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Jun, I finished and published it this morning: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
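To illustrate the highWatermark() api Jun mentions (item 3 in his comments above), a sketch of measuring how far a SimpleConsumer is behind the broker, assuming highWatermark() is exposed on the fetch response as described; host, port, topic, partition and offsets are placeholders:

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class LagCheck {
        public static void main(String[] args) {
            // assumption: a broker on localhost:9092 leads partition 0 of "mytopic"
            SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "lag-check");
            String topic = "mytopic";
            int partition = 0;
            long readOffset = 0L; // next offset this consumer will ask for

            FetchRequest req = new FetchRequestBuilder()
                    .clientId("lag-check")
                    .addFetch(topic, partition, readOffset, 100000)
                    .build();
            FetchResponse response = consumer.fetch(req);

            long lastConsumed = readOffset - 1;
            for (MessageAndOffset mao : response.messageSet(topic, partition)) {
                lastConsumed = mao.offset(); // pretend everything returned was processed
            }
            // high watermark = offset of the last message the broker exposes to consumers;
            // the gap to the last consumed offset is how far behind this consumer is
            long lag = response.highWatermark(topic, partition) - lastConsumed;
            System.out.println("Consumer is " + lag + " messages behind the broker");
            consumer.close();
        }
    }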
Yammer Metrics not included in 0.8.0?
Hi, I pulled 0.8.0 head at 3 pm eastern 4/30, did the sbt update; sbt package; sbt assembly-package-dependency, and my code won't compile. It is missing the Yammer Metrics libraries. The last pull was about 45 days ago and they were stored in core\lib\metrics-*. Do I now need to pull them myself? It also looks like zkclient is not being included. Thanks, Chris
Re: kafka 0.8 beta release status
Just added the High Level Consumer example. On Mon, Apr 29, 2013 at 1:52 AM, Jun Rao jun...@gmail.com wrote: We have updated the 0.8 documentation in our website ( http://kafka.apache.org/index.html). Please review the docs. We have the following blockers for the 0.8 beta release: additional docs: * examples of using the 0.8 high level consumer api * description of additional 0.8 tools KAFKA-885 (sbt package builds two kafka jars) It would be great if people can help on the blockers. Thanks, Jun
Re: Kafka wiki Documentation conventions - looking for feedback
Jun, can you explain this a little better? I thought when using Consumer Groups that on startup Kafka connects to ZooKeeper and finds the last read offset for every partition in the topic being requested for the group. That is then the starting point for the consumer threads. If a second process starts while the first one is running with the same Consumer Group, won't the second one read the last offsets consumed by the already running process and start processing from there? Then as the first process syncs consumed offsets, won't the 2nd process's next update overwrite them? Thanks, Chris On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote: Chris, Thanks for the writeup. Looks great overall. A couple of comments. 1. At the beginning, it sounds like that one can't run multiple processes of consumers in the same group. This is actually not true. We can create multiple instances of consumers for the same group in the same JVM or different JVMs. The consumers will auto-balance among themselves. 2. We have changed the name of some config properties. auto.commit.interval.ms is correct. However, zk.connect, zk.session.timeout.ms and zk.sync.time.ms are changed to zookeeper.connect, zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively. I will add a link to your wiki in our website. Thanks again. Jun On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Jun, I finished and published it this morning: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example One question: when documenting the ConsumerConfig parameters I couldn't find a description for the 'auto.commit.interval.ms' setting. I found one for 'autocommit.interval.ms' (no '.' between auto and commit) in the Google Cache only. Which spelling is it? Also is my description of it correct? I'll take a look at custom encoders later this week. Today and Tuesday are going to be pretty busy. Please let me know if there are changes needed to the High Level Consumer page. Thanks, Chris On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com wrote: Chris, Any update of the high level consumer example? Also, in the Producer example, it would be useful to describe how to write a customized encoder. One subtle thing is that the encoder needs a constructor that takes a a single VerifiableProperties argument ( https://issues.apache.org/jira/browse/KAFKA-869). Thanks, Jun
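A minimal sketch of a custom encoder with the constructor KAFKA-869 calls for; the class name and package are made up, and the producer would reference it through the serializer.class property:

    import java.io.UnsupportedEncodingException;
    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;

    public class Utf8StringEncoder implements Encoder<String> {

        // the VerifiableProperties constructor is the one KAFKA-869 describes;
        // the producer instantiates encoders reflectively through it
        public Utf8StringEncoder(VerifiableProperties props) {
        }

        public byte[] toBytes(String message) {
            try {
                return message.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }
    }

Wiring it up would be props.put("serializer.class", "example.Utf8StringEncoder"), with example.Utf8StringEncoder being whatever fully-qualified name the class is given.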
Re: Kafka wiki Documentation conventions - looking for feedback
Thanks, I missed that the addition of consumers can cause a re-balance. Thought it was only on Leader changes. I've updated the wording in the example. I'll pull down the beta and test my application then change the names on the properties. Thanks, Chris On Mon, Apr 29, 2013 at 11:58 AM, Jun Rao jun...@gmail.com wrote: Basically, every time a consumer joins a group, every consumer in the groups gets a ZK notification and each of them tries to own a subset of the total number of partitions. A given partition is only assigned to one of the consumers in the same group. Once the ownership is determined, each consumer consumes messages coming from its partitions and manages the offset of those partitions. Since at any given point of time, a partition is only owned by one consumer, there won't be conflicts on updating the offsets. More details are described in the consumer rebalancing algorithm section of http://kafka.apache.org/07/design.html Thanks, Jun On Mon, Apr 29, 2013 at 8:16 AM, Chris Curtin curtin.ch...@gmail.com wrote: Jun, can you explain this a little better? I thought when using Consumer Groups that on startup Kafka connects to ZooKeeper and finds the last read offset for every partition in the topic being requested for the group. That is then the starting point for the consumer threads. If a second process starts while the first one is running with the same Consumer Group, won't the second one read the last offsets consumed by the already running process and start processing from there? Then as the first process syncs consumed offsets, won't the 2nd process's next update overwrite them? Thanks, Chris On Mon, Apr 29, 2013 at 11:03 AM, Jun Rao jun...@gmail.com wrote: Chris, Thanks for the writeup. Looks great overall. A couple of comments. 1. At the beginning, it sounds like that one can't run multiple processes of consumers in the same group. This is actually not true. We can create multiple instances of consumers for the same group in the same JVM or different JVMs. The consumers will auto-balance among themselves. 2. We have changed the name of some config properties. auto.commit.interval.ms is correct. However, zk.connect, zk.session.timeout.ms and zk.sync.time.ms are changed to zookeeper.connect, zookeeper.session.timeout.ms, and zookeeper.sync.time.ms, respectively. I will add a link to your wiki in our website. Thanks again. Jun On Mon, Apr 29, 2013 at 5:54 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Jun, I finished and published it this morning: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example One question: when documenting the ConsumerConfig parameters I couldn't find a description for the 'auto.commit.interval.ms' setting. I found one for 'autocommit.interval.ms' (no '.' between auto and commit) in the Google Cache only. Which spelling is it? Also is my description of it correct? I'll take a look at custom encoders later this week. Today and Tuesday are going to be pretty busy. Please let me know if there are changes needed to the High Level Consumer page. Thanks, Chris On Mon, Apr 29, 2013 at 12:50 AM, Jun Rao jun...@gmail.com wrote: Chris, Any update of the high level consumer example? Also, in the Producer example, it would be useful to describe how to write a customized encoder. One subtle thing is that the encoder needs a constructor that takes a a single VerifiableProperties argument ( https://issues.apache.org/jira/browse/KAFKA-869). Thanks, Jun
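For reference, a consumer configuration using the renamed 0.8 property names Jun lists above; the values and group id are illustrative only:

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class RenamedConsumerProps {
        public static ConsumerConfig createConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");   // was zk.connect
            props.put("zookeeper.session.timeout.ms", "400");   // was zk.session.timeout.ms
            props.put("zookeeper.sync.time.ms", "200");         // was zk.sync.time.ms
            props.put("auto.commit.interval.ms", "1000");       // name unchanged
            props.put("group.id", "example-group");             // hypothetical group
            return new ConsumerConfig(props);
        }
    }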
Re: one producer and 2 consumers
In a nutshell: High Level uses Consumer Groups to handle the tracking of message offset consumption. SimpleConsumer leaves it all up to you. The 0.7.x quick start shows examples of both: http://kafka.apache.org/quickstart.html

On Fri, Apr 26, 2013 at 12:32 PM, Oleg Ruchovets oruchov...@gmail.com wrote: By the way, what does high level consumer mean? Are there other types of consumers? Thanks Oleg.
Re: LeaderNotAvailable Exception
Did you create the topic without a # of partitions then try to delete/recreate it? I've had that happen to me before. Try shutting down everything (including zookeeper) and restarting.

On Tue, Apr 23, 2013 at 9:08 PM, Jun Rao jun...@gmail.com wrote: Does this happen on every message that you type in producer console? Thanks, Jun

On Tue, Apr 23, 2013 at 4:15 PM, Yin Yin yin@outlook.com wrote: I tried to run the kafka 0.8 version as instructed in Quick Start. The kafka server shows the following message when I launch the producer.

    ERROR Error while fetching metadata for partition [test,0] (kafka.admin.AdminUtils$)
    kafka.common.LeaderNotAvailableException: No leader exists for partition 0
        at kafka.admin.AdminUtils$$anonfun$3.apply(AdminUtils.scala:219)
        at kafka.admin.AdminUtils$$anonfun$3.apply(AdminUtils.scala:201)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.List.map(List.scala:45)
        at kafka.admin.AdminUtils$.kafka$admin$AdminUtils$$fetchTopicMetadataFromZk(AdminUtils.scala:201)
        at kafka.admin.AdminUtils$.fetchTopicMetadataFromZk(AdminUtils.scala:190)
        at kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:479)
        at kafka.server.KafkaApis$$anonfun$handleTopicMetadataRequest$1.apply(KafkaApis.scala:465)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
        at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:464)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:67)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
        at java.lang.Thread.run(Unknown Source)

Then, when I type a message in the producer console, the server console pops out

    WARN [KafkaApi-0] Leader not local for topic test partition 0 on broker 0 (kafka.server.KafkaApis)

Consumer console also didn't get any message. Any help is appreciated. Thanks
Re: Kafka 0.8 cluster setup?
I followed these instructions to get the first 'play' cluster going: https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html Instead of running the 3 brokers on the same machine, I ran one on each machine. Note that you will need to do a little bit of zookeeper setup to get a cluster running; I think the main step was setting the 'myid' in the zookeeper install directory properly on each machine (namely, give each machine a unique # in that file). Thanks, Chris

On Mon, Apr 22, 2013 at 5:32 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi Jason, We are in the process of updating the documentation. Hoping to finish it by this week. Stay tuned. Thanks, Neha

On Mon, Apr 22, 2013 at 2:12 PM, Jason Huang jason.hu...@icare.com wrote: Hello, We've been playing around with kafka 0.8 for a few months now and decided to install kafka on a small cluster for further testing. I tried to search online but couldn't find any setup documentation for a kafka 0.8 cluster. Does anyone know if such documents exist? If they don't exist, what could be our best guide to start with? thanks! Jason
Re: Kafka 0.8 cluster setup?
Beat me to it ;) The only caveat is I wouldn't use /tmp for this, since if you're running tmpwatch the 'myid' file will get removed unexpectedly because it doesn't seem to be touched after setup. That was fun to find :) We use /var/zookeeper for our storage. Thanks, Chris

On Tue, Apr 23, 2013 at 10:30 AM, Jason Huang jason.hu...@icare.com wrote: Thanks Eric - this helps quite a bit. I will play around with it. Jason

On Tue, Apr 23, 2013 at 10:21 AM, Eric Sites eric.si...@threattrack.com wrote: Jason, You need to modify the ZooKeeper config and add the following:

    dataDir=/tmp/zookeeper
    initLimit=50
    syncLimit=2
    server.1=kafka001.domain.com:2888:3888
    server.2=kafka002.domain.com:2888:3888
    server.3=kafka003.domain.com:2888:3888
    # Make sure you open those 2 ports on each of the servers for your firewall

The .1 after 'server' is the id of the zookeeper, which should be stored in a file named /tmp/zookeeper/myid:

    echo 1 > /tmp/zookeeper/myid

Cheers, Eric Sites

On 4/23/13 9:50 AM, Jason Huang jason.hu...@icare.com wrote: Thanks Chris and Neha. Chris - I've been through the link you mentioned before. However, that appears to be using one instance of zookeeper, which makes whichever server runs zookeeper the single point of failure? Jason

On Tue, Apr 23, 2013 at 8:28 AM, Chris Curtin curtin.ch...@gmail.com wrote: I followed these instructions to get the first 'play' cluster going: https://cwiki.apache.org/KAFKA/kafka-08-quick-start.html Instead of running the 3 brokers on the same machine, I ran one on each machine. Note that you will need to do a little bit of zookeeper setup to get a cluster running; I think the main step was setting the 'myid' in the zookeeper install directory properly on each machine (namely, give each machine a unique # in that file). Thanks, Chris

On Mon, Apr 22, 2013 at 5:32 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi Jason, We are in the process of updating the documentation. Hoping to finish it by this week. Stay tuned. Thanks, Neha

On Mon, Apr 22, 2013 at 2:12 PM, Jason Huang jason.hu...@icare.com wrote: Hello, We've been playing around with kafka 0.8 for a few months now and decided to install kafka on a small cluster for further testing. I tried to search online but couldn't find any setup documentation for a kafka 0.8 cluster. Does anyone know if such documents exist? If they don't exist, what could be our best guide to start with? thanks! Jason
Re: Securing Kafka
Also keep in mind that anything done at the transport layer (SSL, for example) won't solve your 'at rest' problems. All messages are written to disk, so unless the broker does some encryption logic you haven't solved the data visibility issues. I also think this should be a producer/consumer problem, not a Broker problem. Keep the Brokers as fast as possible (thus NIO/kernel space activities etc.) and push the cost to the producers and consumers. Chris

On Tue, Apr 23, 2013 at 2:02 PM, Jason Rosenberg j...@squareup.com wrote: Yes, I think encryption at the message level is a workable solution, as long as you don't care about exposing the metadata that goes with it (e.g. topic names, kafka broker/zk server locations, etc.). Jason

On Tue, Apr 23, 2013 at 10:02 AM, Fergal Somers fergal.som...@workday.com wrote: Hi, We are planning to use Kafka, but like others on this list we need to be able to secure communication. The approaches people have suggested on this list are: - Encrypt the messages at the producer (e.g. http://search-hadoop.com/m/1AfXKcZIk52/message+encryptionsubj=Re+Secure+communication ) - Add SSL to the Kafka protocol - http://mail-archives.apache.org/mod_mbox/kafka-users/201304.mbox/ajax/%3CCAA%2BBczQ_dMXUTNndSu4d%2B6aRo%3DSLiFa4iGMu_78OWKub_CTScw%40mail.gmail.com%3E Adding SSL support to Kafka probably means adding SSLEngine support to the nio socket handling ( https://groups.google.com/forum/#!msg/kafka-dev/ZmJrB_plu1I/_9cmGlLCSVEJ ). I don't think there are any immediate plans to provide this, but it's potentially something that Kafka would support in the future? In theory this is something we could look at, but we would need to go further. We also need to separate producers from consumers. The aim would be to ensure that a Kafka producer couldn't also act as a consumer. Essentially producers can write to Kafka, but not read. From looking at the Kafka source, achieving producer/consumer separation looks to me like it would be quite a change to the Kafka server (0.7). So are there any plans in the (near) future in this area (producer / consumer separation)? Message encryption (at the application layer) would allow us to achieve both aims of securing communication and separating consumers from producers. Producers get the public cert (so they can encrypt messages as they place them on the bus). Only consumers get the private cert - so only they can decrypt messages consumed. This seems like something we can do ourselves - I just wanted to sanity check the approach with this group. Cheers, Fergal.
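A toy sketch of the 'encrypt at the producer' approach, simplified to a shared AES key instead of the public/private cert split Fergal describes; the broker address, topic and cipher choice are placeholders, and a real deployment would need proper key distribution and an authenticated cipher mode:

    import java.security.Key;
    import java.util.Properties;
    import javax.crypto.Cipher;
    import javax.crypto.KeyGenerator;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class EncryptingProducer {
        public static void main(String[] args) throws Exception {
            // stand-in for a real key shared out-of-band with the consumers
            Key key = KeyGenerator.getInstance("AES").generateKey();
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, key);

            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");              // assumption
            props.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[] payloads
            Producer<byte[], byte[]> producer =
                    new Producer<byte[], byte[]>(new ProducerConfig(props));

            byte[] encrypted = cipher.doFinal("sensitive payload".getBytes("UTF-8"));
            // the broker only ever sees, and writes to disk, the ciphertext
            producer.send(new KeyedMessage<byte[], byte[]>("secure-topic", encrypted));
            producer.close();
        }
    }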
Re: Kafka wiki Documentation conventions - looking for feedback
Hi Jun, #1 and #2 are done, thanks for the code-review! I'll work on getting a High Level consumer example this week. I don't have one readily usable (we quickly found the lack of control over offsets didn't meet our needs) but I can get something this week. Congratulations on getting closer to Beta! Chris On Mon, Apr 22, 2013 at 12:26 PM, Jun Rao jun...@gmail.com wrote: Chris, Thanks for the wiki. We are getting close to releasing 0.8.0 beta and your writeup is very helpful. The following are some comments for the 0.8 Producer wiki. 1. The following sentence is inaccurate. The producer will do random assignment as long as the key in KeyedMessage is null. If a key is not null, it will use the default partitioner if partitioner.class is not set. By default if you don't include a partitioner.class Kafka will randomly assign the message to a partition. 2. In the following sentence, the first type is the key and the second type is the message. The first is the type of the message, the second the type of the Partition key. 3. Could you explain the key.serializer.class property too? In addition to the 0.8 SimpleConsumer wiki, could you write up one for the 0.8 high level consumer? Thanks, Jun On Fri, Mar 29, 2013 at 8:28 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, I've added an example program for using a SimpleConsumer for 0.8.0. Turns out to be a little more complicated once you add Broker failover. I'm not 100% thrilled with how I detect and recover, so if someone has a better way of doing this please let me (and this list) know. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Thanks, Chris On Mon, Mar 25, 2013 at 8:02 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi David, Thanks for the feedback. I've seen the example before and after in different books/articles and it doesn't matter to me. Anyone else want to help define a style guide or is there one I didn't see already? Thanks, Chris On Thu, Mar 21, 2013 at 7:46 PM, David Arthur mum...@gmail.com wrote: This looks great! A few comments * I think it would be useful to start with a complete example (ready to copy/paste) and then break it down bit by bit * Some of the formatting is funky (gratuitous newlines), also I think 2 spaces looks nicer than 4 * In the text, it might be useful to embolden or italicize class names Also, maybe we should move this to a separate thread?
Re: Got exception executing Kafka Producer.
You need to reference the version of Yammer metrics shipping with Kafka. It is under \core\lib\metrics-* On Thu, Apr 4, 2013 at 11:41 AM, Oleg Ruchovets oruchov...@gmail.com wrote: I am executing a simple piece of code like this:

    public class FirstKafkaTester {
        public Producer<Integer, String> initProducer() {
            Properties props = new Properties();
            props.put("zk.connect", "127.0.0.1:2181");
            props.put("broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            return new Producer<Integer, String>(new ProducerConfig(props));
        }

        public void executeProducer(String topic) {
            int messageNo = 1;
            Producer<Integer, String> producer = initProducer();
            String messageStr = new String("Message_" + messageNo);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
        }

        public static void main(String[] args) {
            FirstKafkaTester firstKafkaTester = new FirstKafkaTester();
            firstKafkaTester.executeProducer("topic_1");
        }
    }

I use Kafka 0.8. I manually put it into my local Maven repository and added the yammer dependency:

    <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
    </dependency>

I got this exception:

    2013-04-04 11:09:56,909 WARN async.DefaultEventHandler (Logging.scala:warn(89)) - Failed to send producer request with correlation id 2 to broker 3 with data for partitions [topic_1,0]
    java.lang.NoSuchMethodError: com.yammer.metrics.core.TimerContext.stop()J
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:243)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:105)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.Producer.send(Producer.scala:74)
        at kafka.javaapi.producer.Producer.send(Producer.scala:32)
        at kafka_eval.FirstKafkaTester.executeProducer(FirstKafkaTester.java:26)
        at kafka_eval.FirstKafkaTester.main(FirstKafkaTester.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Is there something wrong that I am doing? What should I do to resolve the issue? Thanks Oleg.
Re: Slides from my March 2013 Atlanta Java User's Group presentation about Kafka
Now with Video: http://vimeo.com/63040812 (I did notice that I misspoke about reading from replicas, sorry). On Wed, Mar 20, 2013 at 8:11 AM, Chris Curtin curtin.ch...@gmail.comwrote: Hi, It went really well last night. Lots of good questions. Here are the slides, and hopefully the video will be up in a few days: http://www.slideshare.net/chriscurtin/ajug-march-2013-kafka Thanks, Chris
Re: Kafka wiki Documentation conventions - looking for feedback
Hi, I've added an example program for using a SimpleConsumer for 0.8.0. Turns out to be a little more complicated once you add Broker failover. I'm not 100% thrilled with how I detect and recover, so if someone has a better way of doing this please let me (and this list) know. https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Thanks, Chris On Mon, Mar 25, 2013 at 8:02 AM, Chris Curtin curtin.ch...@gmail.comwrote: Hi David, Thanks for the feedback. I've seen the example before and after in different books/articles and it doesn't matter to me. Anyone else want to help define a style guide or is there one I didn't see already? Thanks, Chris On Thu, Mar 21, 2013 at 7:46 PM, David Arthur mum...@gmail.com wrote: This looks great! A few comments * I think it would be useful to start with a complete example (ready to copy/paste) and then break it down bit by bit * Some of the formatting is funky (gratuitous newlines), also I think 2 spaces looks nicer than 4 * In the text, it might be useful to embolden or italicize class names Also, maybe we should move this to a separate thread?
Re: Anyone working on a Kafka book?
Thanks for finding those. Looks like a copy and paste issue. I've updated the document. Thanks, Chris On Sat, Mar 23, 2013 at 11:27 AM, Jonathan Hodges hodg...@gmail.com wrote: Many thanks for contributing! The docs are very helpful. I found a couple small possible typos. The partitioning code example looks like it repeats at the bottom with duplicate import and class definition statements. Also the create topic command-line appears to have an extra '-' for the partition option. I can edit these, but before doing so I wanted to check and make sure I wasn't mistaken. -Jonathan
Kafka wiki Documentation conventions - looking for feedback
Hi David, Thanks for the feedback. I've seen the example before and after in different books/articles and it doesn't matter to me. Anyone else want to help define a style guide or is there one I didn't see already? Thanks, Chris On Thu, Mar 21, 2013 at 7:46 PM, David Arthur mum...@gmail.com wrote: This looks great! A few comments * I think it would be useful to start with a complete example (ready to copy/paste) and then break it down bit by bit * Some of the formatting is funky (gratuitous newlines), also I think 2 spaces looks nicer than 4 * In the text, it might be useful to embolden or italicize class names Also, maybe we should move this to a separate thread?
Re: Anyone working on a Kafka book?
Okay, how do we do this logistically? I've taken the Producer code that I wrote for testing purposes and written a description around it. How do I get it to you guys? Simple Consumer is going to take a little longer since my test Consumers are non-trivial and I'll need to simplify them. Thanks, Chris On Tue, Mar 19, 2013 at 5:28 PM, Neha Narkhede neha.narkh...@gmail.com wrote: That's a great idea, Chris! How about picking the quickstart document? That is the most important information that users moving to 0.8 will need. Thanks, Neha
Re: Anyone working on a Kafka book?
Hi Jun, I've been thinking for a while about how to contribute to the project and thought that working on some documentation for the website might be a good way. Do you have an outline of what you'd like the site to look like that I (AND OTHERS hint, hint) could pick a topic, write the article and submit for you guys to review? Thanks, Chris On Tue, Mar 19, 2013 at 12:20 PM, Jun Rao jun...@gmail.com wrote: Hi, David, At LinkedIn, committers are too busy to write a Kafka book right now. I think this is a good idea to pursue. So, if you want to do it, we'd be happy to help. The only request that I have for you is while writing the book, it would be good if you can use this opportunity to also help us improve the documentation of the site. Thanks, Jun On Tue, Mar 19, 2013 at 6:34 AM, David Arthur mum...@gmail.com wrote: I was approached by a publisher the other day to do a book on Kafka - something I've actually thought about pursuing. Before I say yes (or consider saying yes), I wanted to make sure no one else was working on a book. No sense in producing competing texts at this point. So, anyone working on a Kafka book? Self published or otherwise? -David
Re: 0.8 behavior change: consumer re-receives last batch of messages in a topic?
Hi, I noticed the same thing. In 0.8.0 the offset passed to the fetch is where you want to start, not where you left off. So the last offset read from the previous batch is truly the 'last offset' so you need to save it and ask for it +1. Otherwise you keep asking for that last offset, which is valid so it keeps returning. Be careful with the +1 logic. Don't keep adding 1 if you don't get anything. It should always be 'last offset read +1' I think this happened with the change from file-byte offsets to offset as a message #. Chris On Wed, Mar 13, 2013 at 2:49 PM, Hargett, Phil phil.harg...@mirror-image.com wrote: I have 2 consumers in our scenario, reading from different brokers. Each broker is running standalone, although each have their own dedicated zookeeper instance for bookkeeping. After switching from 0.7.2, I noticed that both consumers exhibited high CPU usage. I am not yet exploiting any zookeeper knowledge in my consumer code; I am just making calls to the SimpleConsumer in the java API, passing the host and port of my broker. In 0.7.2, I kept the last offset from messages received via a fetch, and used that as the offset passed into the fetch method when receiving the next message set. With 0.8, I had to add a check to drop fetched messages when the message's offset was less than my own offset, based on the last message I saw. If I didn't make that change, it seemed like the last 200 or so messages in my topic (probably matches a magic batch size configured somewhere in all of this code) were continually refetched. In this scenario, my topic was no longer accumulating messages, as I had turned off the producer, so I was expecting the fetches to eventually either block, return an empty message set, or fail (not sure of semantics of fetch). Continually receiving the last batch of messages at the end of the topic was not a semantic I expected. Is this an intended change in behavior—or do I need to write better consumer code? Guidance, please. :)
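To make the '+1' bookkeeping concrete, here is a minimal sketch of the loop using the 0.8 SimpleConsumer javaapi. It assumes the consumer is already pointed at the partition leader (leader lookup and error handling are left out), and the host, topic and partition values are placeholders:

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class OffsetLoopSketch {
        public static void main(String[] args) throws Exception {
            String topic = "my-topic";
            int partition = 0;
            SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-loop-sketch");

            long readOffset = 0L; // where we want the *next* read to start
            while (true) {        // sketch only: no shutdown handling
                FetchRequest req = new FetchRequestBuilder()
                    .clientId("offset-loop-sketch")
                    .addFetch(topic, partition, readOffset, 100000)
                    .build();
                FetchResponse resp = consumer.fetch(req);

                for (MessageAndOffset mao : resp.messageSet(topic, partition)) {
                    if (mao.offset() < readOffset) {
                        continue; // an already-seen message came back; drop it
                    }
                    // process(mao.message()) ...
                    readOffset = mao.offset() + 1; // the next fetch starts after the last message actually read
                }
                // Note: readOffset only advances when a message is consumed, so an
                // empty fetch does not keep adding 1.
            }
        }
    }

The key points are exactly the two above: ask for 'last offset read + 1', and never bump the offset on an empty response.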
SimpleConsumer error conditions and handling
Hi, 0.8.0 HEAD from 3/4/2013. As I think through building a robust SimpleConsumer I ran some failure tests today and want to make sure I understand what is going on. FYI I know that I should be doing a metadata lookup to find the leader, but I wanted to see what happens if things are going well and the leader changes between requests, or I've cached the leader and try to connect without the cost of a leader lookup. First test: connect to a Broker that is a 'copy' of the topic/partition but not the leader. I get an error '5', which maps to 'ErrorMapping.LeaderNotAvailableCode'. Why didn't I get ErrorMapping.NotLeaderForPartitionCode or something else to tell me I'm not talking to the Leader? 'not available' implies something is wrong with replication. But connecting to the leader Broker, everything works fine. Second test: connect to a Broker that isn't the leader or a copy and I get error 3, unknown topic or partition. Makes sense. Third test: connect to the leader and, while reading data, shut down the leader Broker via the command line: I get some IOExceptions then Connection Refused on the reconnect. (Note that the Connection Refused is the exception raised; the IOException was written to the logs but not raised to my code.) I'm not sure of the best way to code a recovery from this without assuming the worst every time. Could there be some notice from Kafka that the connection to the leader was closed due to a shutdown vs. getting Connection Refused errors, so I can respond differently? Something like 'Broker has closed connection due to shutdown', so I know to sleep for a second before going through the leader lookup logic again? Or ideally have Kafka know it was a clean shutdown and automatically transition to the new leader. Knowing it was a clean shutdown would also allow me to treat the clean shutdown as a normal occurrence vs. an exception when something goes wrong. Thanks, Chris
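For what it's worth, a common recovery pattern is to check the per-partition error code after every fetch and fall back to a fresh metadata/leader lookup for anything that suggests you are no longer talking to the leader. A rough sketch, where findNewLeader() is a hypothetical helper (it would issue a TopicMetadataRequest against a known broker and reconnect) and the topic/partition/offset plumbing is passed in:

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.common.ErrorMapping;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class FetchErrorCheckSketch {

        // Returns the response if it is clean, or null to signal "re-discover the
        // leader and retry" (after closing the consumer and sleeping briefly).
        static FetchResponse fetchOnce(SimpleConsumer consumer, String clientName,
                                       String topic, int partition, long readOffset) {
            FetchRequest req = new FetchRequestBuilder()
                .clientId(clientName)
                .addFetch(topic, partition, readOffset, 100000)
                .build();
            FetchResponse resp = consumer.fetch(req);

            if (!resp.hasError()) {
                return resp;
            }
            short code = resp.errorCode(topic, partition);
            if (code == ErrorMapping.NotLeaderForPartitionCode()
                    || code == ErrorMapping.LeaderNotAvailableCode()
                    || code == ErrorMapping.UnknownTopicOrPartitionCode()) {
                // We are not talking to the current leader (the first and second
                // test cases above): go back through the metadata lookup.
                return null;
            }
            throw new RuntimeException("Fetch failed with error code " + code);
        }
    }

It doesn't answer the clean-shutdown question, but it at least turns the 'wrong broker' error codes into a single retry path.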
Re: 0.8.0 HEAD 3/4/2013 performance jump?
Great points Joe. What about something being written to INFO at startup about the replication level being used? Chris On Tue, Mar 5, 2013 at 9:36 AM, Joe Stein crypt...@gmail.com wrote: Hi Chris, setting the ack default to 1 would mean folks would have to have a replica set up and configured; otherwise starting a server from scratch from a download would mean an error message to the user. I hear your point about the risk of not replicating, though perhaps such a use case would be solved through auto discovery or some other feature/contribution for 0.9. I would be -1 on changing the default right now, because new folks coming in on a build, either as new users or migrations, might simply leave because they got an error, even when just running git clone, ./sbt package and starting up (fewer steps in 0.8). There are already expectations on 0.8; we should try to let things settle too. Lastly, when folks run and go live they often will have a chef, cfengine, puppet, etc. script for configuration. Perhaps through some more operations documentation, comments and general communication to the community we can reduce the risk. /* Joe Stein http://www.linkedin.com/in/charmalloc Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop */ On Tue, Mar 5, 2013 at 8:30 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi Jun, I wasn't explicitly setting the ack anywhere. Am I reading the code correctly that in SyncProducerConfig.scala the DefaultRequiredAcks is 0? Thus not waiting on the leader? Setting props.put("request.required.acks", "1"); causes the writes to go back to the performance I was seeing before yesterday. Are you guys open to changing the default to be 1? The MongoDB Java-driver guys made a similar default change at the end of last year because many people didn't understand the risk that the default value of no-ack was putting them in until they had a node failure. So they default to 'safe' and let you decide what your risk level is vs. assuming you can lose data. Thanks, Chris On Tue, Mar 5, 2013 at 1:00 AM, Jun Rao jun...@gmail.com wrote: Chris, On the producer side, are you using ack=0? Earlier, ack=0 was the same as ack=1, which means that the producer has to wait for the message to be received by the leader. More recently, we did the actual implementation of ack=0, which means the producer doesn't wait for the message to reach the leader and therefore it is much faster. Thanks, Jun On Mon, Mar 4, 2013 at 12:01 PM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, I'm definitely not complaining, but after upgrading to HEAD today my producers are running much, much faster. Don't have any measurements, but last release I was able to tab windows to stop a Broker before I could generate 500 partitioned messages. Now it completes before I can get the Broker shut down! Anything in particular you guys fixed? (I did remove all the files on disk per the email thread last week and reset the ZooKeeper meta, but that shouldn't matter right?) Very impressive! Thanks, Chris
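For anyone following along, the setting being debated is just a per-producer property. A short sketch of the three values and their trade-offs (the broker address is a placeholder):

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    public class AckConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092"); // placeholder broker
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // "0"  = don't wait for any acknowledgement (fastest, messages can be lost silently)
            // "1"  = wait for the partition leader to ack (the 'safe default' argued for above)
            // "-1" = wait for all in-sync copies to ack (slowest, most durable)
            props.put("request.required.acks", "1");
            Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
            producer.close();
        }
    }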
Re: Copy availability when broker goes down?
I'll grab HEAD in a few minutes and see if that changes anything. Issues submitted: https://issues.apache.org/jira/browse/KAFKA-783 https://issues.apache.org/jira/browse/KAFKA-782 Thanks, Chris On Mon, Mar 4, 2013 at 1:15 PM, Jun Rao jun...@gmail.com wrote: Chris, As Neha said, the 1st copy of a partition is the preferred replica and we try to spread them evenly across the brokers. When a broker is restarted, we don't automatically move the leader back to the preferred replica though. You will have to run a command line tool, PreferredReplicaLeaderElectionCommand, to balance the leaders again. Also, I recommend that you try the latest code in 0.8. A bunch of issues have been fixed since Jan. You will have to wipe out all your ZK and Kafka data first though. Thanks, Jun On Mon, Mar 4, 2013 at 8:32 AM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, (Hmm, take 2. Apache's spam filter doesn't like the word to describe the copy of the data, 'R-E-P-L-I-C-A', so it blocked it from sending! Using 'copy' below to mean that concept.) I'm running 0.8.0 with HEAD from the end of January (not the merge you guys did last night). I'm testing how the producer responds to the loss of brokers, what errors are produced etc., and noticed some strange things as I shut down servers in my cluster. Setup: 4 node cluster, 1 topic, 3 copies in the set, 10 partitions numbered 0-9. The state of the cluster is determined using TopicMetadataRequest. When I start with a full cluster (the 2nd column is the partition id, next is the leader, then the copy set and ISR):

Java: 0:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1]
Java: 1:vrd04.atlnp1 R:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
Java: 2:vrd03.atlnp1 R:[ vrd01.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
Java: 3:vrd03.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Java: 4:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1] I:[ vrd03.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
Java: 5:vrd03.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Java: 6:vrd03.atlnp1 R:[ vrd01.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1]
Java: 7:vrd04.atlnp1 R:[ vrd02.atlnp1 vrd04.atlnp1 vrd01.atlnp1] I:[ vrd04.atlnp1 vrd01.atlnp1 vrd02.atlnp1]
Java: 8:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Java: 9:vrd03.atlnp1 R:[ vrd04.atlnp1 vrd03.atlnp1 vrd01.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd01.atlnp1]

When I stop vrd01, which isn't the leader on any:

Java: 0:vrd03.atlnp1 R:[ ] I:[]
Java: 1:vrd04.atlnp1 R:[ ] I:[]
Java: 2:vrd03.atlnp1 R:[ ] I:[]
Java: 3:vrd03.atlnp1 R:[ vrd02.atlnp1 vrd03.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Java: 4:vrd03.atlnp1 R:[ ] I:[]
Java: 5:vrd03.atlnp1 R:[ vrd04.atlnp1 vrd02.atlnp1 vrd03.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Java: 6:vrd03.atlnp1 R:[ ] I:[]
Java: 7:vrd04.atlnp1 R:[ ] I:[]
Java: 8:vrd03.atlnp1 R:[ vrd03.atlnp1 vrd02.atlnp1 vrd04.atlnp1] I:[ vrd03.atlnp1 vrd04.atlnp1 vrd02.atlnp1]
Java: 9:vrd03.atlnp1 R:[ ] I:[]

Does this mean that none of the partitions that used to have a copy on vrd01 are updating ANY of the copies? I ran another test, again starting with a full cluster where all partitions had a full set of copies.
When I stop the broker which was leader for 9 of the 10 partitions, the leaders were all elected on one machine instead of the set of 3. Should the leaders have been better spread out? Also the copies weren’t fully populated either. Last test: started with a full cluster, showing all copies available. Stopped a broker that was not a leader for any partition. Noticed that the partitions where the stopped machine was in the copy set didn’t show any copies like above. Let the cluster sit for 30 minutes and didn’t see any new copies being brought on line. How should the cluster handle a machine that is down for an extended period of time? I don’t have a new machine I could add to the cluster, but what happens when I do? Will it not be used until a new topic is added or how does it become a valid option for a copy or eventually the leader? Thanks, Chris
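For reference, the leader / copy-set / ISR dumps above can be produced with the 0.8 javaapi metadata call. A rough sketch (the broker host, port and topic name are placeholders, and the output format simply mimics the lines shown above):

    import java.util.Collections;
    import kafka.cluster.Broker;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class ClusterStateDump {
        public static void main(String[] args) {
            SimpleConsumer consumer =
                new SimpleConsumer("vrd01.example", 9092, 100000, 64 * 1024, "state-dump");
            TopicMetadataRequest request =
                new TopicMetadataRequest(Collections.singletonList("my-topic"));
            TopicMetadataResponse response = consumer.send(request);

            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata part : topic.partitionsMetadata()) {
                    StringBuilder line = new StringBuilder("Java: " + part.partitionId() + ":");
                    line.append(part.leader() != null ? part.leader().host() : "none");
                    line.append(" R:[");
                    for (Broker b : part.replicas()) {   // the copy set
                        line.append(" ").append(b.host());
                    }
                    line.append("] I:[");
                    for (Broker b : part.isr()) {        // in-sync copies
                        line.append(" ").append(b.host());
                    }
                    line.append("]");
                    System.out.println(line);
                }
            }
            consumer.close();
        }
    }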
Re: Consumer questions: 0.8.0 vs. 0.7.2
Hi, I was able to implement my own lookup code but have a few concerns about this long term: - the Broker class is marked as 'private' in the Scala code. IntelliJ gives me an error about using it, but the runtime lets me use it and get the host/port out. - I have to know a lot about the structure of some internal classes for this to work, so changes in the implementation would cause my logic to break. I did a quick JIRA search and didn't see a request for a java-api for finding the primary, is that already on the roadmap or should I submit an enhancement request? Thanks, Chris On Wed, Nov 28, 2012 at 2:08 PM, Neha Narkhede neha.narkh...@gmail.comwrote: snip Also, there are 2 ways to send the topic metadata request. One way is how SimpleConsumerShell.scala uses ClientUtils.fetchTopicMetadata(). Another way is by using the send() API on SyncProducer. Thanks, Neha
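Until a supported java-api lookup exists, the workaround looks roughly like the sketch below: issue a TopicMetadataRequest through a SimpleConsumer pointed at any known broker and pull the leader's host and port out of the PartitionMetadata. Only host() and port() are touched on the Broker class, which is the internal-structure dependency mentioned above; the seed host, port and client ids are placeholders:

    import java.util.Collections;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LeaderLookupSketch {

        // Ask a seed broker for metadata and return a consumer connected to the
        // partition leader, or null if no leader is currently known.
        static SimpleConsumer connectToLeader(String seedHost, int seedPort,
                                              String topic, int partition) {
            SimpleConsumer seed =
                new SimpleConsumer(seedHost, seedPort, 100000, 64 * 1024, "leader-lookup");
            try {
                TopicMetadataResponse resp =
                    seed.send(new TopicMetadataRequest(Collections.singletonList(topic)));
                for (TopicMetadata tm : resp.topicsMetadata()) {
                    for (PartitionMetadata pm : tm.partitionsMetadata()) {
                        if (pm.partitionId() == partition && pm.leader() != null) {
                            // Only host() and port() are read from the Broker object
                            return new SimpleConsumer(pm.leader().host(), pm.leader().port(),
                                                      100000, 64 * 1024, "leader-consumer");
                        }
                    }
                }
            } finally {
                seed.close();
            }
            return null;
        }
    }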