Chen, you can use the ConsumerOffsetChecker tool:
http://kafka.apache.org/documentation.html#basic_ops_consumer_lag

Guozhang

On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> Sounds like a good idea! I think I will go with the high level consumer then.
>
> Another question along with this design: is there a way to check the lag of a
> consumer group for a topic? Upon machine crashes and restarts, I want to
> continue reading from a certain topic only if the lag is NOT 0. I know I
> could depend on the timeout ("consumer.timeout.ms") to check whether there is
> still data in the topic, but I am wondering whether there is a more elegant
> way.
>
> Thanks much for the help, Guozhang!
>
> Chen
>
> On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Using the simple consumer you then need to take care of consumer failure
> > detection and partition reassignment yourself, but you would have more
> > flexibility with the offsets.
> >
> > If each time processing incurs an error the corresponding consumer thread
> > also fails (i.e. it will not take part in the rebalance and hence will not
> > commit offsets), and you can live with data duplicates, then you can just
> > enable auto offset commits with, say, a 10-second period. We usually use
> > an even larger period, like minutes.
> >
> > Guozhang
> >
> > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > Maybe I could batch the messages before committing, e.g. committing
> > > every 10 seconds. This is what the auto commit does anyway, and I could
> > > live with duplicate data. What do you think?
> > >
> > > I would then also seem to need a monitoring daemon that checks the lag,
> > > to restart the consumer after machine crashes..
> > >
> > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > Thanks, Guozhang.
> > > > So if I switch to SimpleConsumer, will these problems be taken care
> > > > of already?
> > > > I would assume that I will need to manage all the offsets myself,
> > > > including the error recovery logic, right?
> > > > Chen
> > > >
> > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > Hello Chen,
> > > > >
> > > > > 1. Manually committing offsets does have the risk of duplicates;
> > > > > consider the following pattern:
> > > > >
> > > > > message = consumer.next();
> > > > > process(message);
> > > > > consumer.commit();
> > > > >
> > > > > The rebalance can happen between line 2 and line 3, where the
> > > > > message has been processed but its offset has not been committed.
> > > > > If another consumer picks up this partition after the rebalance, it
> > > > > may re-consume this message. With auto.commit turned on, offsets
> > > > > will always be committed before the consumers release ownership of
> > > > > partitions during rebalances.
> > > > >
> > > > > In the 0.9 consumer design we have fixed this issue by introducing
> > > > > the onPartitionDeassigned callback; you can take a look at its
> > > > > current API here:
> > > > > http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >
> > > > > 2. Committing offsets too often does have an overhead, since each
> > > > > commit goes to Zookeeper and ZK is not write-scalable. We are also
> > > > > fixing that issue by moving offset management from ZK to the Kafka
> > > > > servers. This is already checked in to trunk and will be included
> > > > > in the 0.8.2 release.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > > > Guozhang,
> > > > > > Just to make it clear: suppose I have 10 threads with the same
> > > > > > consumer group id reading topic T.
> > > > > > The auto commit is turned off, and commitOffset is called only
> > > > > > when the message is processed successfully.
> > > > > > If thread 1 dies while processing a message from partition P1,
> > > > > > and the last committed offset is Offset1, will Kafka ensure that
> > > > > > one of the other 9 running threads automatically picks up the
> > > > > > messages on partition P1 from Offset1? Will that thread risk
> > > > > > reading the same message more than once?
> > > > > >
> > > > > > Also, I would assume committing the offset for each message is a
> > > > > > bit heavy. What do you guys usually do for error handling when
> > > > > > reading from Kafka?
> > > > > > Thanks much!
> > > > > > Chen
> > > > > >
> > > > > > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > Yes, in that case you can turn off auto commit and call
> > > > > > > commitOffsets manually after processing is finished.
> > > > > > > commitOffsets() will only write the offsets of the partitions
> > > > > > > that the consumer is currently fetching, so there is no need to
> > > > > > > coordinate this operation.
> > > > > > >
> > > > > > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > > > > > But with auto commit turned on, I risk losing the failed
> > > > > > > > message, right? Should I turn off auto commit and only commit
> > > > > > > > the offset when the message is processed successfully? That
> > > > > > > > would seem to require coordination between threads in order
> > > > > > > > to know the right timing to commit offsets..
> > > > > > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > Hello Chen,
> > > > > > > > >
> > > > > > > > > With the high-level consumer, partition re-assignment is
> > > > > > > > > automatic upon consumer failures.
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > > > > > > > Folks,
> > > > > > > > > > I have a process that starts at a specific time and reads
> > > > > > > > > > from a specific topic. I am currently using the High
> > > > > > > > > > Level API (consumer group) to read from Kafka (and will
> > > > > > > > > > stop once there is nothing left in the topic, by
> > > > > > > > > > specifying a timeout). I am most concerned about error
> > > > > > > > > > recovery in a multi-threaded context. If one thread dies,
> > > > > > > > > > will the other running bolt threads pick up the failed
> > > > > > > > > > message? Or do I have to start another thread in order to
> > > > > > > > > > pick it up? What would be a good practice to ensure the
> > > > > > > > > > message is processed at least once?
> > > > > > > > > >
> > > > > > > > > > Note that all threads are using the same group id.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Chen

--
-- Guozhang
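
[Editor's note] The two commit strategies discussed in the thread map onto 0.8 high-level consumer properties along these lines (a sketch with placeholder values; check the Kafka documentation for defaults):

```properties
# Common settings (placeholder values)
zookeeper.connect=localhost:2181
group.id=my-consumer-group

# Option A: manual commits - call commitOffsets() after successful processing
auto.commit.enable=false

# Option B: periodic auto commit, accepting possible duplicates on failure
#auto.commit.enable=true
#auto.commit.interval.ms=10000

# Used by Chen to stop when the topic is drained: the consumer iterator
# throws ConsumerTimeoutException after this many ms without a message
consumer.timeout.ms=5000
```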
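
[Editor's note] Guozhang's answer at the top of the thread points at the ConsumerOffsetChecker tool for checking consumer lag. In a Kafka 0.8 installation it is run via kafka-run-class.sh, roughly as below; the group and topic names and the ZooKeeper address are placeholders, so substitute your own:

```shell
# Placeholder group/topic names; point --zkconnect at your ZooKeeper ensemble.
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
    --zkconnect localhost:2181 \
    --group my-consumer-group \
    --topic my-topic
```

The output lists, per partition, the consumer's committed offset, the log-end offset, and their difference (the lag). Chen's restart check would continue consuming only when a lag column is non-zero.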
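
[Editor's note] The duplicate-delivery window Guozhang describes, a rebalance landing between process() and commit(), can be made concrete with a small simulation. This is plain Python, not Kafka client code; the names Partition and consume are illustrative only:

```python
# Simulates the process-then-commit loop from the thread. A "rebalance" is
# modeled as the consumer losing the partition after processing a message
# but before committing its offset; the reassigned consumer then resumes
# from the last committed offset and replays that message.

class Partition:
    def __init__(self, messages):
        self.messages = messages
        self.committed = 0  # next offset a newly assigned consumer starts from

def consume(partition, processed, lose_partition_at=None):
    """Process each message, then commit its offset. If lose_partition_at is
    set, drop the partition right after processing that offset, i.e. in the
    gap between process() and commit()."""
    offset = partition.committed
    while offset < len(partition.messages):
        processed.append(partition.messages[offset])  # 1. process
        if offset == lose_partition_at:
            return                                    # rebalance: commit never happens
        partition.committed = offset + 1              # 2. commit
        offset += 1

p = Partition(["m0", "m1", "m2"])
seen = []
consume(p, seen, lose_partition_at=1)  # first consumer loses P after processing m1
consume(p, seen)                       # reassigned consumer resumes from committed offset
print(seen)  # → ['m0', 'm1', 'm1', 'm2']  (m1 processed twice: at-least-once)
```

This is why Guozhang frames manual commits as at-least-once with possible duplicates: the message is never lost, but the one sitting in the process/commit gap can be redelivered.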