With the simple consumer you need to take care of consumer failure detection and partition reassignment yourself, but you get more flexibility in how offsets are managed.
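To give a rough sense of what handling failure detection and reassignment yourself involves, here is a toy, broker-free sketch in Python. It does not use the SimpleConsumer API at all: `reassign`, the worker names, and the heartbeat and timeout values are hypothetical, chosen only for illustration. A real setup would also have to hand over the last processed offset for every partition that moves.

```python
def reassign(assignment, last_heartbeat, now, timeout):
    """Move partitions owned by workers with a stale heartbeat to live workers.

    assignment:     dict worker name -> list of partition ids (hypothetical layout)
    last_heartbeat: dict worker name -> time of the last heartbeat, in seconds
    """
    # Failure detection: a worker is live if it reported within `timeout` seconds.
    live = sorted(w for w in assignment if now - last_heartbeat[w] <= timeout)
    if not live:
        raise RuntimeError("no live workers left to take over partitions")

    new_assignment = {w: list(assignment[w]) for w in live}
    orphaned = sorted(
        p for w in assignment if w not in new_assignment for p in assignment[w]
    )

    # Reassignment: give each orphaned partition to the least-loaded live worker
    # (ties are broken by worker name so the result is deterministic).
    for partition in orphaned:
        target = min(live, key=lambda w: (len(new_assignment[w]), w))
        new_assignment[target].append(partition)
    return new_assignment


assignment = {"worker-a": [0, 1], "worker-b": [2, 3], "worker-c": [4]}
last_heartbeat = {"worker-a": 100.0, "worker-b": 70.0, "worker-c": 99.0}

# worker-b last reported 31 seconds ago, so it is treated as failed.
result = reassign(assignment, last_heartbeat, now=101.0, timeout=10.0)
print(result)
```

In this run the two partitions of the silent worker are split between the two remaining workers. The high-level consumer does the equivalent of this for you during a rebalance.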
If every processing error also causes the corresponding consumer thread to fail (i.e. it will not take part in the rebalance and hence will not commit offsets), and you can live with duplicate data, then you can just enable auto offset commits with a period of, say, 10 seconds. We usually use an even larger period, on the order of minutes.

Guozhang

On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

> Maybe I could batch the messages before committing, e.g. committing every
> 10 seconds. This is what auto commit does anyway, and I could live with
> duplicate data. What do you think?
>
> I would then also seem to need a monitoring daemon to check the lag and
> restart the consumer when a machine crashes.
>
> On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:
>
> > Thanks, Guozhang,
> > So if I switch to SimpleConsumer, will these problems be taken care of
> > already? I would assume that I will need to manage all the offsets
> > myself, including the error recovery logic, right?
> > Chen
> >
> > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Chen,
> > >
> > > 1. Manually committing offsets does have the risk of duplicates. Consider
> > > the following pattern:
> > >
> > >     message = consumer.next();
> > >     process(message);
> > >     consumer.commit();
> > >
> > > A rebalance can happen between lines 2 and 3, when the message has been
> > > processed but its offset has not yet been committed. If another consumer
> > > picks up this partition after the rebalance, it may consume this message
> > > again. With auto.commit turned on, offsets will always be committed before
> > > the consumers release ownership of partitions during rebalances.
> > >
> > > In the 0.9 consumer design, we have fixed this issue by introducing the
> > > onPartitionDeassigned callback. You can take a look at its current API
> > > here:
> > >
> > > http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >
> > > 2. Committing offsets too often does have an overhead, since each commit
> > > goes to ZooKeeper, and ZK is not write-scalable. We are also fixing that
> > > issue by moving offset management from ZK to the Kafka servers. This is
> > > already checked into trunk and will be included in the 0.8.2 release.
> > >
> > > Guozhang
> > >
> > > On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > >
> > > > Guozhang,
> > > > Just to make it clear:
> > > > Say I have 10 threads with the same consumer group id reading topic T.
> > > > Auto commit is turned off, and commitOffset is called only when a
> > > > message is processed successfully.
> > > > If thread 1 dies while processing a message from partition P1, and the
> > > > last offset is Offset1, will Kafka ensure that one of the other 9
> > > > running threads automatically picks up the messages on partition P1
> > > > from Offset1? Will that thread have the risk of reading the same
> > > > message more than once?
> > > >
> > > > Also, I would assume committing the offset for each message is a bit
> > > > heavy. What do you guys usually do for error handling when reading
> > > > from Kafka?
> > > > Thanks much!
> > > > Chen
> > > >
> > > > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Yes, in that case you can turn off auto commit and call commitOffsets
> > > > > manually after processing is finished. commitOffsets() will only
> > > > > write the offsets of the partitions that the consumer is currently
> > > > > fetching, so there is no need to coordinate this operation.
> > > > >
> > > > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > >
> > > > > > But with auto commit turned on, I am risking losing the failed
> > > > > > message, right? Should I turn off auto commit and only commit the
> > > > > > offset when the message is processed successfully? But that would
> > > > > > require coordination between threads in order to know the right
> > > > > > time to commit the offset.
> > > > > >
> > > > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello Chen,
> > > > > > >
> > > > > > > With the high-level consumer, partition re-assignment is
> > > > > > > automatic upon consumer failures.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Folks,
> > > > > > > > I have a process that starts at a specific time and reads from
> > > > > > > > a specific topic. I am currently using the High Level API
> > > > > > > > (consumer group) to read from Kafka (and will stop once there
> > > > > > > > is nothing in the topic by specifying a timeout). I am most
> > > > > > > > concerned about error recovery in a multi-threaded context. If
> > > > > > > > one thread dies, will the other running bolt threads pick up
> > > > > > > > the failed message? Or do I have to start another thread in
> > > > > > > > order to pick up the failed message? What would be a good
> > > > > > > > practice to ensure each message is processed at least once?
> > > > > > > >
> > > > > > > > Note that all threads are using the same group id.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Chen
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang

--
-- Guozhang
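
The duplicate window discussed in this thread (a message is processed, then the consumer is lost before its offset is committed) can be reproduced without a broker. The following is only a toy sketch in Python, not Kafka client code: `ToyPartition`, `run_consumer`, and `crash_before_commit_at` are hypothetical names made up for illustration, and the rebalance is simulated by simply starting a second consumer from the last committed offset.

```python
class ToyPartition:
    """In-memory stand-in for one partition plus its committed offset."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset of the next message to consume


def run_consumer(partition, processed, crash_before_commit_at=None):
    """Process-then-commit loop with one commit per message.

    If crash_before_commit_at is set, the consumer "dies" after processing
    the message at that offset but before committing it.
    Returns True if the consumer reached the end, False if it crashed.
    """
    offset = partition.committed
    while offset < len(partition.messages):
        message = partition.messages[offset]
        processed.append(message)          # step 2: process(message)
        if offset == crash_before_commit_at:
            return False                   # lost between step 2 and step 3
        offset += 1
        partition.committed = offset       # step 3: commit
    return True


partition = ToyPartition(["m0", "m1", "m2", "m3"])
processed = []

# The first owner dies after processing m2 but before committing it.
finished = run_consumer(partition, processed, crash_before_commit_at=2)

# Another consumer takes over and resumes from the committed offset.
run_consumer(partition, processed)

print(processed)
```

Here `m2` is processed twice and nothing is skipped, which is the at-least-once behaviour asked about at the start of the thread. Committing less often (for example on a timer, as with auto commit) widens the replayed range after a crash but reduces the number of offset writes.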