I created KAFKA-1951 to improve the offset checker so that it prints the
offsets stored in both ZooKeeper and Kafka.
I don't think we have a way to wipe out the offset topic in Kafka for a
consumer.
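
For readers following along, the checker behavior described further down in
this thread can be sketched roughly as below. This is a hedged illustration,
not the actual tool's code: the dicts and the `resolve_offset` helper are
invented stand-ins for the broker-side offset topic and the ZooKeeper offset
path.

```python
# Sketch of ConsumerOffsetChecker's lookup order (illustrative only):
# it prefers an offset found in the Kafka offset topic and falls back
# to ZooKeeper only when the broker reports no offset at all.

NO_OFFSET = None  # stands in for OffsetMetadataAndError.NoOffset

def resolve_offset(kafka_offsets, zk_offsets, topic_partition):
    """Return the offset the checker would report for a partition."""
    broker_offset = kafka_offsets.get(topic_partition, NO_OFFSET)
    if broker_offset is not NO_OFFSET:
        # An offset exists in the offset topic: ZooKeeper is never consulted,
        # even if the consumer has since switched back to zookeeper commits.
        return broker_offset
    # Fall back to ZooKeeper; -1 mimics "group path exists but no offset node".
    return zk_offsets.get(topic_partition, -1)

# After a period of dual commit, the offset topic holds a now-stale offset,
# so the checker keeps reporting it even though ZooKeeper has newer commits.
kafka_offsets = {("mytopic", 0): 100}   # last offset committed to Kafka
zk_offsets = {("mytopic", 0): 250}      # consumer now commits only here
print(resolve_offset(kafka_offsets, zk_offsets, ("mytopic", 0)))  # 100
```

This is why the lag appears frozen after switching back to
offsets.storage=zookeeper: the stale broker-side offset shadows the live
ZooKeeper one.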

Jiangjie (Becket) Qin

On 2/12/15, 9:59 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

>Thanks for the explanation. Is there a way that I can wipe out the offset
>stored in Kafka so that the checker can continue to work again?
>
>On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> I think this is the offset checker bug.
>> The offset checker will
>> 1. first check if the offset exists in offset topic on broker or not.
>> 2. If it is on broker then it will just return that offset.
>> 3. Otherwise it goes to zookeeper.
>>
>> So the problem you saw was actually following this logic.
>> After the dual commit, the offset topic already had the offsets for this
>> consumer and topic.
>> Then you switched to zookeeper commit.
>> Because the offset topic has the offsets already, the offset checker will
>> use that and skip checking zookeeper. So the offset will not change
>> anymore, because you are no longer committing to the offset topic on the
>> broker, while the offset checker always uses that offset.
>>
>> On 2/12/15, 7:30 PM, "tao xiao" <xiaotao...@gmail.com> wrote:
>>
>> >I used the one shipped with 0.8.2. It is pretty straightforward to
>> >reproduce the issue.
>> >
>> >Here are the steps to reproduce:
>> >1. I have a consumer using the high level consumer API with initial
>> >settings offsets.storage=kafka and dual.commit.enabled=false.
>> >2. After consuming messages for a while, shut down the consumer and
>> >change the setting to dual.commit.enabled=true.
>> >3. Bounce the consumer and run for a while. The lag looks good.
>> >4. Change the setting to offsets.storage=zookeeper and bounce the
>> >consumer. From this point on the lag remains unchanged.
>> >
>> >On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy <jjkosh...@gmail.com>
>>wrote:
>> >
>> >> That is weird. Are you by any chance running an older version of the
>> >> offset checker? Is this straightforward to reproduce?
>> >>
>> >> On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
>> >> > Joel,
>> >> >
>> >> > No, the metric was not increasing. It was 0 all the time.
>> >> >
>> >> > On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy <jjkosh...@gmail.com>
>> >> wrote:
>> >> >
>> >> > > Actually I meant to say check that it is not increasing.
>> >> > >
>> >> > > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
>> >> > > > Possibly a bug - can you also look at the MaxLag mbean in the
>> >> consumer
>> >> > > > to verify that the maxlag is zero?
>> >> > > >
>> >> > > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
>> >> > > > > Hi Joel,
>> >> > > > >
>> >> > > > > When I set dual.commit.enabled=true the count value of both
>> >> > > > > metrics increased. After I set offsets.storage=zookeeper only
>> >> > > > > ZooKeeperCommitsPerSec changed but not KafkaCommitsPerSec. I
>> >> > > > > think this is expected, as the kafka offset storage was turned
>> >> > > > > off.
>> >> > > > >
>> >> > > > > But when I looked up the consumer lag via
>> >> > > kafka.tools.ConsumerOffsetChecker
>> >> > > > > the lag still remained unchanged.
>> >> > > > >
>> >> > > > > I scanned through the source code of ConsumerOffsetChecker;
>> >> > > > > it doesn't check the offset in zk unless offsetFetchResponse
>> >> > > > > returns NoOffset. Since the consumer used kafka as the offset
>> >> > > > > storage before, I don't think offsetFetchResponse would return
>> >> > > > > NoOffset:
>> >> > > > >
>> >> > > > > offsetFetchResponse.requestInfo.foreach {
>> >> > > > >   case (topicAndPartition, offsetAndMetadata) =>
>> >> > > > >     if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
>> >> > > > >       val topicDirs = new ZKGroupTopicDirs(group,
>> >> > > > >         topicAndPartition.topic)
>> >> > > > >       // this group may not have migrated off zookeeper for
>> >> > > > >       // offsets storage (we don't expose the dual-commit
>> >> > > > >       // option in this tool)
>> >> > > > >       // (meaning the lag may be off until all the consumers
>> >> > > > >       // in the group have the same setting for offsets storage)
>> >> > > > >       try {
>> >> > > > >         val offset = ZkUtils.readData(zkClient,
>> >> > > > >           topicDirs.consumerOffsetDir +
>> >> > > > >           "/%d".format(topicAndPartition.partition))._1.toLong
>> >> > > > >         offsetMap.put(topicAndPartition, offset)
>> >> > > > >       } catch {
>> >> > > > >         case z: ZkNoNodeException =>
>> >> > > > >           if (ZkUtils.pathExists(zkClient,
>> >> > > > >               topicDirs.consumerOffsetDir))
>> >> > > > >             offsetMap.put(topicAndPartition, -1)
>> >> > > > >           else
>> >> > > > >             throw z
>> >> > > > >       }
>> >> > > > >     }
>> >> > > > >     else if (offsetAndMetadata.error == ErrorMapping.NoError)
>> >> > > > >       offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
>> >> > > > >     else {
>> >> > > > >       println("Could not fetch offset for %s due to %s.".format(
>> >> > > > >         topicAndPartition,
>> >> > > > >         ErrorMapping.exceptionFor(offsetAndMetadata.error)))
>> >> > > > >     }
>> >> > > > > }
>> >> > > > >
>> >> > > > > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy
>> >><jjkosh...@gmail.com>
>> >> > > wrote:
>> >> > > > >
>> >> > > > > > There are mbeans named KafkaCommitsPerSec and
>> >> ZooKeeperCommitsPerSec
>> >> > > -
>> >> > > > > > can you look those up and see what they report?
>> >> > > > > >
>> >> > > > > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
>> >> > > > > > > Hi team,
>> >> > > > > > >
>> >> > > > > > > I was trying to migrate my consumer offsets from kafka to
>> >> > > > > > > zookeeper.
>> >> > > > > > >
>> >> > > > > > > Here are the original settings of my consumer:
>> >> > > > > > >
>> >> > > > > > > props.put("offsets.storage", "kafka");
>> >> > > > > > > props.put("dual.commit.enabled", "false");
>> >> > > > > > >
>> >> > > > > > > Here are the steps:
>> >> > > > > > >
>> >> > > > > > > 1. set dual.commit.enabled=true
>> >> > > > > > > 2. restart my consumer and monitor the offset lag with
>> >> > > > > > > kafka.tools.ConsumerOffsetChecker
>> >> > > > > > > 3. set offsets.storage=zookeeper
>> >> > > > > > > 4. restart my consumer and monitor the offset lag with
>> >> > > > > > > kafka.tools.ConsumerOffsetChecker
>> >> > > > > > >
>> >> > > > > > > After step 4 my consumer was able to continue consuming
>> >> > > > > > > data from the topic, but the offset lag remained unchanged.
>> >> > > > > > > Did I do anything wrong?
>> >> > > > > > >
>> >> > > > > > > --
>> >> > > > > > > Regards,
>> >> > > > > > > Tao
>> >> > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > > >
>> >> > > > > --
>> >> > > > > Regards,
>> >> > > > > Tao
>> >> > > >
>> >> > >
>> >> > >
>> >> >
>> >> >
>> >> > --
>> >> > Regards,
>> >> > Tao
>> >>
>> >>
>> >
>> >
>> >--
>> >Regards,
>> >Tao
>>
>>
>
>
>-- 
>Regards,
>Tao