Ah, my bad. I didn't notice I had put two auto.commit.interval.ms entries in the config. After fixing it, it now behaves as expected. :-) Thanks again!!

Chen
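For anyone hitting the same thing, a minimal sketch of the corrected config: the stray autocommit.interval.ms entry is removed and auto.commit.interval.ms appears exactly once, raised to the 10-second value Guozhang suggested (a_zookeeper and a_groupId are the same placeholders used in the test code downthread):

    import java.util.Properties;

    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "10000"); // single entry, 10 s as suggested
    props.put("consumer.timeout.ms", "6000");
    props.put("auto.offset.reset", "smallest");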
On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Chen,

Your auto.commit.interval.ms is set to 1 sec, which may be too small. Could you try a larger number, like 10000?

Guozhang

On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,
I just did a simple test, and Kafka does not seem to do what it is supposed to do. I put 20 messages numbered from 1 to 20 into a topic with 3 partitions, and throw a RuntimeException on every even-numbered message (2, 4, 6, ...):

    while (it.hasNext()) {
        String message = new String(it.next().message());
        System.out.println("message received" + message);
        int messageInt = Integer.parseInt(message);
        if (messageInt % 2 == 0) {
            // crash on every even-numbered message
            throw new RuntimeException("message " + message + " failed");
        }
    }

My config is like this:

    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("consumer.timeout.ms", "6000");
    props.put("autocommit.interval.ms", "360000");
    props.put("auto.offset.reset", "smallest");

I started 10 threads, but whenever a thread gets an even-numbered message it crashes, and when I restart the threads they read from the next message. So in the first batch:

    message received1
    message received2

Then I start again:

    message received3
    message received4

As you can see, message 2 is not replayed. Is this expected? I ran

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group chen_test_6 --topic test_20 --zkconnect localhost:2182

and it is consistent with the test result (the even-numbered failed messages are not retrieved again). What am I missing here?

Chen

On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Chen,

You can use the ConsumerOffsetChecker tool.

http://kafka.apache.org/documentation.html#basic_ops_consumer_lag

Guozhang

On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Sounds like a good idea! I think I will go with the high-level consumer then.
Another question along with this design: is there a way to check the lag of a consumer group on a topic? Upon machine crashes and restarts, I want to continue reading from a topic only if the lag is NOT 0. I know I could depend on the timeout ("consumer.timeout.ms") to check whether there is still data in the topic, but I am wondering whether there is a more elegant way.
Thanks much for the help, Guozhang!

Chen
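A rough sketch of checking remaining lag programmatically with the 0.8 Java API, in case calling the ConsumerOffsetChecker CLI from a startup script is awkward. It fetches the log-end offset of one partition from its leader broker with a short-lived SimpleConsumer; lag is that value minus the group's committed offset, which ConsumerOffsetChecker reads from the ZooKeeper path /consumers/<group>/offsets/<topic>/<partition>. The leader host/port arguments and the "lag_check" client id are placeholders, and fetching the committed offset is left out:

    import java.util.Collections;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LagCheck {
        // Latest (log-end) offset of one partition, fetched from the partition leader.
        static long logEndOffset(String leaderHost, int leaderPort,
                                 String topic, int partition) {
            SimpleConsumer consumer =
                new SimpleConsumer(leaderHost, leaderPort, 10000, 64 * 1024, "lag_check");
            try {
                TopicAndPartition tp = new TopicAndPartition(topic, partition);
                kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                    Collections.singletonMap(tp,
                        new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1)),
                    kafka.api.OffsetRequest.CurrentVersion(), "lag_check");
                OffsetResponse response = consumer.getOffsetsBefore(request);
                return response.offsets(topic, partition)[0];
            } finally {
                consumer.close();
            }
        }
        // lag per partition = logEndOffset(...) - committed offset from ZK;
        // continue consuming only if the lag summed over all partitions is > 0.
    }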
On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Using the simple consumer you then need to take care of consumer failure detection and partition reassignment yourself, but you would have more flexibility with the offsets.

If each time processing incurs an error the corresponding consumer thread also fails (i.e. it will not be involved in the rebalance and hence will not commit offsets), and you can live with data duplicates, then you can just enable auto offset commits with, say, a 10-second period. We usually use an even larger period, like minutes.

Guozhang

On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Maybe I could batch the messages before committing, e.g. committing every 10 seconds; this is what the auto commit does anyway, and I could live with duplicate data. What do you think?

I would then also seem to need a monitoring daemon that checks the lag and restarts the consumer after machine crashes.

On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Thanks, Guozhang.
So if I switch to SimpleConsumer, will these problems already be taken care of? I would assume that I will need to manage all the offsets myself, including the error recovery logic, right?

Chen

On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Chen,

1. Manually committing offsets does have the risk of duplicates. Consider the following pattern:

    message = consumer.next();
    process(message);
    consumer.commit();

The rebalance can happen between lines 2 and 3, where the message has been processed but its offset has not been committed; if another consumer picks up this partition after the rebalance, it may re-consume this message. With auto.commit turned on, offsets will always be committed before the consumers release ownership of partitions during rebalances.

In the 0.9 consumer design, we have fixed this issue by introducing the onPartitionDeassigned callback; you can take a look at its current API here:

http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html

2. Committing offsets too often does have an overhead, since each commit goes to ZooKeeper, and ZK is not write-scalable. We are also fixing that issue by moving offset management from ZK to the Kafka servers. This is already checked in to trunk and will be included in the 0.8.2 release.

Guozhang
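To make the batched manual-commit approach discussed above concrete, a minimal sketch using the 0.8 high-level consumer: auto commit is disabled, each message is processed before its offset can be committed, and commitOffsets() is called at most every 10 seconds, so a crash replays at most the last uncommitted batch (duplicates, not losses). The class and process() names are placeholders:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class BatchedCommitLoop {
        // Assumes the connector was created with auto.commit.enable=false.
        static void consume(ConsumerConnector connector, KafkaStream<byte[], byte[]> stream) {
            final long commitIntervalMs = 10 * 1000L;   // the 10-second period discussed above
            long lastCommit = System.currentTimeMillis();
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                process(new String(it.next().message())); // process first, commit later
                long now = System.currentTimeMillis();
                if (now - lastCommit >= commitIntervalMs) {
                    // commits offsets of all partitions this connector currently owns
                    connector.commitOffsets();
                    lastCommit = now;
                }
            }
        }

        static void process(String message) { /* placeholder for application logic */ }
    }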
On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,
Just to make it clear: say I have 10 threads with the same consumer group id reading topic T. Auto commit is turned off, and commitOffset is called only when a message has been processed successfully. If thread 1 dies while processing a message from partition P1, and the last committed offset is Offset1, will Kafka then ensure that one of the other 9 running threads automatically picks up the messages on partition P1 from Offset1? And does that thread risk reading the same message more than once?

Also, I would assume committing the offset for each message is a bit heavy. What do you usually do for error handling when reading from Kafka?
Thanks much!

Chen

On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Yes, in that case you can turn off auto commit and call commitOffsets manually after processing is finished. commitOffsets() will only write the offsets of the partitions that the consumer is currently fetching, so there is no need to coordinate this operation.

On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

But with auto commit turned on, am I risking losing the failed message? Should I turn off auto commit and only commit the offset when the message has been processed successfully? But that would require coordination between threads in order to know the right time to commit the offset...

On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Chen,

With the high-level consumer, partition re-assignment is automatic upon consumer failures.

Guozhang
On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Folks,
I have a process that starts at a specific time and reads from a specific topic. I am currently using the high-level API (consumer group) to read from Kafka (and it will stop once there is nothing left in the topic, by specifying a timeout). I am most concerned about error recovery in a multi-threaded context. If one thread dies, will the other running bolt threads pick up the failed message? Or do I have to start another thread in order to pick up the failed message? What would be a good practice to ensure that each message is processed at least once?

Note that all threads are using the same group id.

Thanks,
Chen

--
-- Guozhang
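For context, a minimal sketch of the multi-threaded high-level consumer setup being discussed, using the 0.8 API: one connector with the shared group id, ten streams, one thread per stream. consumer.timeout.ms makes the iterator throw ConsumerTimeoutException once the topic is drained, which gives the "stop when there is nothing left" behavior mentioned above. The topic name, group id, ZooKeeper address, and process() are placeholders taken from the test downthread:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class GroupConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2182");  // placeholder ZK address
            props.put("group.id", "chen_test_6");              // shared group id
            props.put("consumer.timeout.ms", "6000");          // iterator gives up after 6 s idle
            props.put("auto.offset.reset", "smallest");

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // ask for 10 streams of the topic, one worker thread per stream;
            // if a consumer process dies, its partitions are rebalanced to the
            // remaining consumers in the group
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("test_20", 10);
            List<KafkaStream<byte[], byte[]>> streams =
                connector.createMessageStreams(topicCountMap).get("test_20");

            ExecutorService executor = Executors.newFixedThreadPool(10);
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new Runnable() {
                    public void run() {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        try {
                            while (it.hasNext()) {
                                process(new String(it.next().message()));
                            }
                        } catch (ConsumerTimeoutException e) {
                            // nothing arrived within consumer.timeout.ms: topic drained
                        }
                    }
                });
            }
            executor.shutdown();
        }

        static void process(String message) { /* placeholder */ }
    }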