Re: offset migration from kafka to zookeeper

2015-02-14 Thread tao xiao
Thanks Jiangjie for your help

On Sat, Feb 14, 2015 at 5:59 AM, Joel Koshy jjkosh...@gmail.com wrote:

 Thanks for looking into that!

 On Fri, Feb 13, 2015 at 05:31:39AM +, Jiangjie Qin wrote:
  I think this is the offset checker bug.
  The offset checker will
  1. First check whether the offset exists in the offsets topic on the broker.
  2. If it is on the broker, just return that offset.
  3. Otherwise, go to ZooKeeper.
 
  So the problem you saw actually follows this logic.
  After dual commit, the offsets topic already had the offsets for this
  consumer and topic.
  Then you switched to ZooKeeper commit.
  Because the offsets topic already has the offsets, the offset checker will
  use them and skip checking ZooKeeper. So the reported offset will not change
  anymore: you are no longer committing to the offsets topic on the broker,
  while the offset checker always uses that offset.
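
  The fall-through described here boils down to something like this minimal
  sketch (plain Scala Options stand in for the checker's real request/response
  types, and the function name is illustrative only, not the tool's code):

    // Sketch: an offset found in the Kafka offsets topic always wins;
    // ZooKeeper is consulted only when Kafka has no offset at all for
    // this group and partition.
    def resolveOffset(kafkaStoredOffset: Option[Long],
                      zkStoredOffset: => Option[Long]): Option[Long] =
      kafkaStoredOffset.orElse(zkStoredOffset)

    // Example: dual commit left offset 100 in Kafka; later ZooKeeper-only
    // commits advance ZooKeeper to 250, but the checker keeps reporting 100,
    // so the lag appears frozen.
    resolveOffset(Some(100L), Some(250L))   // Some(100)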
 
  On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:
 
  I used the one shipped with 0.8.2. It is pretty straightforward to
  reproduce the issue.
  
  Here are the steps to reproduce:
  1. I have a consumer using the high-level consumer API with initial settings
  offsets.storage=kafka and dual.commit.enabled=false.
  2. After consuming messages for a while, shut down the consumer and change
  the setting to dual.commit.enabled=true.
  3. Bounce the consumer and run for a while. The lag looks good.
  4. Change the setting to offsets.storage=zookeeper and bounce the consumer.
  Starting from now, the lag remains unchanged (the per-step settings are
  sketched below).
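  
  For reference, the relevant consumer settings at each step would look roughly
  like the sketch below (java.util.Properties only; every other consumer
  property is omitted and the variable names are illustrative):
  
    import java.util.Properties
  
    // Step 1: initial state - offsets are committed to Kafka only.
    val kafkaOnly = new Properties()
    kafkaOnly.put("offsets.storage", "kafka")
    kafkaOnly.put("dual.commit.enabled", "false")
  
    // Steps 2-3: dual commit - offsets are written to both Kafka and ZooKeeper.
    val dualCommit = new Properties()
    dualCommit.put("offsets.storage", "kafka")
    dualCommit.put("dual.commit.enabled", "true")
  
    // Step 4: ZooKeeper-only commits - the Kafka-stored offset stops advancing
    // from here on, which is why the checker's reported lag freezes.
    val zkOnly = new Properties()
    zkOnly.put("offsets.storage", "zookeeper")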
  
  On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com
 wrote:
  
   That is weird. Are you by any chance running an older version of the
   offset checker? Is this straightforward to reproduce?
  
   On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
Joel,
   
No, the metric was not increasing. It was 0 all the time.
   
On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com
   wrote:
   
 Actually, I meant to say: check that it is not increasing.

 On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
   Possibly a bug - can you also look at the MaxLag mbean in the consumer
   to verify that the maxlag is zero?
 
  On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
    Hi Joel,
   
    When I set dual.commit.enabled=true, the count value of both metrics
    increased. After I set offsets.storage=zookeeper, only
    ZooKeeperCommitsPerSec changed, not KafkaCommitsPerSec. I think this is
    expected, as Kafka offset storage was turned off.
   
    But when I looked up the consumer lag via
    kafka.tools.ConsumerOffsetChecker, the lag still remained unchanged.
   
    I scanned through the source code of ConsumerOffsetChecker; it doesn't
    check the offset in ZooKeeper unless offsetFetchResponse returns NoOffset.
    Since the consumer used Kafka as the offset storage before, I don't think
    offsetFetchResponse would return NoOffset:
  
    offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
      if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
        val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
        // this group may not have migrated off zookeeper for offsets storage
        // (we don't expose the dual-commit option in this tool, meaning the lag
        // may be off until all the consumers in the group have the same setting
        // for offsets storage)
        try {
          val offset = ZkUtils.readData(zkClient,
            topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
          offsetMap.put(topicAndPartition, offset)
        } catch {
          case z: ZkNoNodeException =>
            if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
              offsetMap.put(topicAndPartition, -1)
            else
              throw z
        }
      }
      else if (offsetAndMetadata.error == ErrorMapping.NoError)
        offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
      else {
        println("Could not fetch offset for %s due to %s.".format(
          topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
      }
    }
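
    (For context: ZKGroupTopicDirs.consumerOffsetDir resolves to the standard
    /consumers/<group>/offsets/<topic> path, so the ZkUtils.readData call above
    reads /consumers/<group>/offsets/<topic>/<partition> from ZooKeeper.)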
  
    On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote:
   
     There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
     can you look those up and see what they report?
   
On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
 Hi team,

 I was 

Re: offset migration from kafka to zookeeper

2015-02-12 Thread Joel Koshy
That is weird. Are you by any chance running an older version of the
offset checker? Is this straightforward to reproduce?

On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
 Joel,
 
 No, the metric was not increasing. It was 0 all the time.
 
 On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Actually, I meant to say: check that it is not increasing.
 
  On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
   Possibly a bug - can you also look at the MaxLag mbean in the consumer
   to verify that the maxlag is zero?
  
   On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
Hi Joel,
   
 When I set dual.commit.enabled=true, the count value of both metrics
 increased. After I set offsets.storage=zookeeper, only ZooKeeperCommitsPerSec
 changed, not KafkaCommitsPerSec. I think this is expected, as Kafka offset
 storage was turned off.
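
 If it helps to double-check those meters (and the MaxLag gauge) from inside
 the consumer JVM, a rough JMX query could look like the sketch below. The
 "kafka.consumer:*" object-name pattern and the attribute listing are
 assumptions about how the old consumer's Yammer metrics are registered, so
 treat the matching as illustrative rather than exact:

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName
    import scala.collection.JavaConverters._

    // Sketch: find consumer MBeans whose names mention the metrics discussed
    // above and print their readable attributes (e.g. Count for the meters).
    val server = ManagementFactory.getPlatformMBeanServer
    val consumerBeans = server.queryNames(new ObjectName("kafka.consumer:*"), null).asScala
    consumerBeans
      .filter { n =>
        val s = n.toString
        s.contains("MaxLag") || s.contains("KafkaCommitsPerSec") || s.contains("ZooKeeperCommitsPerSec")
      }
      .foreach { name =>
        server.getMBeanInfo(name).getAttributes.filter(_.isReadable).foreach { attr =>
          println(s"$name ${attr.getName} = ${server.getAttribute(name, attr.getName)}")
        }
      }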
   
 But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker,
 the lag still remained unchanged.
    
 I scanned through the source code of ConsumerOffsetChecker; it doesn't check
 the offset in ZooKeeper unless offsetFetchResponse returns NoOffset. Since
 the consumer used Kafka as the offset storage before, I don't think
 offsetFetchResponse would return NoOffset:
   
 offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
   if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
     val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
     // this group may not have migrated off zookeeper for offsets storage
     // (we don't expose the dual-commit option in this tool, meaning the lag
     // may be off until all the consumers in the group have the same setting
     // for offsets storage)
     try {
       val offset = ZkUtils.readData(zkClient,
         topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
       offsetMap.put(topicAndPartition, offset)
     } catch {
       case z: ZkNoNodeException =>
         if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
           offsetMap.put(topicAndPartition, -1)
         else
           throw z
     }
   }
   else if (offsetAndMetadata.error == ErrorMapping.NoError)
     offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
   else {
     println("Could not fetch offset for %s due to %s.".format(
       topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
   }
 }
   
 On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote:
   
  There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
  can you look those up and see what they report?

 On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
  Hi team,
 
   I was trying to migrate my consumer offsets from Kafka to ZooKeeper.
  
   Here are the original settings of my consumer:
  
   props.put("offsets.storage", "kafka");
  
   props.put("dual.commit.enabled", "false");
  
   Here are the steps:
  
   1. Set dual.commit.enabled=true.
   2. Restart my consumer and monitor the offset lag with
   kafka.tools.ConsumerOffsetChecker.
   3. Set offsets.storage=zookeeper.
   4. Restart my consumer and monitor the offset lag with
   kafka.tools.ConsumerOffsetChecker.
  
   After step 4 my consumer was able to continue consuming data from the
   topic, but the offset lag remained unchanged. Did I do anything wrong?
 
  --
  Regards,
  Tao


   
   
--
Regards,
Tao
  
 
 
 
 
 -- 
 Regards,
 Tao



Re: offset migration from kafka to zookeeper

2015-02-12 Thread tao xiao
Thanks for the explanation. Is there a way I can wipe out the offsets
stored in Kafka so that the checker can work correctly again?

On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I think this is the offset checker bug.
 The offset checker will
 1. First check whether the offset exists in the offsets topic on the broker.
 2. If it is on the broker, just return that offset.
 3. Otherwise, go to ZooKeeper.

 So the problem you saw actually follows this logic.
 After dual commit, the offsets topic already had the offsets for this
 consumer and topic.
 Then you switched to ZooKeeper commit.
 Because the offsets topic already has the offsets, the offset checker will
 use them and skip checking ZooKeeper. So the reported offset will not change
 anymore: you are no longer committing to the offsets topic on the broker,
 while the offset checker always uses that offset.

 On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:

 I used the one shipped with 0.8.2. It is pretty straightforward to
 reproduce the issue.
 
 Here are the steps to reproduce:
 1. I have a consumer using the high-level consumer API with initial settings
 offsets.storage=kafka and dual.commit.enabled=false.
 2. After consuming messages for a while, shut down the consumer and change
 the setting to dual.commit.enabled=true.
 3. Bounce the consumer and run for a while. The lag looks good.
 4. Change the setting to offsets.storage=zookeeper and bounce the consumer.
 Starting from now, the lag remains unchanged.
 
 On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
  That is weird. Are you by any chance running an older version of the
  offset checker? Is this straightforward to reproduce?
 
  On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
   Joel,
  
   No, the metric was not increasing. It was 0 all the time.
  
   On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
 Actually, I meant to say: check that it is not increasing.
   
On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
 Possibly a bug - can you also look at the MaxLag mbean in the consumer
 to verify that the maxlag is zero?

 On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
  Hi Joel,
 
  When I set dual.commit.enabled=true, the count value of both metrics
  increased. After I set offsets.storage=zookeeper, only ZooKeeperCommitsPerSec
  changed, not KafkaCommitsPerSec. I think this is expected, as Kafka offset
  storage was turned off.
 
  But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker,
  the lag still remained unchanged.
 
  I scanned through the source code of ConsumerOffsetChecker; it doesn't check
  the offset in ZooKeeper unless offsetFetchResponse returns NoOffset. Since
  the consumer used Kafka as the offset storage before, I don't think
  offsetFetchResponse would return NoOffset:
 
  offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
    if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
      val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
      // this group may not have migrated off zookeeper for offsets storage
      // (we don't expose the dual-commit option in this tool, meaning the lag
      // may be off until all the consumers in the group have the same setting
      // for offsets storage)
      try {
        val offset = ZkUtils.readData(zkClient,
          topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
        offsetMap.put(topicAndPartition, offset)
      } catch {
        case z: ZkNoNodeException =>
          if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
            offsetMap.put(topicAndPartition, -1)
          else
            throw z
      }
    }
    else if (offsetAndMetadata.error == ErrorMapping.NoError)
      offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
    else {
      println("Could not fetch offset for %s due to %s.".format(
        topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
    }
  }
 
   On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote:
  
    There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
    can you look those up and see what they report?
  
   On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
Hi team,
   
 I was trying to migrate my consumer offsets from Kafka to ZooKeeper.
    
 Here are the original settings of my consumer: