Thanks, Jiangjie, for your help.

On Sat, Feb 14, 2015 at 5:59 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Thanks for looking into that!
>
> On Fri, Feb 13, 2015 at 05:31:39AM +0000, Jiangjie Qin wrote:
> > I think this is the offset checker bug.
> > The offset checker will
> > 1. first check if the offset exists in offset topic on broker or not.
> > 2. If it is on broker then it will just return that offset.
> > 3. Otherwise it goes to zookeeper.
> >
> > So the problem you saw was actually following this logic.
> > After dual commit, offset topic already had the offsets for this consumer
> > and topic.
> > Then you switched to zookeeper commit.
> > Because the offset topic has the offsets already, offset checker will use
> > that and skip checking zookeeper. So the offset will not change anymore
> > because you are no longer committing to offset topic on broker, while
> > offset checker always uses that offset.
> >
> > On 2/12/15, 7:30 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
> >
> > >I used the one shipped with 0.8.2. It is pretty straightforward to
> > >reproduce the issue.
> > >
> > >Here are the steps to reproduce:
> > >1. I have a consumer using high level consumer API with initial settings
> > >offsets.storage=kafka and dual.commit.enabled=false.
> > >2. After consuming messages for a while, shut down the consumer and change
> > >setting dual.commit.enabled=true
> > >3. bounce the consumer and run for a while. The lag looks good
> > >4. change setting offsets.storage=zookeeper and bounce the consumer.
> > >Starting from now the lag remains unchanged
> > >
> > >On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >
> > >> That is weird. Are you by any chance running an older version of the
> > >> offset checker? Is this straightforward to reproduce?
> > >>
> > >> On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
> > >> > Joel,
> > >> >
> > >> > No, the metric was not increasing. It was 0 all the time.
> > >> >
> > >> > On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >> >
> > >> > > Actually I meant to say check that it is not increasing.
> > >> > >
> > >> > > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
> > >> > > > Possibly a bug - can you also look at the MaxLag mbean in the
> > >> > > > consumer to verify that the maxlag is zero?
> > >> > > >
> > >> > > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
> > >> > > > > Hi Joel,
> > >> > > > >
> > >> > > > > When I set dual.commit.enabled=true the count value of both
> > >> > > > > metrics increased. After I set offsets.storage=zookeeper only
> > >> > > > > ZooKeeperCommitsPerSec changed but not KafkaCommitsPerSec. I think
> > >> > > > > this is expected as kafka offset storage was turned off.
> > >> > > > >
> > >> > > > > But when I looked up the consumer lag via
> > >> > > > > kafka.tools.ConsumerOffsetChecker the lag still remained unchanged.
> > >> > > > >
> > >> > > > > I scanned through the source code of ConsumerOffsetChecker; it
> > >> > > > > doesn't check the offset in zk unless offsetFetchResponse returns
> > >> > > > > NoOffset. Since the consumer used kafka as the offset storage
> > >> > > > > before, I don't think offsetFetchResponse would return NoOffset.
> > >> > > > >
> > >> > > > > offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
> > >> > > > >   if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
> > >> > > > >     val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
> > >> > > > >     // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
> > >> > > > >     // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
> > >> > > > >     try {
> > >> > > > >       val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
> > >> > > > >       offsetMap.put(topicAndPartition, offset)
> > >> > > > >     } catch {
> > >> > > > >       case z: ZkNoNodeException =>
> > >> > > > >         if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
> > >> > > > >           offsetMap.put(topicAndPartition,-1)
> > >> > > > >         else
> > >> > > > >           throw z
> > >> > > > >     }
> > >> > > > >   }
> > >> > > > >   else if (offsetAndMetadata.error == ErrorMapping.NoError)
> > >> > > > >     offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
> > >> > > > >   else {
> > >> > > > >     println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
> > >> > > > >   }
> > >> > > > > }
> > >> > > > >
> > >> > > > > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >> > > > >
> > >> > > > > > There are mbeans named KafkaCommitsPerSec and
> > >> > > > > > ZooKeeperCommitsPerSec - can you look those up and see what
> > >> > > > > > they report?
> > >> > > > > >
> > >> > > > > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
> > >> > > > > > > Hi team,
> > >> > > > > > >
> > >> > > > > > > I was trying to migrate my consumer offset from kafka to
> > >> > > > > > > zookeeper.
> > >> > > > > > >
> > >> > > > > > > Here are the original settings of my consumer:
> > >> > > > > > >
> > >> > > > > > > props.put("offsets.storage", "kafka");
> > >> > > > > > > props.put("dual.commit.enabled", "false");
> > >> > > > > > >
> > >> > > > > > > Here are the steps:
> > >> > > > > > >
> > >> > > > > > > 1. set dual.commit.enabled=true
> > >> > > > > > > 2. restart my consumer and monitor offset lag with
> > >> > > > > > > kafka.tools.ConsumerOffsetChecker
> > >> > > > > > > 3. set offsets.storage=zookeeper
> > >> > > > > > > 4. restart my consumer and monitor offset lag with
> > >> > > > > > > kafka.tools.ConsumerOffsetChecker
> > >> > > > > > >
> > >> > > > > > > After step 4 my consumer was able to continually consume
> > >> > > > > > > data from the topic but the offset lag remained unchanged.
> > >> > > > > > > Did I do anything wrong?
> > >> > > > > > >
> > >> > > > > > > --
> > >> > > > > > > Regards,
> > >> > > > > > > Tao
> > >> > > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > Regards,
> > >> > > > > Tao
> > >> > > >
> > >> > >
> > >> >
> > >> > --
> > >> > Regards,
> > >> > Tao
> > >>
> > >
> > >--
> > >Regards,
> > >Tao
> >

--
Regards,
Tao
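
For anyone reading this thread in the archive: the lookup order Jiangjie describes, and the stale lag it produces after the migration steps above, can be modeled with a small Java sketch. This is not the ConsumerOffsetChecker source. The class OffsetLookupSketch, its two in-memory maps, and the commit routing are hypothetical stand-ins; the routing assumes that offsets.storage=zookeeper commits only to ZooKeeper, which is consistent with the KafkaCommitsPerSec and ZooKeeperCommitsPerSec behavior reported earlier in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical model of the behavior discussed in this thread; not Kafka code.
final class OffsetLookupSketch {
    // In-memory stand-ins for the two offset stores, keyed by "topic-partition".
    final Map<String, Long> brokerOffsets = new HashMap<>();
    final Map<String, Long> zkOffsets = new HashMap<>();

    // Lookup order as described: broker offsets topic first, ZooKeeper only as a fallback.
    OptionalLong checkerLookup(String topicPartition) {
        Long fromBroker = brokerOffsets.get(topicPartition);
        if (fromBroker != null) {
            return OptionalLong.of(fromBroker);
        }
        Long fromZk = zkOffsets.get(topicPartition);
        return fromZk == null ? OptionalLong.empty() : OptionalLong.of(fromZk);
    }

    // Assumed commit routing: "zookeeper" storage writes to ZooKeeper only;
    // "kafka" storage writes to the broker, plus ZooKeeper when dual commit is on.
    void commit(String topicPartition, long offset, String storage, boolean dualCommit) {
        if (storage.equals("zookeeper")) {
            zkOffsets.put(topicPartition, offset);
        } else {
            brokerOffsets.put(topicPartition, offset);
            if (dualCommit) {
                zkOffsets.put(topicPartition, offset);
            }
        }
    }

    public static void main(String[] args) {
        OffsetLookupSketch sketch = new OffsetLookupSketch();
        String tp = "mytopic-0"; // hypothetical topic and partition

        sketch.commit(tp, 100L, "kafka", false);    // initial settings
        sketch.commit(tp, 200L, "kafka", true);     // dual.commit.enabled=true
        sketch.commit(tp, 300L, "zookeeper", true); // offsets.storage=zookeeper

        // The checker still reports the last broker-side offset, so lag looks frozen.
        System.out.println("checker sees " + sketch.checkerLookup(tp).getAsLong()
                + ", ZooKeeper has " + sketch.zkOffsets.get(tp));
    }
}
```

Run as written, the sketch prints "checker sees 200, ZooKeeper has 300": the consumer keeps advancing in ZooKeeper while the checker keeps reading the last offset left in the broker's offsets topic, which is the unchanged lag seen after step 4.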