I created KAFKA-1951 to improve the offset checker so that it prints the offsets stored in both ZooKeeper and Kafka. I don't think we have a way to wipe out the offsets topic in Kafka for a consumer.
Jiangjie (Becket) Qin

On 2/12/15, 9:59 PM, "tao xiao" <[email protected]> wrote:

>Thanks for the explanation. Is there a way that I can wipe out the offsets
>stored in Kafka so that the checker can continue to work again?
>
>On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin <[email protected]> wrote:
>
>> I think this is an offset checker bug. The offset checker will:
>> 1. First check whether the offset exists in the offset topic on the broker.
>> 2. If it is on the broker, just return that offset.
>> 3. Otherwise, go to ZooKeeper.
>>
>> So the problem you saw was actually following this logic. After the dual
>> commit, the offset topic already had the offsets for this consumer and
>> topic. Then you switched to ZooKeeper commit. Because the offset topic
>> already has the offsets, the offset checker will use them and skip
>> checking ZooKeeper. So the offset will not change anymore, because you
>> are no longer committing to the offset topic on the broker, while the
>> offset checker always uses that offset.
>>
>> On 2/12/15, 7:30 PM, "tao xiao" <[email protected]> wrote:
>>
>> >I used the one shipped with 0.8.2. It is pretty straightforward to
>> >reproduce the issue.
>> >
>> >Here are the steps to reproduce:
>> >1. I have a consumer using the high-level consumer API with the initial
>> >settings offsets.storage=kafka and dual.commit.enabled=false.
>> >2. After consuming messages for a while, shut down the consumer and
>> >change the setting to dual.commit.enabled=true.
>> >3. Bounce the consumer and run for a while. The lag looks good.
>> >4. Change the setting to offsets.storage=zookeeper and bounce the
>> >consumer. Starting from now, the lag remains unchanged.
>> >
>> >On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy <[email protected]> wrote:
>> >
>> >> That is weird. Are you by any chance running an older version of the
>> >> offset checker? Is this straightforward to reproduce?
>> >>
>> >> On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
>> >> > Joel,
>> >> >
>> >> > No, the metric was not increasing. It was 0 all the time.
>> >> >
>> >> > On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy <[email protected]> wrote:
>> >> >
>> >> > > Actually, I meant to say check that it is not increasing.
>> >> > >
>> >> > > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
>> >> > > > Possibly a bug - can you also look at the MaxLag mbean in the
>> >> > > > consumer to verify that the max lag is zero?
>> >> > > >
>> >> > > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
>> >> > > > > Hi Joel,
>> >> > > > >
>> >> > > > > When I set dual.commit.enabled=true, the count value of both
>> >> > > > > metrics increased. After I set offsets.storage=zookeeper, only
>> >> > > > > ZooKeeperCommitsPerSec changed, but not KafkaCommitsPerSec. I
>> >> > > > > think this is expected, as Kafka offset storage was turned off.
>> >> > > > >
>> >> > > > > But when I looked up the consumer lag via
>> >> > > > > kafka.tools.ConsumerOffsetChecker, the lag still remained
>> >> > > > > unchanged.
>> >> > > > >
>> >> > > > > I scanned through the source code of ConsumerOffsetChecker; it
>> >> > > > > doesn't check the offset in ZooKeeper unless offsetFetchResponse
>> >> > > > > returns NoOffset. Since the consumer used Kafka as the offset
>> >> > > > > storage before, I don't think offsetFetchResponse would return
>> >> > > > > NoOffset:
>> >> > > > >
>> >> > > > > offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
>> >> > > > >   if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
>> >> > > > >     val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
>> >> > > > >     // this group may not have migrated off zookeeper for offsets
>> >> > > > >     // storage (we don't expose the dual-commit option in this tool)
>> >> > > > >     // (meaning the lag may be off until all the consumers in the
>> >> > > > >     // group have the same setting for offsets storage)
>> >> > > > >     try {
>> >> > > > >       val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir +
>> >> > > > >         "/%d".format(topicAndPartition.partition))._1.toLong
>> >> > > > >       offsetMap.put(topicAndPartition, offset)
>> >> > > > >     } catch {
>> >> > > > >       case z: ZkNoNodeException =>
>> >> > > > >         if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
>> >> > > > >           offsetMap.put(topicAndPartition, -1)
>> >> > > > >         else
>> >> > > > >           throw z
>> >> > > > >     }
>> >> > > > >   }
>> >> > > > >   else if (offsetAndMetadata.error == ErrorMapping.NoError)
>> >> > > > >     offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
>> >> > > > >   else {
>> >> > > > >     println("Could not fetch offset for %s due to %s.".format(
>> >> > > > >       topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
>> >> > > > >   }
>> >> > > > > }
>> >> > > > >
>> >> > > > > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy <[email protected]> wrote:
>> >> > > > >
>> >> > > > > > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
>> >> > > > > > can you look those up and see what they report?
>> >> > > > > >
>> >> > > > > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
>> >> > > > > > > Hi team,
>> >> > > > > > >
>> >> > > > > > > I was trying to migrate my consumer offsets from Kafka to ZooKeeper.
>> >> > > > > > >
>> >> > > > > > > Here are the original settings of my consumer:
>> >> > > > > > >
>> >> > > > > > > props.put("offsets.storage", "kafka");
>> >> > > > > > > props.put("dual.commit.enabled", "false");
>> >> > > > > > >
>> >> > > > > > > Here are the steps:
>> >> > > > > > >
>> >> > > > > > > 1. Set dual.commit.enabled=true.
>> >> > > > > > > 2. Restart my consumer and monitor the offset lag with
>> >> > > > > > > kafka.tools.ConsumerOffsetChecker.
>> >> > > > > > > 3. Set offsets.storage=zookeeper.
>> >> > > > > > > 4. Restart my consumer and monitor the offset lag with
>> >> > > > > > > kafka.tools.ConsumerOffsetChecker.
>> >> > > > > > >
>> >> > > > > > > After step 4 my consumer was able to continue consuming data
>> >> > > > > > > from the topic, but the offset lag remained unchanged. Did I
>> >> > > > > > > do anything wrong?
>> >> > > > > > >
>> >> > > > > > > --
>> >> > > > > > > Regards,
>> >> > > > > > > Tao
>> >> > >
>> >> > --
>> >> > Regards,
>> >> > Tao
>> >>
>> >
>> >--
>> >Regards,
>> >Tao
>>
>
>--
>Regards,
>Tao
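[Editor's note] The resolution order the thread keeps circling back to can be summarized in a minimal sketch. This is not the ConsumerOffsetChecker source; `resolveOffset`, `fetchFromOffsetTopic`, and `fetchFromZookeeper` are hypothetical stand-ins for the real broker and ZooKeeper lookups:

```scala
// Sketch of the offset checker's resolution order as described in the thread:
// prefer the broker-side offsets topic, and fall back to ZooKeeper only when
// the broker has no offset at all. This is why, after dual-committing once,
// switching back to offsets.storage=zookeeper leaves the checker reading a
// stale broker-side offset forever.
case class TopicPartition(topic: String, partition: Int)

def resolveOffset(tp: TopicPartition,
                  fetchFromOffsetTopic: TopicPartition => Option[Long],
                  fetchFromZookeeper: TopicPartition => Option[Long]): Option[Long] =
  fetchFromOffsetTopic(tp).orElse(fetchFromZookeeper(tp))
```

Once an offset has ever been committed to the broker, the first branch keeps winning even though only the ZooKeeper offset is still advancing, which matches the frozen lag observed above.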

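[Editor's note] The migration attempted in the thread amounts to two configuration changes, each followed by a consumer bounce. A sketch of the two phases, assuming the 0.8.2 high-level consumer property names quoted above (the phase helpers themselves are hypothetical):

```scala
import java.util.Properties

// Phase 1: keep committing to Kafka but also dual-commit to ZooKeeper,
// so both stores hold current offsets before the switch.
def dualCommitPhase(): Properties = {
  val props = new Properties()
  props.put("offsets.storage", "kafka")
  props.put("dual.commit.enabled", "true")
  props
}

// Phase 2: after bouncing the consumer, commit to ZooKeeper only.
// From this point the broker-side offset stops advancing, which is
// what confuses the offset checker.
def zookeeperPhase(): Properties = {
  val props = new Properties()
  props.put("offsets.storage", "zookeeper")
  props.put("dual.commit.enabled", "false")
  props
}
```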