New Consumer API and Range Consumption with Fail-over
Hello Kafka Dev Team,

With the new Consumer API redesign ( https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ), is there a capability to consume a topic and partition given start/end positions? How would I achieve the following use case of range consumption with fail-over?

Use Case: Ability to reload data given a topic and its partitions' offset start/end with the high-level consumer, with fail-over. Basically, high-level range consumption that continues even if a consumer in the group dies.

Suppose you have a topic called “test-topic” and its partitions' begin and end offsets:

    { topic: test-topic,
      [ { partition id: 1, offset start: 100,     offset end: 500,000 },
        { partition id: 2, offset start: 200,000, offset end: 500,000 },
        ... for n partitions ] }

Say you create a consumer group “Range-Consumer” and use the seek method for each partition. In each JVM, for each consumption thread:

    Consumer c = new KafkaConsumer( { group.id="Range-Consumer" } ... );
    Map<Integer, Long> partitionToEndOffsetMapping = ...;
    for (TopicPartition tp : topicPartitionList) {
        consumer.seek(tp, startOffset);
    }
    while (true) {
        ConsumerRecords records = consumer.poll(1);
        // for each record, check the offset
        record = records.iterator().next();
        if (record.getOffset() <= partitionToEndOffsetMapping.get(record.getPartition())) {
            // consume record
            // commit offset
            consumer.commit(CommitType.SYNC);
        } else {
            // Should I unsubscribe from this partition now?
            consumer.unsubscribe(record.getPartition());
        }
    }

Please let me know if the above approach is valid:
1) How will fail-over work?
2) How does rebooting the entire consumer group impact offset seek, since offsets are stored by Kafka itself?

Your feedback is greatly appreciated.

Thanks,
Bhavesh
Re: New Consumer API and Range Consumption with Fail-over
Hi Bhavesh,

I'm not totally sure I understand the expected behavior, but I think this can work. Instead of seeking to the start of the range before the poll loop, you should probably provide a ConsumerRebalanceCallback to get notifications when the group assignment has changed (e.g. when one of your nodes dies). When a new partition is assigned, the callback will be invoked by the consumer, and you can use it to check whether there's a committed position in the range or whether you need to seek to the beginning of the range. For example:

    void onPartitionsAssigned(consumer, partitions) {
        for (partition : partitions) {
            try {
                offset = consumer.committed(partition);
                consumer.seek(partition, offset);
            } catch (NoOffsetForPartition) {
                consumer.seek(partition, rangeStart);
            }
        }
    }

If a failure occurs, the partitions will be rebalanced across whichever consumers are still active. The case of the entire cluster being rebooted is not really different: when the consumers come back, they check the committed position and resume where they left off. Does that make sense?

After you are finished consuming a partition's range, you can use KafkaConsumer.pause(partition) to prevent further fetches from being initiated while still maintaining the current assignment. The patch to add pause() is not in trunk yet, but it probably will be before too long.

One potential problem is that you wouldn't be able to reuse the same group to consume a different range because of the way it depends on the committed offsets. Kafka's commit API actually allows some additional metadata to go along with a committed offset, and that could potentially be used to tie the commit to the range, but it's not yet exposed in KafkaConsumer. I assume it will be eventually, but I'm not sure whether that will be part of the initial release.

Hope that helps!

Jason

On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:
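For illustration, here is a more complete sketch of the approach Jason describes, written against the consumer API as it was eventually released (ConsumerRebalanceListener and pause(Collection) signatures) rather than the trunk snapshot discussed in the thread. The bootstrap address, topic name, and the two range maps are assumptions, not a tested implementation:

    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;

    public class RangeConsumer {
        // Assumed inputs: per-partition start/end offsets for the reload.
        static final Map<TopicPartition, Long> RANGE_START = new HashMap<>();
        static final Map<TopicPartition, Long> RANGE_END = new HashMap<>();

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption
            props.put("group.id", "Range-Consumer");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    for (TopicPartition tp : partitions) {
                        OffsetAndMetadata committed = consumer.committed(tp);
                        // Resume from the committed position if one exists,
                        // otherwise start from the beginning of the range.
                        consumer.seek(tp, committed != null ? committed.offset() : RANGE_START.get(tp));
                    }
                }
            });

            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    if (record.offset() <= RANGE_END.get(tp)) {
                        // process(record) would go here
                        consumer.commitSync();
                    } else {
                        // Past the end of the range: stop fetching from this
                        // partition but keep the assignment, per the note above.
                        consumer.pause(Collections.singleton(tp));
                    }
                }
            }
        }
    }

Note that calling commitSync() after every record is expensive; committing once per poll() batch would be the more typical pattern.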
Decommissioning a broker
I’m sure this has been asked before, but I can’t seem to find the answer. I’m planning a Kafka cluster expansion and an upgrade to 0.8.2.1. In doing so, I will be decommissioning a broker. I plan to remove this broker fully from the cluster, then reinstall it and use it for a different purpose.

I understand how to use the reassign-partitions tool to generate new partition assignments and to move partitions around so that the target broker no longer has any active replicas. Once that is done, is there anything special that needs to happen? I can shut down the broker, but as far as I know that broker will still be registered in ZooKeeper. Should I just delete the znode for that broker once it has been shut down?

Thanks!
-Andrew Otto
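For context, the reassignment workflow referred to above looks roughly like this; the topic name, broker ids, ZooKeeper address, and file paths are illustrative:

    # Topics whose partitions should move off the broker being decommissioned.
    cat > topics-to-move.json <<'EOF'
    { "topics": [ { "topic": "test-topic" } ], "version": 1 }
    EOF

    # Generate a candidate assignment that only uses the remaining brokers.
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --topics-to-move-json-file topics-to-move.json \
      --broker-list "1,2,3,4" --generate

    # Save the proposed assignment to a file, then execute and verify it.
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file reassignment.json --execute

    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file reassignment.json --verify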
Impact of Zookeeper Unavailability on Running Producers/Consumers
Hi,

We are using ZooKeeper for committing consumer offsets. The ZooKeeper service has become unavailable because its disk is full. The producers and consumers seem to be running fine (in terms of the number of messages produced/consumed per hour). While we are fixing the issue, we just want to know its impact on the system.

For producers, we see no impact, whereas for consumers the only symptom is that their offsets are not getting committed. Also, once ZooKeeper is back up, the consumer offsets should get updated to their proper values. Is that correct?

Thanks
--
Best Regards,
Mohit Gupta
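One way to check this once ZooKeeper is back: the old high-level consumer stores commits under /consumers/<group>/offsets/<topic>/<partition>, which can be read with the shell that ships with Kafka. The group and topic names below are made up:

    # Hypothetical group "mygroup", topic "test-topic", partition 0.
    bin/zookeeper-shell.sh localhost:2181
    get /consumers/mygroup/offsets/test-topic/0
    # The node's data is the last committed offset; it should start
    # advancing again once commits succeed.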
Re: Best practices - Using kafka (with http server) as source-of-truth
Hi Ewen,

Thanks for your response. I'll experiment and benchmark it with the normal proxy and Nginx as well, and update the results.

Regards,
prabcs

On Mon, Jul 27, 2015 at 11:10 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Hi Prabhjot,

Confluent has a REST proxy with docs that may give some guidance: http://docs.confluent.io/1.0/kafka-rest/docs/intro.html

The new producer that it uses is very efficient, so you should be able to get pretty good throughput. You take a bit of a hit due to the overhead of sending data through a proxy, but with appropriate batching you can get about 2/3 the performance you would get using the Java producer directly.

There are also a few other proxies you can find here: https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

You can also put nginx (or HAProxy, or a variety of other solutions) in front of REST proxies for load balancing, HA, SSL termination, etc. This is yet another hop, so it might affect throughput and latency.

-Ewen

On Mon, Jul 27, 2015 at 6:55 AM, Prabhjot Bharaj prabhbha...@gmail.com wrote:

Hi Folks,

I would like to understand the best practices when using Kafka as the source of truth, given that I want to pump data into Kafka using HTTP methods. What are the current production configurations for such a use case?
1. Kafka-http-client - is it scalable the way Nginx is?
2. Using Kafka and Nginx together - if anybody has used this, please explain
3. Any other scalable method?

Regards,
prabcs

--
Thanks,
Ewen

--
- There are only 10 types of people in the world: Those who understand binary, and those who don't
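As a rough illustration of the proxy approach Ewen describes, producing over HTTP with the Confluent REST proxy looks something like this; the host, port, and topic are placeholders, and the payload uses the v1 binary embedded format with a base64-encoded value:

    curl -X POST http://restproxy:8082/topics/test-topic \
      -H "Content-Type: application/vnd.kafka.binary.v1+json" \
      --data '{"records":[{"value":"aGVsbG8ga2Fma2E="}]}'
    # "aGVsbG8ga2Fma2E=" is base64 for "hello kafka"; the response
    # reports the partition and offset of each appended record.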
Re: Connection to zk shell on Kafka
This looks like an issue to be fixed. I created KAFKA-2385 for this.

Thanks,
Jiangjie (Becket) Qin

On Wed, Jul 29, 2015 at 10:33 AM, Chris Barlock barl...@us.ibm.com wrote:

I'm a user of Kafka/ZooKeeper, not one of its developers, so I can't give you a technical explanation. I do agree that Kafka should ship the jline JAR if its zookeeper-shell depends on it.

Chris

From: Prabhjot Bharaj prabhbha...@gmail.com
To: u...@zookeeper.apache.org, d...@kafka.apache.org
Cc: users@kafka.apache.org
Date: 07/29/2015 01:27 PM
Subject: Re: Connection to zk shell on Kafka

Sure. It would be great if you could also explain why the absence of the JAR creates this problem. Also, I'm surprised that the ZooKeeper that comes bundled with Kafka 0.8.2 does not have the jline JAR.

Regards,
prabcs

On Wed, Jul 29, 2015 at 10:45 PM, Chris Barlock barl...@us.ibm.com wrote:

You need the jline JAR file that ships with ZooKeeper.

Chris
IBM Tivoli Systems
Research Triangle Park, NC
(919) 224-2240
Internet: barl...@us.ibm.com

From: Prabhjot Bharaj prabhbha...@gmail.com
To: users@kafka.apache.org, u...@zookeeper.apache.org
Date: 07/29/2015 01:13 PM
Subject: Connection to zk shell on Kafka

Hi folks,

    /kafka/bin# ./zookeeper-shell.sh localhost:2182/
    Connecting to localhost:2182/
    Welcome to ZooKeeper!
    JLine support is disabled

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null

The shell never says connected.

I'm running a 5-node ZooKeeper cluster on a 5-node Kafka cluster (each Kafka broker has one ZooKeeper server running). When I try connecting to the shell, it never says 'Connected'. However, if I connect to another standalone ZooKeeper which has no links to Kafka, I am able to connect:

    /kafka/bin# /zookeeper/scripts/zkCli.sh -server 127.0.0.1:2181
    Connecting to 127.0.0.1:2181
    Welcome to ZooKeeper!
    JLine support is enabled

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null

    [zk: 127.0.0.1:2181(CONNECTED) 0]

Am I missing something?

Thanks,
prabcs

--
- There are only 10 types of people in the world: Those who understand binary, and those who don't
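A workaround consistent with Chris's suggestion, sketched with an assumed ZooKeeper layout: copy the jline JAR that ships with ZooKeeper into Kafka's libs/ directory, which kafka-run-class.sh adds to the classpath.

    # jline-0.9.94.jar is the version bundled with ZooKeeper 3.4.x (assumption;
    # use whatever jline JAR your ZooKeeper distribution actually ships).
    cp /zookeeper/lib/jline-0.9.94.jar /kafka/libs/
    /kafka/bin/zookeeper-shell.sh localhost:2182/
    # With jline on the classpath, the shell should report
    # "JLine support is enabled" and show the (CONNECTED) prompt.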