New Consumer API and Range Consumption with Fail-over

2015-07-30 Thread Bhavesh Mistry
Hello Kafka Dev Team,


With the new Consumer API redesign (
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
), is there a capability to consume a given topic and partition from a start/
end position?  How would I achieve the following use case of range consumption
with fail-over?


Use Case:
Ability to reload data for a given topic and its partitions' start/end offsets with
the High Level Consumer, with fail-over.   Basically, High Level range
consumption that keeps going even if a consumer in the group dies mid-range.


Suppose you have a topic called “test-topic” with per-partition begin and
end offsets:

{

  topic: test-topic,

  [   { partition id: 1, offset start: 100,     offset end: 500,000 },

      { partition id: 2, offset start: 200,000, offset end: 500,000 },

      ….. for n partitions

  ]

}

You would create a consumer group “Range-Consumer” and use the seek method
for each partition.   Your feedback is greatly appreciated.


In each JVM, for each consumption thread:


    // config includes group.id = "Range-Consumer", bootstrap servers, deserializers, …
    Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);

    // end offset per partition id, taken from the range definition above
    Map<Integer, Long> partitionToEndOffsetMapping = …;

    for (TopicPartition tp : topicPartitionList) {
        // position each partition at the start offset of its range
        consumer.seek(tp, startOffsetFor(tp));
    }

    while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1);

        // for each record, check the offset against the end of the range
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (partitionToEndOffsetMapping.get(record.partition()) >= record.offset()) {
                // consume the record

                // commit the offset
                consumer.commit(CommitType.SYNC);
            } else {
                // Should I unsubscribe it now for this partition?
                consumer.unsubscribe(new TopicPartition(record.topic(), record.partition()));
            }
        }
    }




Please let me know if the above approach is valid:

1) How will fail-over work?

2) How does rebooting the entire consumer group impact the offset seek, since
offsets are stored by Kafka itself?

Thanks,

Bhavesh


Re: New Consumer API and Range Consumption with Fail-over

2015-07-30 Thread Jason Gustafson
Hi Bhavesh,

I'm not totally sure I understand the expected behavior, but I think this
can work. Instead of seeking to the start of the range before the poll
loop, you should probably provide a ConsumerRebalanceCallback to get
notifications when group assignment has changed (e.g. when one of your
nodes dies). When a new partition is assigned, the callback will be invoked
by the consumer and you can use it to check if there's a committed position
in the range or if you need to seek to the beginning of the range. For
example:

void onPartitionsAssigned(consumer, partitions) {
  for (partition : partitions) {
     try {
       // resume from the committed position if one exists
       offset = consumer.committed(partition)
       consumer.seek(partition, offset)
     } catch (NoOffsetForPartitionException e) {
       // nothing committed yet for this partition: start at the beginning of the range
       consumer.seek(partition, rangeStart)
     }
  }
}

If a failure occurs, then the partitions will be rebalanced across
whichever consumers are still active. The case of the entire cluster being
rebooted is not really different. When the consumers come back, they check
the committed position and resume where they left off. Does that make sense?

After you are finished consuming a partition's range, you can use
KafkaConsumer.pause(partition) to prevent further fetches from being
initiated while still maintaining the current assignment. The patch to add
pause() is not in trunk yet, but it probably will be before too long.
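
To make that concrete, here is a rough sketch of what the poll loop could look
like once pause() is available (endOffsets, handle() and the configuration
details are placeholders, not part of the consumer API):

    Map<TopicPartition, Long> endOffsets = ...;   // end of range per partition, supplied by you

    while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
        for (ConsumerRecord<byte[], byte[]> record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            if (record.offset() <= endOffsets.get(tp)) {
                handle(record);                    // your application-specific processing
                consumer.commit(CommitType.SYNC);  // commit the consumed position
            } else {
                consumer.pause(tp);                // finished with this partition's range
            }
        }
    }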

One potential problem is that you wouldn't be able to reuse the same group
to consume a different range because of the way it depends on the committed
offsets. Kafka's commit API actually allows some additional metadata to go
along with a committed offset and that could potentially be used to tie the
commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
will be eventually, but I'm not sure whether that will be part of the
initial release.


Hope that helps!

Jason



Decommissioning a broker

2015-07-30 Thread Andrew Otto
I’m sure this has been asked before, but I can’t seem to find the answer.

I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing so, I 
will be decommissioning a broker.  I plan to remove this broker fully from the 
cluster, and then reinstall it and use it for a different purpose.

I understand how to use the reassign-partitions tool to generate new partition 
assignments and to move partitions around so that the target broker no longer 
has any active replicas.  Once that is done, is there anything special that 
needs to happen?  I can shut down the broker, but as far as I know that broker 
will still be registered in Zookeeper.  Should I just delete the znode for that 
broker once it has been shut down?
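
For concreteness, the sequence I have in mind looks roughly like this (the
ZooKeeper address, JSON file names and broker ids are placeholders):

    # generate a candidate assignment that excludes the broker being removed
    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --topics-to-move-json-file topics.json --broker-list "1,2,3" --generate

    # apply it, then re-run with --verify until every partition reports complete
    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --reassignment-json-file reassign.json --execute
    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --reassignment-json-file reassign.json --verify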

Thanks!
-Andrew Otto



Impact of Zookeeper Unavailability on Running Producers/Consumers

2015-07-30 Thread Mohit Gupta
Hi,

We are using zookeeper for committing the consumer offsets. The Zookeeper
service has become unavailable due to a full disk. The producers/consumers
seem to be running fine (in terms of the number of messages
consumed/produced per hour).

While we are fixing the issue, we just want to know its impact on the system.
For producers we see no impact, whereas for consumers it is just that their
offsets are not getting committed. Also, once zookeeper is up, consumer
offsets should get updated to their proper values.  Is that correct?
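
(For reference, the offsets committed by the high-level consumer live under a
ZooKeeper path like the one below and can be checked from the ZooKeeper CLI
once it is back; group/topic names are placeholders:)

    get /consumers/<group>/offsets/<topic>/<partition>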

Thanks

-- 
Best Regards,
Mohit Gupta


Re: Best practices - Using kafka (with http server) as source-of-truth

2015-07-30 Thread Prabhjot Bharaj
Hi Ewen,

Thanks for your response. I'll experiment and benchmark it with the normal
proxy and NGinx as well and update the results.

Regards,
prabcs

On Mon, Jul 27, 2015 at 11:10 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Hi Prabhjot,

 Confluent has a REST proxy with docs that may give some guidance:
 http://docs.confluent.io/1.0/kafka-rest/docs/intro.html The new producer
 that it uses is very efficient, so you should be able to get pretty good
 throughput. You take a bit of a hit due to the overhead of sending data
 through a proxy, but with appropriate batching you can get about 2/3 the
 performance as you would get using the Java producer directly.
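
 As a rough example, a JSON produce request against a locally running REST
 proxy looks something like this (port 8082 and the topic name are just the
 defaults / placeholders from the docs):

     curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
       --data '{"records":[{"value":{"foo":"bar"}}]}' \
       http://localhost:8082/topics/test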

 There are also a few other proxies you can find here:
 https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

 You can also put nginx (or HAProxy, or a variety of other solutions) in
 front of REST proxies for load balancing, HA, SSL termination, etc. This is
 yet another hop, so it might affect throughput and latency.

 -Ewen

 On Mon, Jul 27, 2015 at 6:55 AM, Prabhjot Bharaj prabhbha...@gmail.com
 wrote:

  Hi Folks,
 
  I would like to understand the best practices when using kafka as the
  source-of-truth, given the fact that I want to pump in data to Kafka
 using
  http methods.
 
  What are the current production configurations for such a use case:-
 
  1. Kafka-http-client - is it scalable the way Nginx is?
  2. Using Kafka and Nginx together - If anybody has used this, please
  explain
  3. Any other scalable method ?
 
  Regards,
  prabcs
 



 --
 Thanks,
 Ewen




-- 
-
There are only 10 types of people in the world: Those who understand
binary, and those who don't


Re: Connection to zk shell on Kafka

2015-07-30 Thread Jiangjie Qin
This looks like an issue to be fixed. I created KAFKA-2385 for this.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jul 29, 2015 at 10:33 AM, Chris Barlock barl...@us.ibm.com wrote:

 I'm a user of Kafka/ZooKeeper, not one of its developers, so I can't give
 you a technical explanation.  I do agree that Kafka should ship the jline
 JAR if its zookeeper-shell depends on it.
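
 As a workaround, copying ZooKeeper's jline jar onto Kafka's classpath should
 be enough for the shell to pick it up (the jar name below is from ZooKeeper
 3.4.x; adjust for your version and install paths):

     cp /path/to/zookeeper/lib/jline-0.9.94.jar /path/to/kafka/libs/
     ./zookeeper-shell.sh localhost:2182/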

 Chris




 From:   Prabhjot Bharaj prabhbha...@gmail.com
 To: u...@zookeeper.apache.org, d...@kafka.apache.org
 Cc: users@kafka.apache.org
 Date:   07/29/2015 01:27 PM
 Subject:Re: Connection to zk shell on Kafka



 Sure. It would be great if you could also explain why the absence of the
 jar creates this problem.

 Also, I'm surprised that the zookeeper that comes bundled with kafka 0.8.2
 does not include the jline jar.

 Regards,
 prabcs

 On Wed, Jul 29, 2015 at 10:45 PM, Chris Barlock barl...@us.ibm.com
 wrote:

  You need the jline JAR file that ships with ZooKeeper.
 
  Chris
 
  IBM Tivoli Systems
  Research Triangle Park, NC
  (919) 224-2240
  Internet:  barl...@us.ibm.com
 
 
 
  From:   Prabhjot Bharaj prabhbha...@gmail.com
  To: users@kafka.apache.org, u...@zookeeper.apache.org
  Date:   07/29/2015 01:13 PM
  Subject:Connection to zk shell on Kafka
 
 
 
  Hi folks,
 
  */kafka/bin# ./zookeeper-shell.sh localhost:2182/*
 
  *Connecting to localhost:2182/*
 
  *Welcome to ZooKeeper!*
 
  *JLine support is disabled*
 
 
   *WATCHER::*
 
 
   *WatchedEvent state:SyncConnected type:None path:null*
 
 
  *The shell never says connected*
 
  I'm running a 5-node zookeeper cluster on a 5-node kafka cluster (each kafka
  broker has 1 zookeeper server running)
 
  When I try connecting to the shell, the shell never says 'Connected'
 
 
 
  However, if I try connecting on another standalone zookeeper  which has
 no
  links to kafka, I'm able to connect:-
 
 
  */kafka/bin# /zookeeper/scripts/zkCli.sh -server 127.0.0.1:2181*

  *Connecting to 127.0.0.1:2181*
 
  *Welcome to ZooKeeper!*
 
  *JLine support is enabled*
 
 
  *WATCHER::*
 
 
  *WatchedEvent state:SyncConnected type:None path:null*
 
  *[zk: 127.0.0.1:2181(CONNECTED) 0]*
 
  Am I missing something?
 
 
  Thanks,
 
  prabcs
 
 


 --
 -
 There are only 10 types of people in the world: Those who understand
 binary, and those who don't