Re: offset migration from kafka to zookeeper
Thanks Jiangjie for your help.

On Sat, Feb 14, 2015 at 5:59 AM, Joel Koshy jjkosh...@gmail.com wrote:

Thanks for looking into that!

On Fri, Feb 13, 2015 at 05:31:39AM, Jiangjie Qin wrote:

I think this is the offset checker bug. The offset checker will:

1. First check whether the offset exists in the offset topic on the broker.
2. If it is on the broker, just return that offset.
3. Otherwise, go to ZooKeeper.

So the problem you saw was actually following this logic. After dual commit, the offset topic already had the offsets for this consumer and topic. Then you switched to ZooKeeper commit. Because the offset topic already has the offsets, the offset checker uses those and skips checking ZooKeeper. So the offset will not change anymore: you are no longer committing to the offset topic on the broker, while the offset checker always uses that offset.

On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:

I used the one shipped with 0.8.2. It is pretty straightforward to reproduce the issue. Here are the steps to reproduce:

1. I have a consumer using the high-level consumer API with the initial settings offsets.storage=kafka and dual.commit.enabled=false.
2. After consuming messages for a while, shut down the consumer and change the setting to dual.commit.enabled=true.
3. Bounce the consumer and run it for a while. The lag looks good.
4. Change the setting to offsets.storage=zookeeper and bounce the consumer. From then on the lag remains unchanged.

On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com wrote:

That is weird. Are you by any chance running an older version of the offset checker? Is this straightforward to reproduce?

On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:

Joel, no, the metric was not increasing. It was 0 all the time.

On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com wrote:

Actually, I meant to say check that it is not increasing.
On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:

Possibly a bug - can you also look at the MaxLag mbean in the consumer to verify that the max lag is zero?

On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:

Hi Joel,

When I set dual.commit.enabled=true, the count value of both metrics increased. After I set offsets.storage=zookeeper, only ZooKeeperCommitsPerSec changed, not KafkaCommitsPerSec. I think this is expected, since Kafka offset storage was turned off. But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker, the lag still remained unchanged. I scanned through the source code of ConsumerOffsetChecker: it doesn't check the offset in ZooKeeper unless offsetFetchResponse returns NoOffset. Since the consumer used Kafka as the offset storage before, I don't think offsetFetchResponse would return NoOffset:

    offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
      if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
        val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
        // this group may not have migrated off zookeeper for offsets storage
        // (we don't expose the dual-commit option in this tool)
        // (meaning the lag may be off until all the consumers in the group
        // have the same setting for offsets storage)
        try {
          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir +
            "/%d".format(topicAndPartition.partition))._1.toLong
          offsetMap.put(topicAndPartition, offset)
        } catch {
          case z: ZkNoNodeException =>
            if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
              offsetMap.put(topicAndPartition, -1)
            else throw z
        }
      } else if (offsetAndMetadata.error == ErrorMapping.NoError)
        offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
      else {
        println("Could not fetch offset for %s due to %s.".format(
          topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
      }
    }

On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote:

There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec - can you look those up and see what they report?
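Jiangjie's lookup order can be sketched as a small standalone example. This is an illustration of the described behavior, not the real tool's code: the class, method, and map names below are invented, and the maps stand in for the broker's offset topic and the ZooKeeper offset paths.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the fallback the offset checker follows: ask the broker's
// offset topic first, and only consult ZooKeeper when the broker has
// no offset at all for the partition.
public class CheckerFallbackSketch {
    static final long NO_OFFSET = -1L;

    // Returns the offset the checker would report for one partition.
    static long reportedOffset(Map<Integer, Long> brokerOffsets,
                               Map<Integer, Long> zkOffsets,
                               int partition) {
        Long fromBroker = brokerOffsets.get(partition);
        if (fromBroker != null && fromBroker != NO_OFFSET) {
            // Once the offset topic has any offset, ZooKeeper is never checked,
            // so this value goes stale after switching back to ZK-only commits.
            return fromBroker;
        }
        Long fromZk = zkOffsets.get(partition);
        return fromZk != null ? fromZk : NO_OFFSET;
    }

    public static void main(String[] args) {
        Map<Integer, Long> broker = new HashMap<>();
        Map<Integer, Long> zk = new HashMap<>();
        broker.put(0, 100L); // committed during the dual-commit phase, never updated again
        zk.put(0, 250L);     // the offset the consumer is actually committing now
        // The checker keeps reporting the stale broker-side offset, 100,
        // so the computed lag never moves.
        System.out.println(reportedOffset(broker, zk, 0)); // prints 100
    }
}
```

This matches the symptom in the thread: the consumer keeps advancing in ZooKeeper, but the checker's reported offset is frozen at the last dual-committed value.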
offset migration from kafka to zookeeper
Hi team,

I was trying to migrate my consumer offsets from Kafka to ZooKeeper. Here are the original settings of my consumer:

    props.put("offsets.storage", "kafka");
    props.put("dual.commit.enabled", "false");

Here are the steps:

1. Set dual.commit.enabled=true.
2. Restart my consumer and monitor offset lag with kafka.tools.ConsumerOffsetChecker.
3. Set offsets.storage=zookeeper.
4. Restart my consumer and monitor offset lag with kafka.tools.ConsumerOffsetChecker.

After step 4 my consumer was able to continue consuming data from the topic, but the offset lag remained unchanged. Did I do anything wrong?

--
Regards,
Tao
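The staged configuration above can be written out as plain consumer Properties. The property keys and values are the real 0.8.x high-level consumer settings from the message; the class and method names are invented for this sketch.

```java
import java.util.Properties;

// Illustrative helper producing the consumer settings for each phase
// of the migration described above.
public class OffsetMigrationConfig {

    // Phase 1: keep offsets.storage=kafka, but also commit to ZooKeeper.
    static Properties dualCommitPhase() {
        Properties props = new Properties();
        props.put("offsets.storage", "kafka");
        props.put("dual.commit.enabled", "true");
        return props;
    }

    // Phase 2: once the group has run in dual-commit mode for a while,
    // switch storage to ZooKeeper and turn dual commit back off.
    static Properties zookeeperPhase() {
        Properties props = new Properties();
        props.put("offsets.storage", "zookeeper");
        props.put("dual.commit.enabled", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("phase 1: " + dualCommitPhase());
        System.out.println("phase 2: " + zookeeperPhase());
    }
}
```

The consumer is restarted between the two phases so each set of properties takes effect.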
Re: offset migration from kafka to zookeeper
That is weird. Are you by any chance running an older version of the offset checker? Is this straightforward to reproduce?

On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:

Joel, no, the metric was not increasing. It was 0 all the time.

[...]
Re: offset migration from kafka to zookeeper
Thanks for the explanation. Is there a way that I can wipe out the offsets stored in Kafka so that the checker can continue to work again?

On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

[...]