Guozhang,

Just curious, do you guys already have a Java version of the ConsumerOffsetChecker
(https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala)
so that I could use it in my Storm topology?

Chen
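A minimal sketch of driving the existing Scala tool from Java, assuming the Kafka 0.8 core jar and its Scala dependencies are on the classpath (a Scala object normally compiles with a static main forwarder, so this is an assumption, not a confirmed API). The class name OffsetCheckerRunner is hypothetical; the arguments mirror the CLI invocation quoted later in this thread, and the tool prints its report to stdout:

    public class OffsetCheckerRunner {
        public static void main(String[] args) {
            // Delegates to the Scala tool; roughly equivalent to running
            // bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker ...
            kafka.tools.ConsumerOffsetChecker.main(new String[] {
                    "--group", "chen_test_6",
                    "--topic", "test_20",
                    "-zkconnect", "localhost:2182"
            });
        }
    }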
On Fri, Aug 8, 2014 at 2:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Ah, my bad, I didn't notice I had put two auto.commit.interval.ms entries in the config. After fixing it, it now behaves as expected. :-) Thanks again!!

Chen

On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Chen,

Your auto.commit.interval.ms is set to 1 sec, which may be too small. Could you try a larger value, like 10000?

Guozhang

On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,

I just did a simple test, and Kafka does not seem to do what it is supposed to do. I put 20 messages, numbered 1 to 20, into a topic with 3 partitions, and throw a RuntimeException on every even-numbered message (2, 4, 6, ...):

    while (it.hasNext()) {
        String message = new String(it.next().message());
        System.out.println("message received" + message);
        int messageInt = Integer.parseInt(message);
        if (messageInt % 2 == 0) {
            // crash on every even-numbered message
            throw new RuntimeException("message " + message + " failed");
        }
    }

My config is like this:

    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("consumer.timeout.ms", "6000");
    props.put("autocommit.interval.ms", "360000");
    props.put("auto.offset.reset", "smallest");

I started 10 threads, but it seems that whenever a thread gets an even-numbered message it crashes, and when I restart the threads they start reading from the next message. So in the first batch:

    message received1
    message received2

Then I start again:

    message received3
    message received4

As you can see, message 2 is not replayed. Is this expected? I ran

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group chen_test_6 --topic test_20 -zkconnect localhost:2182

and its output is consistent with the test result (the even-numbered, failed messages are not re-retrieved). What am I missing here?

Chen

On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Chen,

You can use the ConsumerOffsetChecker tool: http://kafka.apache.org/documentation.html#basic_ops_consumer_lag

Guozhang

On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Sounds like a good idea! I think I will go with the high-level consumer then.

Another question along with this design: is there a way to check the lag of a consumer group for a topic? Upon machine crashes and restarts, I want to continue reading from a topic only if the lag is NOT 0. I know I could rely on the timeout ("consumer.timeout.ms") to check whether there is still data in the topic, but I am wondering whether there is a more elegant way.

Thanks much for the help, Guozhang!

Chen
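For checking lag programmatically rather than via the CLI, one rough sketch (not from this thread) is to compare the group's committed offset stored in ZooKeeper under /consumers/<group>/offsets/<topic>/<partition> with the partition's latest offset fetched through the 0.8 SimpleConsumer offset API. The class name, broker host, port, and client id below are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class LagCheck {
        // Returns the latest (log-end) offset of one partition; the group's lag for that
        // partition is this value minus the committed offset read from ZooKeeper.
        static long latestOffset(String broker, int port, String topic, int partition) {
            SimpleConsumer consumer =
                    new SimpleConsumer(broker, port, 100000, 64 * 1024, "lag-check");
            try {
                TopicAndPartition tp = new TopicAndPartition(topic, partition);
                Map<TopicAndPartition, PartitionOffsetRequestInfo> request =
                        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
                request.put(tp, new PartitionOffsetRequestInfo(
                        kafka.api.OffsetRequest.LatestTime(), 1));
                OffsetResponse response = consumer.getOffsetsBefore(new OffsetRequest(
                        request, kafka.api.OffsetRequest.CurrentVersion(), "lag-check"));
                return response.offsets(topic, partition)[0];
            } finally {
                consumer.close();
            }
        }
    }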
On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangg...@gmail.com> wrote:

With the simple consumer you then need to take care of consumer failure detection and partition reassignment yourself, but you would have more flexibility over the offsets.

If every processing error also fails the corresponding consumer thread (i.e. it will not take part in the rebalance and hence will not commit offsets), and you can live with data duplicates, then you can just enable auto offset commits with, say, a 10-second period. We usually use an even larger period, like minutes.

Guozhang

On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Maybe I could batch the messages before committing, e.g. committing every 10 seconds. This is what the auto commit does anyway, and I could live with duplicate data. What do you think?

I would then also seem to need a monitoring daemon to check the lag and restart the consumer after machine crashes..

On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Thanks, Guozhang.

So if I switch to SimpleConsumer, will these problems already be taken care of? I would assume that I will need to manage all the offsets myself, including the error-recovery logic, right?

Chen

On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Chen,

1. Manually committing offsets does have the risk of duplicates. Consider the following pattern:

    message = consumer.next();
    process(message);
    consumer.commit();

The rebalance can happen between line 2 and line 3, i.e. after the message has been processed but before its offset is committed. If another consumer picks up this partition after the rebalance, it may re-consume this message. With auto.commit turned on, offsets will always be committed before the consumers release ownership of partitions during rebalances.

In the 0.9 consumer design, we have fixed this issue by introducing the onPartitionDeassigned callback; you can take a look at its current API here:

http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html

2. Committing offsets too often does have an overhead, since it goes to ZooKeeper, and ZK is not write-scalable. We are also fixing that issue by moving offset management from ZK to the Kafka brokers. This is already checked in on trunk and will be included in the 0.8.2 release.

Guozhang
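To reduce the per-message commit overhead mentioned in point 2, one option discussed in this thread is to disable auto commit and batch the manual commits. A minimal sketch with the 0.8 high-level consumer; the class name, process() body, and the batch size of 100 are placeholders, and commitOffsets() commits the offsets of all partitions this connector currently owns, so duplicates remain possible after a crash or rebalance:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class BatchedCommitWorker implements Runnable {
        private final ConsumerConnector consumer;          // created with auto.commit.enable=false
        private final KafkaStream<byte[], byte[]> stream;

        BatchedCommitWorker(ConsumerConnector consumer, KafkaStream<byte[], byte[]> stream) {
            this.consumer = consumer;
            this.stream = stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int processed = 0;
            while (it.hasNext()) {
                process(new String(it.next().message()));
                if (++processed % 100 == 0) {       // arbitrary batch size
                    consumer.commitOffsets();       // batched manual commit
                }
            }
            consumer.commitOffsets();               // flush the tail of the batch
        }

        private void process(String message) {
            // placeholder for the actual message handling
        }
    }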
On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,

Just to make it clear: suppose I have 10 threads with the same consumer group id reading topic T, auto commit is turned off, and commitOffsets is called only when a message has been processed successfully. If thread 1 dies while processing a message from partition P1, and the last committed offset is Offset1, will Kafka then ensure that one of the other 9 running threads automatically picks up partition P1 from Offset1? And does that thread risk reading the same message more than once?

Also, I would assume committing the offset for every single message is a bit heavy. What do you guys usually do for error handling while reading from Kafka?

Thanks much!
Chen

On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Yes, in that case you can turn off auto commit and call commitOffsets manually after processing is finished. commitOffsets() only writes the offsets of the partitions that the consumer is currently fetching, so there is no need to coordinate this operation across threads.

On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

But with auto commit turned on, I am risking losing the failed message, right? Should I turn off auto commit and only commit the offset when the message has been processed successfully? But that would seem to require coordination between threads in order to know the right time to commit the offset..

On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Chen,

With the high-level consumer, partition re-assignment is automatic upon consumer failures.

Guozhang
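For reference, a minimal sketch of the kind of multi-threaded setup being discussed: one high-level consumer connector, several streams, one worker thread per stream, all under the same group.id. The topic name, stream count, and ZooKeeper address are taken from the test described above, the class name is hypothetical, and BatchedCommitWorker is the worker sketched earlier:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class GroupConsumerSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2182");
            props.put("group.id", "chen_test_6");
            props.put("auto.commit.enable", "false");   // commit manually after processing

            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // Ask for 10 streams of the topic; partitions are spread over the streams,
            // and a rebalance re-spreads them if a consumer instance in the group goes away.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("test_20", 10);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(topicCountMap);

            ExecutorService executor = Executors.newFixedThreadPool(10);
            for (KafkaStream<byte[], byte[]> stream : streams.get("test_20")) {
                executor.submit(new BatchedCommitWorker(consumer, stream));
            }
        }
    }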
On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Folks,

I have a process that starts at a specific time and reads from a specific topic. I am currently using the high-level API (consumer group) to read from Kafka, and it will stop once there is nothing left in the topic, by specifying a timeout. I am most concerned about error recovery in a multi-threaded context: if one thread dies, will the other running bolt threads pick up the failed message, or do I have to start another thread in order to pick it up? What would be a good practice to ensure that every message is processed at least once?

Note that all threads are using the same group id.

Thanks,
Chen
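On the "stop once there is nothing in the topic" part of the original question: with consumer.timeout.ms set (e.g. "6000" as in the test config above), the 0.8 high-level consumer's iterator throws ConsumerTimeoutException when no message arrives within that window, which can serve as the drain signal. A rough fragment of how a worker loop like the one sketched earlier could use it; process(), consumer, and stream refer to the placeholders in that sketch:

    // inside the worker's run(), with kafka.consumer.ConsumerTimeoutException imported
    try {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            process(new String(it.next().message()));   // placeholder processing
        }
    } catch (ConsumerTimeoutException e) {
        // no message arrived within consumer.timeout.ms; treat the topic as drained for this run
    }
    consumer.commitOffsets();   // record progress before stopping
    consumer.shutdown();        // release partition ownership so the group can rebalance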