We do not have a Java implementation of the operational tools yet.

Guozhang
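Since there is no Java port of the tool yet, the arithmetic behind what ConsumerOffsetChecker reports can be sketched in a few lines. This is a minimal sketch, not the tool itself. It assumes you have already fetched, per partition, the group's committed offset (in 0.8 these are stored in ZooKeeper under /consumers/<group>/offsets/<topic>/<partition>) and the partition's log-end offset (the tool's logSize) from the brokers; the fetching is left out, and the class and method names are hypothetical. Lag is the log-end offset minus the committed offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the "Lag" arithmetic: lag = log-end offset - committed offset,
// per partition. Fetching the two offset maps (ZooKeeper / brokers) is omitted.
final class ConsumerLag {

    private ConsumerLag() {}

    // Per-partition lag. Simplification: a partition with no committed offset
    // is treated as starting from 0 (ignores log retention).
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> committed,
                                             Map<Integer, Long> logEnd) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            long consumed = committed.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - consumed));
        }
        return lag;
    }

    // Sum of the per-partition lags for the topic.
    static long totalLag(Map<Integer, Long> committed, Map<Integer, Long> logEnd) {
        long sum = 0;
        for (long l : lagByPartition(committed, logEnd).values()) {
            sum += l;
        }
        return sum;
    }

    // Chen's restart rule: keep reading only while the lag is not 0.
    static boolean hasBacklog(Map<Integer, Long> committed, Map<Integer, Long> logEnd) {
        return totalLag(committed, logEnd) > 0;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for a 3-partition topic.
        Map<Integer, Long> committed = Map.of(0, 7L, 1, 6L, 2, 7L);
        Map<Integer, Long> logEnd = Map.of(0, 7L, 1, 7L, 2, 7L);
        System.out.println(lagByPartition(committed, logEnd)); // {0=0, 1=1, 2=0}
        System.out.println(hasBacklog(committed, logEnd));     // true
    }
}
```

A monitoring daemon could call hasBacklog after a restart to decide whether to resume consuming.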
On Fri, Aug 8, 2014 at 4:29 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,
Just curious, do you guys already have a Java version of the ConsumerOffsetChecker
(https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala)
so that I could use it in my Storm topology?
Chen

On Fri, Aug 8, 2014 at 2:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Ah, my bad: I didn't notice I had put auto.commit.interval.ms in the config twice. After fixing it, it now behaves as expected. :-)
Thanks again!!
Chen

On Fri, Aug 8, 2014 at 1:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Chen,

Your auto.commit.interval.ms is set to 1 sec, which may be too small. Could you try a larger value, like 10000?

Guozhang

On Fri, Aug 8, 2014 at 1:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,
I just did a simple test, and Kafka does not seem to do what it is supposed to do.
I put 20 messages, numbered 1 to 20, into a topic with 3 partitions, and throw a RuntimeException on all the even-numbered messages (2, 4, 6, ...):
    while (it.hasNext()) {
        String message = new String(it.next().message());
        System.out.println("message received" + message);
        int messageInt = Integer.parseInt(message);
        if (messageInt % 2 == 0) {
            // crash on every even-numbered message
            throw new RuntimeException("message " + message + " failed");
        }
    }

My config is like this:

    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("consumer.timeout.ms", "6000");
    props.put("autocommit.interval.ms", "360000");
    props.put("auto.offset.reset", "smallest");

I started 10 threads, but it seems that whenever a thread gets an even-numbered message it crashes, and when I restart the threads they start reading from the next message. So in the first batch:

    message received1
    message received2

Then I start again:

    message received3
    message received4

As you can see, message 2 is not replayed. Is this expected? I ran

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group chen_test_6 --topic test_20 -zkconnect localhost:2182

and its output is consistent with the test result (the failed even-numbered messages are not retrieved again).

What am I missing here?

Chen

On Fri, Aug 8, 2014 at 1:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Chen,

You can use the ConsumerOffsetChecker tool.
http://kafka.apache.org/documentation.html#basic_ops_consumer_lag

Guozhang

On Fri, Aug 8, 2014 at 12:18 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Sounds like a good idea! I think I will go with the high-level consumer then.
Another question about this design: is there a way to check the lag of a consumer group for a topic? When a machine crashes and restarts, I want to continue reading from a certain topic only if the lag is NOT 0. I know I could rely on the timeout ("consumer.timeout.ms") to check whether there is still data in the topic, but I'm wondering whether there is a more elegant way.
Thanks much for the help, Guozhang!

Chen

On Fri, Aug 8, 2014 at 11:23 AM, Guozhang Wang <wangg...@gmail.com> wrote:

With the simple consumer, you need to take care of consumer failure detection and partition reassignment yourself, but you get more flexibility over the offsets.

If the corresponding consumer thread also fails whenever processing hits an error (i.e. it will not be involved in the rebalance, and hence will not commit offsets), and you can live with duplicate data, then you can just enable auto offset commits with, say, a 10-second period. We usually use an even larger period, like minutes.
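For the periodic auto-commit setup suggested here, a minimal sketch of the 0.8 high-level consumer properties, assuming the 0.8 property names auto.commit.enable and auto.commit.interval.ms and the 10-second period mentioned above; the ZooKeeper values are copied from Chen's test config in this thread. The putOnce helper is hypothetical: it refuses to set a key twice, which would have caught the duplicated auto.commit.interval.ms Chen found in his own config.

```java
import java.util.Properties;

// Sketch: 0.8 high-level consumer properties with periodic auto-commit.
// Every key goes through putOnce, so a duplicated key fails loudly.
final class AutoCommitConfig {

    private AutoCommitConfig() {}

    // Hypothetical helper: reject a key that is already set instead of
    // silently overwriting it (setProperty alone would just replace it).
    static void putOnce(Properties props, String key, String value) {
        if (props.containsKey(key)) {
            throw new IllegalArgumentException("property set twice: " + key);
        }
        props.setProperty(key, value);
    }

    static Properties build(String zkConnect, String groupId, long commitIntervalMs) {
        Properties props = new Properties();
        putOnce(props, "zookeeper.connect", zkConnect);
        putOnce(props, "group.id", groupId);
        putOnce(props, "zookeeper.session.timeout.ms", "4000");
        putOnce(props, "zookeeper.sync.time.ms", "200");
        // Commit consumed offsets on a timer rather than after every message;
        // messages consumed since the last commit may be delivered again after a crash.
        putOnce(props, "auto.commit.enable", "true");
        putOnce(props, "auto.commit.interval.ms", Long.toString(commitIntervalMs));
        putOnce(props, "auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2182", "chen_test_6", 10_000L);
        System.out.println(props.getProperty("auto.commit.interval.ms")); // 10000
    }
}
```

The resulting Properties would then be handed to the 0.8 high-level consumer (for example through kafka.consumer.ConsumerConfig); that wiring is not shown here.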
Guozhang

On Fri, Aug 8, 2014 at 11:11 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Maybe I could batch the messages before committing, e.g. commit every 10 seconds. This is what auto commit does anyway, and I could live with duplicate data.
What do you think?

I would then also seem to need a monitoring daemon that checks the lag and restarts the consumer after machine crashes.

On Fri, Aug 8, 2014 at 10:40 AM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Thanks, Guozhang,
So if I switch to SimpleConsumer, will these problems be taken care of already? I assume I will need to manage all the offsets myself, including the error-recovery logic, right?
Chen

On Fri, Aug 8, 2014 at 8:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Chen,

1.
Manually committing offsets does carry a risk of duplicates. Consider the following pattern:

    message = consumer.next();
    process(message);
    consumer.commit();

A rebalance can happen between lines 2 and 3, after the message has been processed but before its offset is committed. If another consumer picks up this partition after the rebalance, it may consume this message again. With auto.commit turned on, offsets are always committed before the consumers release ownership of partitions during rebalances.

In the 0.9 consumer design we have fixed this issue by introducing the onPartitionDeassigned callback; you can take a look at its current API here:

http://people.apache.org/~nehanarkhede/kafka-0.9-producer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html

2. Committing offsets too often does have an overhead, since each commit goes to ZooKeeper, and ZK is not write-scalable. We are also fixing that issue by moving offset management from ZK to the Kafka servers. This is already checked into trunk and will be included in the 0.8.2 release.
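The duplicate window in the three-line pattern above can be reproduced with a small in-memory model. This is a sketch, not Kafka code: one partition is a List, the committed offset is a plain field, and a simulated rebalance skips the commit right after processing a chosen offset. The next owner resumes from the committed offset and processes that message a second time, which is the at-least-once behavior described.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of one partition consumed with
//   message = consumer.next(); process(message); consumer.commit();
// Losing ownership between process and commit causes a re-delivery.
final class CommitGapDemo {

    private final List<String> log;            // messages in one partition
    private long committedOffset = 0;          // where the next owner resumes
    final List<String> processed = new ArrayList<>();

    CommitGapDemo(List<String> log) {
        this.log = log;
    }

    // One consumer "session" starting at the committed offset.
    // crashAfterProcessAt: offset whose commit is skipped (simulated rebalance), or -1.
    void runSession(long crashAfterProcessAt) {
        for (long offset = committedOffset; offset < log.size(); offset++) {
            String message = log.get((int) offset);   // consumer.next()
            processed.add(message);                   // process(message)
            if (offset == crashAfterProcessAt) {
                return;                               // ownership lost before commit
            }
            committedOffset = offset + 1;             // consumer.commit()
        }
    }

    public static void main(String[] args) {
        CommitGapDemo demo = new CommitGapDemo(List.of("m0", "m1", "m2", "m3"));
        demo.runSession(1);   // first owner processes m0 and m1, then loses the partition
        demo.runSession(-1);  // new owner resumes at committed offset 1
        System.out.println(demo.processed); // [m0, m1, m1, m2, m3]
    }
}
```

Nothing is lost in this model, but m1 is processed twice, so the processing step has to tolerate duplicates.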
Guozhang

On Thu, Aug 7, 2014 at 5:36 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Guozhang,
Just to make it clear:
Say I have 10 threads with the same consumer group id reading topic T. Auto commit is turned off, and commitOffsets is called only when a message has been processed successfully.
If thread 1 dies while processing a message from partition P1, and the last offset is Offset1, will Kafka ensure that one of the other 9 running threads automatically picks up partition P1 from Offset1? Is there a risk of a thread reading the same message more than once?

Also, I would assume committing the offset for each message is a bit heavy. What do you usually do for error handling when reading from Kafka?
Thanks much!
Chen

On Thu, Aug 7, 2014 at 5:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Yes, in that case you can turn off auto commit and call commitOffsets manually after processing is finished.
commitOffsets() will only write the offsets of the partitions that the consumer is currently fetching, so there is no need to coordinate this operation.

On Thu, Aug 7, 2014 at 5:03 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

But with auto commit turned on, I am at risk of losing the failed message, right? Should I turn off auto commit and only commit the offset when the message is processed successfully? But that would require coordination between threads in order to know the right time to commit the offset.

On Thu, Aug 7, 2014 at 4:54 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Chen,

With the high-level consumer, partition reassignment is automatic upon consumer failures.
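To illustrate what automatic reassignment means for a group of threads, here is a sketch of range-style assignment, modeled on (not copied from) the 0.8 high-level consumer's rebalance: partitions and consumer thread ids are sorted, each thread gets a contiguous run, and the first few threads take one extra partition when the division is uneven. Thread names are hypothetical. One consequence for Chen's test setup: with 3 partitions and 10 threads, at most 3 threads own a partition and the rest stay idle; when an owner dies, rerunning the assignment hands its partition to a survivor, which resumes from the committed offset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Range-style assignment: sorted partitions split into contiguous runs
// over sorted thread ids; rerun after membership changes.
final class RangeAssignment {

    private RangeAssignment() {}

    static Map<String, List<Integer>> assign(List<Integer> partitions, List<String> threadIds) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (threadIds.isEmpty()) {
            return result; // no live threads, nothing to assign
        }
        List<Integer> parts = new ArrayList<>(partitions);
        List<String> ids = new ArrayList<>(threadIds);
        Collections.sort(parts);
        Collections.sort(ids);
        int perThread = parts.size() / ids.size();
        int withExtra = parts.size() % ids.size();   // these threads get one more
        for (int pos = 0; pos < ids.size(); pos++) {
            int start = perThread * pos + Math.min(pos, withExtra);
            int count = perThread + (pos < withExtra ? 1 : 0);
            result.put(ids.get(pos), new ArrayList<>(parts.subList(start, start + count)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> threads = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            threads.add("t" + i);
        }
        List<Integer> partitions = List.of(0, 1, 2);
        System.out.println(assign(partitions, threads)); // t0..t2 own one partition each
        threads.remove("t1");                            // t1 dies
        System.out.println(assign(partitions, threads)); // t2 now owns partition 1
    }
}
```

Matching the thread count to the partition count avoids idle threads in this scheme.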
Guozhang

On Thu, Aug 7, 2014 at 4:41 PM, Chen Wang <chen.apache.s...@gmail.com> wrote:

Folks,
I have a process that starts at a specific time and reads from a specific topic. I am currently using the high-level API (consumer group) to read from Kafka, and it stops once there is nothing left in the topic (by specifying a timeout). I am most concerned about error recovery in a multithreaded context. If one thread dies, will the other running bolt threads pick up the failed message, or do I have to start another thread to pick it up? What would be a good practice to ensure that each message is processed at least once?

Note that all threads are using the same group id.
Thanks,
Chen
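Chen's process stops once the topic looks empty by relying on a consumer timeout. In 0.8, setting consumer.timeout.ms makes the stream iterator's hasNext() throw kafka.consumer.ConsumerTimeoutException when no message arrives in time. The sketch below mimics that stop condition with a plain BlockingQueue standing in for the stream; it is an illustration of the idea, not the Kafka iterator, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for a consumer stream with an idle timeout.
final class IdleTimeoutDrain {

    private IdleTimeoutDrain() {}

    // Read until no message arrives within idleTimeoutMs, then stop
    // (the role consumer.timeout.ms plays for the 0.8 stream iterator).
    static List<String> drain(BlockingQueue<String> stream, long idleTimeoutMs) {
        List<String> received = new ArrayList<>();
        while (true) {
            String message;
            try {
                message = stream.poll(idleTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                break;
            }
            if (message == null) {
                break; // idle timeout: treat the topic as drained for now
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>(List.of("1", "2", "3"));
        System.out.println(drain(stream, 100)); // [1, 2, 3]
    }
}
```

A timeout only means nothing arrived within the window, not that the group's lag is zero; a slow producer can still trigger it, which is why a lag check is the more reliable signal for deciding whether to keep reading.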