offset migration from kafka to zookeeper
Hi team,

I was trying to migrate my consumer offsets from Kafka to ZooKeeper. Here are the original settings of my consumer:

    props.put("offsets.storage", "kafka");
    props.put("dual.commit.enabled", "false");

Here are the steps I followed:

1. Set dual.commit.enabled=true.
2. Restart my consumer and monitor offset lag with kafka.tools.ConsumerOffsetChecker.
3. Set offsets.storage=zookeeper.
4. Restart my consumer and monitor offset lag with kafka.tools.ConsumerOffsetChecker.

After step 4 my consumer was able to keep consuming data from the topic, but the offset lag remained unchanged. Did I do anything wrong?

--
Regards,
Tao
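For anyone following along, here is a minimal sketch of the two-phase config change described above, using the old high-level consumer's Properties-based configuration; the group id and ZooKeeper address are placeholders, not values from this thread:

    import java.util.Properties;

    public class OffsetMigrationConfig {
        public static void main(String[] args) {
            // Phase 1: keep committing to Kafka, but dual-commit to ZooKeeper
            // as well, so both stores converge while consumers are bounced.
            Properties phase1 = new Properties();
            phase1.put("group.id", "my-group");                // placeholder
            phase1.put("zookeeper.connect", "localhost:2181"); // placeholder
            phase1.put("offsets.storage", "kafka");
            phase1.put("dual.commit.enabled", "true");

            // Phase 2: once every consumer in the group is running with dual
            // commit, switch the store to ZooKeeper and drop dual commit
            // (dual.commit.enabled only applies when offsets.storage=kafka).
            Properties phase2 = new Properties();
            phase2.put("group.id", "my-group");
            phase2.put("zookeeper.connect", "localhost:2181");
            phase2.put("offsets.storage", "zookeeper");
            phase2.put("dual.commit.enabled", "false");
        }
    }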
Re: offset migration from kafka to zookeeper
That is weird. Are you by any chance running an older version of the offset checker? Is this straightforward to reproduce?

On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:

Joel,

No, the metric was not increasing. It was 0 all the time.

On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy jjkosh...@gmail.com wrote:

Actually, I meant to say check that it is not increasing.

On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:

Possibly a bug - can you also look at the MaxLag mbean in the consumer to verify that the max lag is zero?

On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:

Hi Joel,

When I set dual.commit.enabled=true the count value of both metrics increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec changed, not KafkaCommitsPerSec. I think this is expected, as Kafka offset storage was turned off. But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker, the lag still remained unchanged. I scanned through the source code of ConsumerOffsetChecker: it doesn't check the offset in ZK unless offsetFetchResponse returns NoOffset. Since the consumer used Kafka as the offset storage before, I don't think offsetFetchResponse would return NoOffset:

    offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
      if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
        val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
        // this group may not have migrated off zookeeper for offsets storage
        // (we don't expose the dual-commit option in this tool)
        // (meaning the lag may be off until all the consumers in the group
        // have the same setting for offsets storage)
        try {
          val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir +
            "/%d".format(topicAndPartition.partition))._1.toLong
          offsetMap.put(topicAndPartition, offset)
        } catch {
          case z: ZkNoNodeException =>
            if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
              offsetMap.put(topicAndPartition, -1)
            else throw z
        }
      } else if (offsetAndMetadata.error == ErrorMapping.NoError)
        offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
      else {
        println("Could not fetch offset for %s due to %s.".format(
          topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
      }
    }

On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy jjkosh...@gmail.com wrote:

There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec - can you look those up and see what they report?

On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:

Hi team, I was trying to migrate my consumer offsets from Kafka to ZooKeeper. [...]

--
Regards,
Tao
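As an aside, Joel's mbean check can be scripted. The sketch below is hypothetical: it attaches to a consumer JVM over remote JMX and matches any mbean whose name contains CommitsPerSec, rather than hard-coding the exact ObjectName, since the domain and client-id portions vary by version and configuration; the host, port, and "Count" attribute name are assumptions:

    import java.util.Set;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class CommitRateCheck {
        public static void main(String[] args) throws Exception {
            // Assumes the consumer JVM was started with remote JMX enabled
            // on port 9999; adjust host/port for your environment.
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                // Query all mbeans and keep the commit-rate meters; match
                // loosely instead of hard-coding the full ObjectName.
                Set<ObjectName> names = conn.queryNames(null, null);
                for (ObjectName name : names) {
                    if (name.toString().contains("CommitsPerSec")) {
                        Object count = conn.getAttribute(name, "Count");
                        System.out.println(name + " Count=" + count);
                    }
                }
            }
        }
    }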
Re: Increased CPU usage with 0.8.2-beta
This is a serious issue, we'll take a look.

-Jay

On Thu, Feb 12, 2015 at 3:19 PM, Solon Gordon so...@knewton.com wrote:

I saw a very similar jump in CPU usage when I tried upgrading from 0.8.1.1 to 0.8.2.0 today in a test environment. The Kafka cluster there is two m1.larges handling 2,000 partitions across 32 topics. CPU usage rose from 40% into the 150%–190% range, and load average from under 1 to over 4. Downgrading to 0.8.1.1 brought the CPU and load back to the previous values. If there's more info that would be helpful, please let me know.

On Thu, Feb 12, 2015 at 4:17 PM, Mathias Söderberg mathias.soederb...@gmail.com wrote:

Jun,

Pardon the radio silence. I booted up a new broker, created a topic with three (3) partitions and replication factor one (1), and used the kafka-producer-perf-test.sh script to generate load (using messages of roughly the same size as ours). There was a slight increase in CPU usage (~5-10%) on 0.8.2.0-rc2 compared to 0.8.1.1, but that was about it. I upgraded our staging cluster to 0.8.2.0 earlier this week, and had to add an additional broker due to increased load after the upgrade (note that the incoming load on the cluster has been pretty much constant). Since the upgrade we've also been seeing a 2-3x increase in latency. I'm considering downgrading to 0.8.1.1 again to see if it resolves our issues.

Best regards,
Mathias

On Tue Feb 03 2015 at 6:44:36 PM Jun Rao j...@confluent.io wrote:

Mathias,

The new hprof doesn't reveal anything new to me. We did fix the logic in using Purgatory in 0.8.2, which could potentially drive up the CPU usage a bit. To verify that, could you run your test on a single broker (with replication factor 1) on both 0.8.1 and 0.8.2 and see if there is any significant difference in CPU usage?

Thanks,
Jun

On Tue, Feb 3, 2015 at 5:09 AM, Mathias Söderberg mathias.soederb...@gmail.com wrote:

Jun,

I re-ran the hprof test, for about 30 minutes again, for 0.8.2.0-rc2 with the same version of snappy that 0.8.1.1 used. Attached the logs. Unfortunately there wasn't any improvement: the node running 0.8.2.0-rc2 still had a higher load and CPU usage.

Best regards,
Mathias

On Tue Feb 03 2015 at 4:40:31 AM Jaikiran Pai jai.forums2...@gmail.com wrote:

On Monday 02 February 2015 11:03 PM, Jun Rao wrote:

Jaikiran,

The fix you provided is probably unnecessary. The channel that we use in SimpleConsumer (BlockingChannel) is configured to be blocking. So even though the read from the socket is in a loop, each read blocks if no bytes have been received from the broker. So that shouldn't cause extra CPU consumption.

Hi Jun,

Of course, you are right! I forgot that while reading the thread dump in hprof output, one has to be aware that the thread state isn't shown and the thread need not necessarily be doing any CPU activity.

-Jaikiran

Thanks,
Jun

On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg mathias.soederb...@gmail.com wrote:

Hi Neha,

I sent an e-mail earlier today, but noticed now that it didn't actually go through. Anyhow, I've attached two files, one with output from a 10-minute run and one with output from a 30-minute run. Realized that maybe I should've done one or two runs with 0.8.1.1 as well, but nevertheless. I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same CPU usage as with the beta version (basically pegging all cores). If I manage to find the time I'll do another run with hprof on the rc2 version later today.
Best regards,
Mathias

On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede n...@confluent.io wrote:

The following should be sufficient:

    java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof classname

You would need to start the Kafka server with the settings above for some time, until you observe the problem.

On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg mathias.soederb...@gmail.com wrote:

Hi Neha,

Yeah, sure. I'm not familiar with hprof, so are there any particular options I should include, or should I just run with the defaults?

Best regards,
Mathias

On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede n...@confluent.io wrote:

Thanks for reporting the issue. Would you mind running hprof and sending the output?

On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg mathias.soederb...@gmail.com wrote:

Good day,

I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that the CPU usage
Re: offset migration from kafka to zookeeper
Thanks for the explanation. Is there a way I can wipe out the offset stored in Kafka so that the checker can continue to work again?

On Fri, Feb 13, 2015 at 1:31 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

I think this is an offset checker bug. The offset checker will:

1. First check whether the offset exists in the offsets topic on the broker.
2. If it is on the broker, just return that offset.
3. Otherwise, go to ZooKeeper.

So the problem you saw actually follows this logic. After dual commit, the offsets topic already had the offsets for this consumer and topic. Then you switched to ZooKeeper commit. Because the offsets topic has the offsets already, the offset checker will use them and skip checking ZooKeeper. The reported offset never changes, because you are no longer committing to the offsets topic on the broker, while the offset checker always reads from it.

On 2/12/15, 7:30 PM, tao xiao xiaotao...@gmail.com wrote:

I used the one shipped with 0.8.2. It is pretty straightforward to reproduce the issue. Here are the steps to reproduce:

1. I have a consumer using the high-level consumer API with initial settings offsets.storage=kafka and dual.commit.enabled=false.
2. After consuming messages for a while, shut down the consumer and change the setting to dual.commit.enabled=true.
3. Bounce the consumer and run it for a while. The lag looks good.
4. Change the setting to offsets.storage=zookeeper and bounce the consumer. From this point on the lag remains unchanged.

On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy jjkosh...@gmail.com wrote:

That is weird. Are you by any chance running an older version of the offset checker? Is this straightforward to reproduce? [...]
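Until the checker handles this case, one possible workaround is to read the ZooKeeper-side offsets directly. A minimal sketch, assuming the standard /consumers/<group>/offsets/<topic>/<partition> znode layout and the zkclient library the old consumer already depends on; the group, topic, partition count, and ZooKeeper address are placeholders:

    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;

    public class ZkOffsetCheck {
        public static void main(String[] args) {
            // Kafka writes these znodes as plain strings, so use a string
            // serializer rather than zkclient's default Java serialization.
            ZkSerializer stringSerializer = new ZkSerializer() {
                public byte[] serialize(Object data) throws ZkMarshallingError {
                    return data.toString().getBytes();
                }
                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                    return bytes == null ? null : new String(bytes);
                }
            };

            String group = "my-group";  // placeholder
            String topic = "my-topic";  // placeholder
            int partitions = 3;         // placeholder

            ZkClient zkClient = new ZkClient("localhost:2181", 30000, 30000, stringSerializer);
            try {
                for (int p = 0; p < partitions; p++) {
                    String path = "/consumers/" + group + "/offsets/" + topic + "/" + p;
                    // Second argument: return null instead of throwing if the znode is missing.
                    String offset = zkClient.readData(path, true);
                    System.out.println(topic + "-" + p + " -> " + offset);
                }
            } finally {
                zkClient.close();
            }
        }
    }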
Re: understanding partition key
So it's not possible to have 1 topic with 1 partition and many consumers of that topic? My intention is to have a topic with many consumers, but each consumer needs to be able to have access to all the messages in that topic.

On 11 February 2015 at 20:42, Zijing Guo alter...@yahoo.com.invalid wrote:

The partition key is at the producer level: if you have multiple partitions for a single topic, you can pass in a key for the KeyedMessage object, and based on the configured partitioner.class it will return a partition number for the producer; the producer will then find the leader for that partition. I don't know how Kafka could handle the time-series case; it depends on how many partitions the topic has. If you only have 1 partition, then you don't need to worry about order at all, since each consumer group can only allow 1 consumer instance to consume that data. If you have multiple partitions (say 3, for example), then you can fire up 3 consumer instances under the same consumer group, and each will consume only 1 partition's data. If order within each partition matters, then you need to do some work on the producer side.

Hope this helps,
Edwin

On Wednesday, February 11, 2015 3:14 PM, Gary Ogden gog...@gmail.com wrote:

I'm trying to understand how the partition key works and whether I need to specify a partition key for my topics or not. What happens if I don't specify a PK and I have more than one consumer that wants all messages in a topic for a certain period of time? Will those consumers get all the messages, but they just may not be ordered correctly?

The current scenario is that we will have events going into a topic based on customer, and the data will remain in the topic for 24 hours. We will then have multiple consumers reading messages from that topic. They will want to be able to get them out over a time range (could be last hour, last 8 hours, etc.). So if I specify the same PK for each subscriber, will each consumer get all messages in the correct order? If I don't specify the PK, or use a random one, will each consumer still get all the messages, just not ordered correctly?
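To make the KeyedMessage point concrete, here is a minimal sketch using the 0.8.x producer's Java API; the broker list, topic name, and key are placeholders, and the partitioner shown is just the default hash partitioner:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KeyedProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092"); // placeholder
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            // The default partitioner hashes the key, so messages with the
            // same key always land in the same partition, preserving order
            // per key (here, per customer).
            props.put("partitioner.class", "kafka.producer.DefaultPartitioner");

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>(
                "events", "customer-42", "some event payload"));
            producer.close();
        }
    }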
Re: understanding partition key
Gary,

That is certainly a valid use case. What Zijing was saying is that you can only have 1 consumer per consumer application per partition. I think what it boils down to is how you want your information grouped inside your timeframes. For example, if you want to have everything for a specific user, then you could use that as your partition key, ensuring that any data for that user is processed by the same consumer (in however many consumer applications you opt to run).

The second point that I think Zijing was getting at was whether or not your proposed use case makes sense for Kafka. If your goal is to do time-interval batch processing (versus N-record batches), then why use Kafka for it? Why not use something more adept at batch processing? For example, if you're using HBase you can use Pig jobs that would read only the records created between specific timestamps.

David

On Thu, Feb 12, 2015 at 7:44 AM, Gary Ogden gog...@gmail.com wrote:

So it's not possible to have 1 topic with 1 partition and many consumers of that topic? [...]
Re: understanding partition key
I'm going to go a bit in reverse for your questions.

We built a RESTful API to push data to, so that we can submit things from multiple sources that aren't necessarily maintained by our team, and validate that data before we send it off to a topic.

As for consumers... we expect that a consumer will receive everything in a particular partition, but definitely not everything in a topic. For example, for DNS information, a consumer expects to see everything for a domain (our partition key there), but not for all domains... it would be too much data to handle with a low enough latency.

For us, our latencies vary by what we're processing at a given moment or by other large jobs that might be running outside of the stream processing. Generally speaking, we keep latency within our limits by adjusting the number of partitions for a topic, and subsequently the number of consumers... which works until you hit a wall with your data store, in which case tweaking must happen there too. Your flow of data will have the biggest impact on latency (along with batch size). If you see 100 events a minute and have a batch size of 1,000, you'll have 10 minutes of latency; if you process your data as each record comes in, then as long as your consumer can keep up with that load, you'll have very low latency. Kafka's latency has never been an issue for us; we've always found there's something that we're doing that is affecting the overall throughput of the systems, be it needing to play with the number of partitions, adjusting batch size, or resizing the hadoop cluster to meet increased need there.

On Thu, Feb 12, 2015 at 9:54 AM, Gary Ogden gog...@gmail.com wrote:

Thanks again David. So what kind of latencies are you experiencing with this? If I wanted to act upon certain events and send out alarms (email, SMS, etc.), what kind of delays would I see by the time I'm able to process them? It seems if you were to create an alarm topic, dump alerts on there to be processed, and have a consumer then process those alerts (save to Cassandra and send out the notification), you could see some sort of delay. But none of your consumers are expecting that they will be getting all the data for that topic, are they? That's the hitch for me. I think I could rework the design so that no consumer would assume it's getting all the messages in the topic. When you say "central producer stack", this is something you built outside of Kafka, I assume?

On 12 February 2015 at 09:40, David McNelis dmcne...@emergingthreats.net wrote:

In our setup we deal with a similar situation (lots of time-series data that we have to aggregate in a number of different ways). Our approach is to push all of our data to a central producer stack; that stack then submits data to different topics, depending on a set of predetermined rules. For argument's sake, let's say we see 100 events/second.

Once the data is pushed to topics we have several consumer applications. The first pushes the lowest-level raw data into our cluster (in our case HBase, but it could just as easily be C*). For this first consumer we use a small batch size, say 100 records. Our second consumer does some aggregation and summarization and needs to reference additional resources; its batch size is considerably larger, say 1000 records. Our third consumer does a much larger aggregation, where we're reducing the data down to a much smaller dataset, by orders of magnitude, and we run that batch size at 10k records.
Our aggregations are oftentimes time based, like rolling events/counters up to the hour or day level... so while we may occasionally re-process some of the same information throughout the day, it lets us maintain a system where we have near-real-time access to most of the data we're ingesting. This is certainly something we've had to tweak, in terms of the numbers of consumers/partitions and batch sizes, to get to work optimally, but we're pretty happy with it at the moment.

On Thu, Feb 12, 2015 at 8:22 AM, Gary Ogden gog...@gmail.com wrote:

Thanks David. Whether Kafka is the right choice is exactly what I'm trying to determine. Everything I want to do with these events is time based: store them in the topic for 24 hours; read from the topics and get data for a time period (last hour, last 8 hours, etc.). This reading from the topics could happen numerous times for the same dataset. At the end of the 24 hours, we would then want to store the events in long-term storage in case we need them in the future. I'm thinking Kafka isn't the right fit for this use case, though. However, we have Cassandra already installed and running. Maybe we use Kafka as the in-between... so dump the events onto Kafka, have consumers that process the events and dump them into Cassandra, and then we read the time series data we need
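As a rough illustration of the batch-size/latency trade-off described above, here is a minimal, hypothetical sketch using the old high-level consumer API; the group, topic, batch size, and flush interval are placeholders. It adds a time bound alongside the size bound so a slow topic (100 events a minute against a batch of 1,000) doesn't sit on data for 10 minutes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class BatchingConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "aggregator");              // placeholder
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCounts = new HashMap<String, Integer>();
            topicCounts.put("events", 1); // one stream for the "events" topic
            KafkaStream<byte[], byte[]> stream =
                connector.createMessageStreams(topicCounts).get("events").get(0);

            int batchSize = 1000;     // bigger batches = higher latency
            long maxWaitMs = 60000L;  // time bound so slow topics still flush
            List<byte[]> batch = new ArrayList<byte[]>();
            long lastFlush = System.currentTimeMillis();

            // Note: hasNext() blocks until a message arrives unless
            // consumer.timeout.ms is set, so the time bound below is only
            // evaluated when new messages show up.
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                batch.add(it.next().message());
                boolean full = batch.size() >= batchSize;
                boolean timedOut = System.currentTimeMillis() - lastFlush >= maxWaitMs;
                if (full || timedOut) {
                    // process(batch);  // hypothetical downstream write (HBase/C*)
                    batch.clear();
                    lastFlush = System.currentTimeMillis();
                }
            }
        }
    }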