Guozhang, I just did a simple test, and Kafka does not seem to do what it is supposed to do: I put 20 messages numbered 1 to 20 into a topic with 3 partitions, and throw a RuntimeException on every even-numbered message (2, 4, 6, ...).
while (it.hasNext()) {
    String message = new String(it.next().message());
    System.out.println("message received" + message);
    int messageInt = Integer.parseInt(message);
    if (messageInt % 2 == 0) { // crash on all the even-numbered messages
        throw new RuntimeException("message " + message + " failed");
    }
}

My config is like this:

props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("consumer.timeout.ms", "6000");
props.put("autocommit.interval.ms", "360000");
props.put("auto.offset.reset", "smallest");

I started 10 threads, but whenever a thread gets an even-numbered message it crashes; when I restart the threads, they start reading from the next message. So in the first run:

message received1
message received2

Then I start again:

message received3
message received4

As you can see, message 2 is not replayed. Is this expected?

I ran

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group chen_test_6 --topic test_20 --zkconnect localhost:2182

and its output is consistent with the test result (the even-numbered failed messages are not retrieved again). What am I missing here?

Chen

On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Chen,
>
> You can use the ConsumerOffsetChecker tool.
>
> http://kafka.apache.org/documentation.html#basic_ops_consumer_lag
>
> Guozhang
>
> On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
>
> > sounds like a good idea! I think i will go with the high level consumer then.
> > Another question along with this design: is there a way to check the lag for a consumer group on a topic? Upon machine crashes and restarts, I want to only continue reading from a certain topic if the lag is NOT 0.
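The missing replay described above is what auto commit gives you: offsets are committed on a timer, independent of whether processing succeeded, so a restarted consumer resumes past the failed message. A minimal self-contained simulation of that effect (hypothetical class and method names, no Kafka dependency; it collapses the commit interval to "committed as soon as fetched", which is effectively what a 1-second interval does here):

```java
import java.util.ArrayList;
import java.util.List;

public class AutoCommitLossDemo {
    // Returns the messages successfully processed across restarts.
    // The committed offset advances as soon as a message is fetched,
    // mimicking an auto commit interval much shorter than processing time.
    public static List<Integer> runWithAutoCommit(int[] messages) {
        List<Integer> processed = new ArrayList<>();
        int committedOffset = 0; // what a restarted consumer resumes from
        while (committedOffset < messages.length) {
            try {
                for (int i = committedOffset; i < messages.length; i++) {
                    committedOffset = i + 1; // "auto commit" before processing finishes
                    if (messages[i] % 2 == 0) {
                        throw new RuntimeException("message " + messages[i] + " failed");
                    }
                    processed.add(messages[i]);
                }
            } catch (RuntimeException e) {
                // the thread crashes; the restart resumes from committedOffset,
                // which already points past the failed message
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Mirrors the test in the mail: even-numbered messages are lost.
        System.out.println(runWithAutoCommit(new int[]{1, 2, 3, 4, 5, 6})); // [1, 3, 5]
    }
}
```

So the observed behavior matches an auto-committed offset outrunning the crashed processing, rather than a bug in the broker.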
> > I know I could depend on the timeout ("consumer.timeout.ms") to check whether there is still data in the topic, but I am wondering whether there is a more elegant way.
> > Thanks much for the help, Guozhang!
> >
> > Chen
> >
> > On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Using the simple consumer you then need to take care of consumer failure detection and partition reassignment yourself, but you would have more flexibility with the offsets.
> > >
> > > If each time processing incurs an error the corresponding consumer thread also fails (i.e. it will not be involved in the rebalance and hence will not commit offsets), and you can live with data duplicates, then you can just enable auto offset commits with, say, a 10 second period. We usually use an even larger period, like minutes.
> > >
> > > Guozhang
> > >
> > > On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > >
> > > > Maybe i could batch the messages before commit, e.g. committing every 10 seconds. This is what the auto commit does anyway, and I could live with duplicate data. What do u think?
> > > >
> > > > I would then also seem to need a monitoring daemon to check the lag and restart the consumer during machine crashes..
> > > >
> > > > On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > >
> > > > > Thanks, Guozhang,
> > > > > So if I switch to SimpleConsumer, will these problems be taken care of already? I would assume that I will need to manage all the offsets by myself, including the error recovery logic, right?
> > > > > Chen
> > > > >
> > > > > On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > >> Hello Chen,
> > > > >>
> > > > >> 1.
> > > > >> Manually committing offsets does have the risk of duplicates; consider the following pattern:
> > > > >>
> > > > >> message = consumer.next();
> > > > >> process(message);
> > > > >> consumer.commit();
> > > > >>
> > > > >> The rebalance can happen between lines 2 and 3, where the message has been processed but the offset has not been committed; if another consumer picks up this partition after the rebalance, it may re-consume this message again. With auto.commit turned on, offsets will always be committed before the consumers release ownership of partitions during rebalances.
> > > > >>
> > > > >> In the 0.9 consumer design we have fixed this issue by introducing the onPartitionDeassigned callback; you can take a look at its current API here:
> > > > >>
> > > > >> http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > >>
> > > > >> 2. Committing offsets too often does have an overhead, since it goes to ZooKeeper, and ZK is not write-scalable. We are also fixing that issue by moving offset management from ZK to the Kafka servers. This is already checked in to trunk and will be included in the 0.8.2 release.
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >> On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > >>
> > > > >> > Guozhang,
> > > > >> > Just to make it clear:
> > > > >> > If I have 10 threads with the same consumer group id reading topic T, auto commit is turned off, and commitOffset is called only when a message is processed successfully.
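The duplicate window Guozhang describes can be sketched without a broker. In the hypothetical simulation below (invented class and method names; a real rebalance is driven by ZooKeeper watches), consumer A loses its partition after process(message) but before the commit, so consumer B resumes from the last committed offset and re-processes the same message:

```java
import java.util.ArrayList;
import java.util.List;

public class RebalanceDuplicateDemo {
    // Processes messages from startOffset, committing after each one.
    // If loseOwnershipAfter == i, ownership is lost after processing message i
    // but before its offset is committed (the window between "process(message)"
    // and "consumer.commit()"). Returns the last committed offset.
    public static int consume(List<String> log, String id, String[] msgs,
                              int startOffset, int loseOwnershipAfter) {
        int committed = startOffset;
        for (int i = startOffset; i < msgs.length; i++) {
            log.add(id + " processed " + msgs[i]); // process(message)
            if (i == loseOwnershipAfter) {
                return committed; // rebalance fires before consumer.commit()
            }
            committed = i + 1;    // consumer.commit()
        }
        return committed;
    }

    public static List<String> run() {
        String[] partition = {"m0", "m1", "m2"};
        List<String> log = new ArrayList<>();
        // Consumer A processes m1 but loses the partition before committing it.
        int resume = consume(log, "A", partition, 0, 1);
        // Consumer B resumes from the committed offset and re-processes m1.
        consume(log, "B", partition, resume, -1);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [A processed m0, A processed m1, B processed m1, B processed m2]
    }
}
```

m1 is processed twice: this is the duplicate risk of manual commits, and why the thread settles on at-least-once rather than exactly-once semantics.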
> > > > >> > If thread 1 dies when processing a message from partition P1, and the last offset is Offset1, will Kafka then ensure that one of the other 9 running threads automatically picks up the messages on partition P1 from Offset1? Will that thread risk reading the same message more than once?
> > > > >> >
> > > > >> > Also, I would assume committing the offset for each message is a bit heavy. What do you usually do for error handling when reading from Kafka?
> > > > >> > Thanks much!
> > > > >> > Chen
> > > > >> >
> > > > >> > On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Yes, in that case you can turn off auto commit and call commitOffsets manually after processing is finished. commitOffsets() will only write the offsets of the partitions that the consumer is currently fetching, so there is no need to coordinate this operation.
> > > > >> > >
> > > > >> > > On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > >> > >
> > > > >> > > > But with auto commit turned on, I am risking losing the failed messages, right? Should I turn off auto commit and only commit the offset when the message is processed successfully? But that would require coordination between threads in order to know the right timing to commit the offset..
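The pattern the thread converges on (auto commit off, commitOffsets() only after successful processing) gives at-least-once delivery. A self-contained sketch of why (hypothetical names; the `committed` variable stands in for the offset the real consumer writes to ZooKeeper) — the failed message is delivered again after a restart instead of being skipped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AtLeastOnceDemo {
    // Delivers every message at least once: the offset is committed only after
    // processing succeeds, so a crash replays the uncommitted message on restart.
    public static List<String> run(String[] msgs, Set<String> failOnce) {
        List<String> deliveries = new ArrayList<>();
        int committed = 0; // stands in for the offset stored in ZooKeeper
        while (committed < msgs.length) {
            try {
                for (int i = committed; i < msgs.length; i++) {
                    deliveries.add(msgs[i]);        // consumer sees the message
                    if (failOnce.remove(msgs[i])) { // processing fails this one time
                        throw new RuntimeException(msgs[i] + " failed");
                    }
                    committed = i + 1;              // commitOffsets() after success
                }
            } catch (RuntimeException e) {
                // restart: resume from the last committed offset, so the
                // failed message is delivered again
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Set<String> failOnce = new HashSet<>(Arrays.asList("2"));
        System.out.println(run(new String[]{"1", "2", "3"}, failOnce)); // [1, 2, 2, 3]
    }
}
```

Message "2" appears twice: replayed rather than lost, which is the trade-off Guozhang recommends accepting when duplicates are tolerable.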
> > > > >> > > >
> > > > >> > > > On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >> > > >
> > > > >> > > > > Hello Chen,
> > > > >> > > > >
> > > > >> > > > > With the high-level consumer, the partition re-assignment is automatic upon consumer failures.
> > > > >> > > > >
> > > > >> > > > > Guozhang
> > > > >> > > > >
> > > > >> > > > > On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:
> > > > >> > > > >
> > > > >> > > > > > Folks,
> > > > >> > > > > > I have a process that starts at a specific time and reads from a specific topic. I am currently using the high-level API (consumer group) to read from Kafka (and it will stop once there is nothing left in the topic, by specifying a timeout). I am most concerned about error recovery in a multi-threaded context. If one thread dies, will the other running bolt threads pick up the failed message? Or do I have to start another thread in order to pick up the failed message? What would be a good practice to ensure the message can be processed at least once?
> > > > >> > > > > >
> > > > >> > > > > > Note that all threads are using the same group id.
> > > > >> > > > > >
> > > > >> > > > > > Thanks,
> > > > >> > > > > > Chen
> > > > >> > > > >
> > > > >> > > > > --
> > > > >> > > > > -- Guozhang
> > > > >> > > >
> > > > >> > >
> > > > >> > > --
> > > > >> > > -- Guozhang
> > > > >> >
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > >
> > >
> > > --
> > > -- Guozhang
> >
>
> --
> -- Guozhang
>