Re: data corruption like behavior
But they have always been up. When I was testing, all the ZooKeeper nodes were up and all the Kafka nodes were up. I only changed the number of ZooKeeper nodes in my first test iteration; the second and third used the same setup. I am not sure why the topics were losing some messages.

On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao j...@confluent.io wrote:

ZooKeeper requires a majority of its nodes to be up for the service to be available. Kafka relies on ZooKeeper always being available.

Thanks,
Jun

On Thu, Feb 19, 2015 at 11:15 AM, Karts kartad...@gmail.com wrote:

I have noticed some strange patterns when testing with the 0.8.1 and 0.8.2 builds; they are listed below.

1. I set up a brand-new cluster [3 Kafka nodes with 3 ZooKeepers] and created 2 topics via the API calls. Everything went fine and I was able to view my messages in my consumers. No messages were lost. Then I changed my setup to have just 1 ZooKeeper and ran my test again, and I lost some messages. I have checked that all my configs point to just the 1 ZooKeeper, with no mention of the other 2 offline ZooKeepers. Any idea why?

2. I reverted my settings to the original config. All 3 nodes were online, with no errors. I sent messages to the same old topic and was still losing some messages. I then deleted all the old topic files [to follow the 'cleanup' process] and created a new topic, and I successfully received all messages. No loss whatsoever.

3. In this state, I upgraded to 0.8.2 and tried sending messages to the topic created after the above cleanup, and I am losing messages again.

Am I making sense? This is very strange behavior; if anyone can comment on it [and correct me if I have done something 'very' wrong], I would appreciate it. Thanks.
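Jun's point about majority quorum explains why a 3-node ensemble reduced to 1 node behaves differently. The arithmetic can be sketched in a few lines (this is just the standard quorum formula, not Kafka or ZooKeeper code):

```python
def quorum_size(ensemble_size: int) -> int:
    """Minimum number of ZooKeeper nodes that must be up for the
    ensemble to serve requests: a strict majority."""
    return ensemble_size // 2 + 1

def tolerated_failures(ensemble_size: int) -> int:
    """How many nodes can be down while the ensemble stays available."""
    return ensemble_size - quorum_size(ensemble_size)
```

A 3-node ensemble tolerates 1 failure, while a 1-node ensemble tolerates none, so with a single ZooKeeper any hiccup makes ZooKeeper (and therefore Kafka's metadata operations) unavailable.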
Re: big cpu jump on producer in face of broker outage
This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you try that version, or 0.8.2.1, which is being voted on now?

Thanks,
Jun

On Thu, Feb 19, 2015 at 10:42 AM, Steven Wu stevenz...@gmail.com wrote:

Forgot to mention, in case it matters: producer 0.8.2-beta, broker 0.8.1.1.

On Thu, Feb 19, 2015 at 10:34 AM, Steven Wu stevenz...@gmail.com wrote:

I think this is an issue caused by KAFKA-1788. I was trying to test producer resiliency to a broker outage. In this experiment, I shut down all brokers and observed the producer's behavior. Here are the observations:

1) The Kafka producer can recover from a Kafka outage, i.e. sends resumed after the brokers came back.
2) The producer instance saw a big CPU jump during the outage: 28% to 52% in one test.

Note that I didn't observe the CPU issue when a new producer instance was started while the brokers were down. In that case there were no messages accumulated in the buffer, because the KafkaProducer constructor failed on the DNS lookup for the Route 53 name. When the brokers came up, my wrapper re-created the KafkaProducer object and recovered from the outage by resuming sends.

Here is the CPU graph for a running producer instance where the broker outage happened in the middle of the test run; it shows the CPU problem.
https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing

Here is the CPU graph for a new producer instance where the broker outage happened before instance startup; CPU is fine here.
https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing

Note that the producer is a 4-core m1.xlarge instance; the x-axis is time, the y-axis is CPU utilization.

Thanks,
Steven
Re: Default MirrorMaker not copying over from source to target
Tao,

I updated the mirrorconsumer.properties config file as you suggested and raised the MM log level to DEBUG. The DEBUG output is in this pastebin; if you could take a minute to look for anything in it that indicates a problem, that would be extremely helpful. Note that my servers' hostnames are of the form ad-010X or ba-0X, where X is some integer between 1 and 4.

http://pastebin.com/rBsxx15A

When I run the MirrorMaker and then spin up a console consumer to read from the source cluster, I get 0 messages consumed.

Alex

On Sun, Feb 15, 2015 at 3:00 AM, tao xiao xiaotao...@gmail.com wrote:

Alex,

Are you sure you have data continually being sent to the topic in the source cluster after you bring up MM? By default auto.offset.reset=largest in the MM consumer config, which means MM only fetches from the latest offset if the consumer group has no initial offset in ZooKeeper. You can have MM print more logs by changing the log level in config/tools-log4j.properties.

On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville amelvi...@g.hmc.edu wrote:

Hi Kafka'ers,

I am trying to get the MirrorMaker working with two separate clusters, one as the source and the other as the target. The topic I'm trying to copy over exists on both the source and target clusters. Here are the relevant entries in my consumer and producer properties files, which I specify in the command I run to start the MM:

mirrorconsumer.properties:
zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group

mirrorproducer.properties:
metadata.broker.list=ba-02:9092,ba-03:9092
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder

Then I run the following command:

bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ../config/mirrorconsumer.properties --producer.config ../config/mirrorproducer.properties --whitelist consolemm

so consolemm is the topic I'm trying to copy over. I've created consolemm and have used the console consumer to verify that there are messages in the topic. When I run this command... nothing happens. The process keeps running and prints nothing to the terminal. If I look at the output of the ZooKeeper on the source cluster I see only the following:

[2015-02-15 00:34:06,102] INFO Accepted socket connection from /10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-15 00:34:06,104] INFO Client attempting to establish new session at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033 with negotiated timeout 6000 for client /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer)

and when I look at the output of one of the brokers on the source cluster I see:

[2015-02-15 00:32:14,382] INFO Closing socket connection to /10.7.162.75. (kafka.network.Processor)

There is no output from the ZooKeeper on the target cluster. Any advice on what is causing MM to not properly copy data over to the target cluster would be extremely helpful.

-Alex

--
Regards,
Tao
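Following Tao's note on auto.offset.reset: with the default (largest), MM only mirrors messages produced after it starts, so a topic whose messages were all written beforehand will appear to copy nothing. If the goal is to mirror the topic's existing contents, a consumer config along these lines should work (a sketch based on the 0.8.x consumer property names; the zookeeper.connect and group.id values are the ones from Alex's setup):

```properties
zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
# Start from the earliest available offset when the group has no
# committed offset in ZooKeeper (default is "largest")
auto.offset.reset=smallest
```

Note this only takes effect for a consumer group with no committed offsets; an existing group keeps resuming from its stored position.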
data corruption like behavior
I have noticed some strange patterns when testing with the 0.8.1 and 0.8.2 builds; they are listed below.

1. I set up a brand-new cluster [3 Kafka nodes with 3 ZooKeepers] and created 2 topics via the API calls. Everything went fine and I was able to view my messages in my consumers. No messages were lost. Then I changed my setup to have just 1 ZooKeeper and ran my test again, and I lost some messages. I have checked that all my configs point to just the 1 ZooKeeper, with no mention of the other 2 offline ZooKeepers. Any idea why?

2. I reverted my settings to the original config. All 3 nodes were online, with no errors. I sent messages to the same old topic and was still losing some messages. I then deleted all the old topic files [to follow the 'cleanup' process] and created a new topic, and I successfully received all messages. No loss whatsoever.

3. In this state, I upgraded to 0.8.2 and tried sending messages to the topic created after the above cleanup, and I am losing messages again.

Am I making sense? This is very strange behavior; if anyone can comment on it [and correct me if I have done something 'very' wrong], I would appreciate it. Thanks.
Re: Simple Consumer and offsets
Joel/All,

The SimpleConsumer constructor requires a specific host and port. Can this be any broker? If it needs to be a specific broker, for 0.8.2, should this be the offset coordinator? For 0.8.1, does it matter?

-Suren

On Thursday, February 19, 2015 10:43 AM, Joel Koshy jjkosh...@gmail.com wrote:

I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shut down while you do that (otherwise they may auto-commit while you are resetting offsets).

Thanks,
Joel

On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:

We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same API in 0.8.1 and 0.8.2. :-)

It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2.

-Suren

On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote:

Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this?

On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:

Haven't used either one yet. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking of using the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks.

-Suren

On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote:

Are you using it from Java or Scala? I.e., are you using javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer? In the 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest, which means it will commit/fetch to/from ZooKeeper only. If you use the Scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent, i.e. you will be able to create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer.

Thanks,
Joel

On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:

Joel,

Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK?

-Suren

On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote:

Hi Chris,

In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka.

Thanks,
Joel

On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:

Hi,

I am still using 0.8.1.1 because of the CPU use concerns. I'm confused about why the SimpleConsumer has:

OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
and
OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)

but no way that I can see to issue a ConsumerMetadataRequest, which is what I think I need when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while, then restarted some time later, and new messages had come in). fetchOffsets() works on time; usually it looks like you send it Earliest or Latest (the beginning or end of what's currently in the stream).

I realize the documentation says this:

Downsides of using SimpleConsumer: The SimpleConsumer does require a significant amount of work not needed in Consumer Groups:
1. You must keep track of the offsets in your application to know where you left off consuming.

But that's not really quite true... not as long as commitOffsets() has been provided. It seems the SimpleConsumer provides you with a solution to only one half of the offset-management problem. Using some ZooKeeper Python scripts I wrote, I can see that commitOffsets() is doing its job and writing to /consumers/myGroupId/offsets/myTopic/0, which has this value:

('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, mtime=1424122117397, version=12568262, cversion=0, aversion=0, ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))

Now the question is just how to retrieve that - do I really have to have my client connect to ZK directly?
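The flow Joel describes (shut down the high-level consumers, reset the offsets via SimpleConsumer, then restart) can be sketched as follows. This is an illustrative Python sketch only: the commit/fetch helpers are in-memory stand-ins for SimpleConsumer.commitOffsets()/fetchOffsets(), and all function names here are assumptions, not Kafka API.

```python
# In-memory stand-in for the offset store (ZooKeeper in 0.8.1,
# optionally Kafka itself in 0.8.2).
_offset_store = {}

def commit_offset(group, topic, partition, offset):
    """Stub for SimpleConsumer.commitOffsets(): record the group's position."""
    _offset_store[(group, topic, partition)] = offset

def fetch_offset(group, topic, partition):
    """Stub for SimpleConsumer.fetchOffsets(): read the group's position."""
    return _offset_store.get((group, topic, partition))

def reset_offsets_on_restart(group, topic, partition, target_offset,
                             consumers_running):
    """Reset a group's offset before restarting high-level consumers.

    The high-level consumers must be shut down first; otherwise their
    auto-commit can overwrite the offset we are about to write.
    """
    if consumers_running:
        raise RuntimeError("shut down high-level consumers before resetting offsets")
    commit_offset(group, topic, partition, target_offset)
    return fetch_offset(group, topic, partition)
```

The guard clause captures the one operational invariant from the thread: the reset is only safe while no high-level consumer in the group is running.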
Re: [VOTE] 0.8.2.1 Candidate 1
+1 binding. Checked the md5 and the quickstart. Some minor comments:

1. The quickstart section should include the build step after the download and before starting the server.
2. There seems to be a bug in Gradle 1.1x with Java 8 that causes the gradle initialization to fail:

FAILURE: Build failed with an exception.
* Where: Build file '/home/guwang/Workspace/temp/kafka/build.gradle' line: 199
* What went wrong: A problem occurred evaluating root project 'kafka'. Could not create task of type 'ScalaDoc'.

Downgrading Java to 1.7 resolves this issue.

Guozhang

On Wed, Feb 18, 2015 at 7:56 PM, Connie Yang cybercon...@gmail.com wrote:

+1

On Feb 18, 2015 7:23 PM, Matt Narrell matt.narr...@gmail.com wrote:

+1

On Feb 18, 2015, at 7:56 PM, Jun Rao j...@confluent.io wrote:

This is the first candidate for the release of Apache Kafka 0.8.2.1. This only fixes one critical issue (KAFKA-1952) in 0.8.2.0.

Release Notes for the 0.8.2.1 release:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Feb 21, 7pm PT ***

Kafka's KEYS file containing the PGP keys we use to sign the release:
http://kafka.apache.org/KEYS
in addition to the md5, sha1 and sha2 (SHA256) checksums.

* Release artifacts to be voted upon (source and binary):
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/

* Maven artifacts to be voted upon prior to release:
https://repository.apache.org/content/groups/staging/

* scala-doc:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/scaladoc/

* java-doc:
https://people.apache.org/~junrao/kafka-0.8.2.1-candidate1/javadoc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=c1b4c58531343dce80232e0122d085fc687633f6

Thanks,
Jun

--
Guozhang
Re: big cpu jump on producer in face of broker outage
Will try 0.8.2.1 on the producer and report back the results.

On Thu, Feb 19, 2015 at 11:52 AM, Jun Rao j...@confluent.io wrote:

This is probably due to KAFKA-1642, which is fixed in 0.8.2.0. Could you try that version, or 0.8.2.1, which is being voted on now?

Thanks,
Jun
Custom partitioner in kafka-0.8.2.0
Hi,

I could not find a way to customize the Partitioner class in the new KafkaProducer class. Is this intentional?

tx
SunilKalva
Re: Custom partitioner in kafka-0.8.2.0
Hi,

In the new producer, we can specify the partition number as part of the ProducerRecord. From the javadocs:

"If a valid partition number is specified, that partition will be used when sending the record. If no partition is specified but a key is present, a partition will be chosen using a hash of the key. If neither key nor partition is present, a partition will be assigned in a round-robin fashion."

http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

ManiKumar

On Thu, Feb 19, 2015 at 6:05 PM, sunil kalva kalva.ka...@gmail.com wrote:

Hi,

I could not find a way to customize the Partitioner class in the new KafkaProducer class. Is this intentional?

tx
SunilKalva
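The selection rule the javadoc describes can be sketched as below. This is only an illustration of the documented behavior, not the producer's actual DefaultPartitioner (which, for example, hashes the serialized key bytes rather than using Python's hash()):

```python
import itertools

# Module-level counter emulating the producer's round-robin state.
_round_robin = itertools.count()

def choose_partition(num_partitions, partition=None, key=None):
    """Pick a partition the way the ProducerRecord javadoc describes."""
    if partition is not None:
        # 1. An explicit partition number wins.
        return partition
    if key is not None:
        # 2. Otherwise a partition is chosen by hashing the key.
        return hash(key) % num_partitions
    # 3. With neither key nor partition, assign round-robin.
    return next(_round_robin) % num_partitions
```

So in 0.8.2 a custom placement policy is expressed by computing the partition yourself and passing it to the ProducerRecord constructor, rather than by plugging in a Partitioner class.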
Re: Custom partitioner in kafka-0.8.2.0
Hi,

I could not find a way to customize the Partitioner class in the new KafkaProducer class. Is this intentional?

tx
SunilKalva
Re: Custom partitioner in kafka-0.8.2.0
Thanks Mani for the quick response; sorry, somehow I missed this javadoc. :)

t
SunilKalva

On Thu, Feb 19, 2015 at 6:14 PM, Manikumar Reddy ku...@nmsworks.co.in wrote:

Hi,

In the new producer, we can specify the partition number as part of the ProducerRecord. From the javadocs:

"If a valid partition number is specified, that partition will be used when sending the record. If no partition is specified but a key is present, a partition will be chosen using a hash of the key. If neither key nor partition is present, a partition will be assigned in a round-robin fashion."

http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

ManiKumar
Re: Simple Consumer and offsets
We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same API in 0.8.1 and 0.8.2. :-)

It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2.

-Suren

On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote:

Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this?

On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:

Haven't used either one yet. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking of using the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks.

-Suren

On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote:

Are you using it from Java or Scala? I.e., are you using javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer? In the 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest, which means it will commit/fetch to/from ZooKeeper only. If you use the Scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent, i.e. you will be able to create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer.

Thanks,
Joel

On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:

Joel,

Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK?

-Suren

On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote:

Hi Chris,

In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka.

Thanks,
Joel

On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:

Hi,

I am still using 0.8.1.1 because of the CPU use concerns. I'm confused about why the SimpleConsumer has:

OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
and
OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)

but no way that I can see to issue a ConsumerMetadataRequest, which is what I think I need when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while, then restarted some time later, and new messages had come in). fetchOffsets() works on time; usually it looks like you send it Earliest or Latest (the beginning or end of what's currently in the stream).

I realize the documentation says this:

Downsides of using SimpleConsumer: The SimpleConsumer does require a significant amount of work not needed in Consumer Groups:
1. You must keep track of the offsets in your application to know where you left off consuming.

But that's not really quite true... not as long as commitOffsets() has been provided. It seems the SimpleConsumer provides you with a solution to only one half of the offset-management problem. Using some ZooKeeper Python scripts I wrote, I can see that commitOffsets() is doing its job and writing to /consumers/myGroupId/offsets/myTopic/0, which has this value:

('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, mtime=1424122117397, version=12568262, cversion=0, aversion=0, ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679))

Now the question is just how to retrieve that - do I really have to have my client connect to ZK directly? If that's the case, future upgrades would break (e.g. 0.8.2 having its own storage for commit watermarks). What was the intent here, and what's the advice on how to proceed, given that 0.8.2 is in an iffy state right now?

--Chris
Re: Simple Consumer and offsets
Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this?

On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote:

Haven't used either one yet. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking of using the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks.

-Suren

On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote:

Are you using it from Java or Scala? I.e., are you using javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer? In the 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest, which means it will commit/fetch to/from ZooKeeper only. If you use the Scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent, i.e. you will be able to create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer.

Thanks,
Joel

On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote:

Joel,

Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK?

-Suren

On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote:

Hi Chris,

In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka.

Thanks,
Joel

On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote:

Hi,

I am still using 0.8.1.1 because of the CPU use concerns.
Re: Simple Consumer and offsets
I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shut down while you do that (otherwise they may auto-commit while you are resetting offsets).

Thanks,
Joel

On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote:

We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same API in 0.8.1 and 0.8.2. :-)

It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2.

-Suren

On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote:

Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this?
big cpu jump on producer in face of broker outage
I think this is an issue caused by KAFKA-1788. I was trying to test producer resiliency to a broker outage. In this experiment, I shut down all brokers and observed the producer's behavior. Here are the observations:

1) The Kafka producer can recover from a Kafka outage, i.e. sends resumed after the brokers came back.
2) The producer instance saw a big CPU jump during the outage: 28% to 52% in one test.

Note that I didn't observe the CPU issue when a new producer instance was started while the brokers were down. In that case there were no messages accumulated in the buffer, because the KafkaProducer constructor failed on the DNS lookup for the Route 53 name. When the brokers came up, my wrapper re-created the KafkaProducer object and recovered from the outage by resuming sends.

Here is the CPU graph for a running producer instance where the broker outage happened in the middle of the test run; it shows the CPU problem.
https://docs.google.com/drawings/d/1FdEg9-Rf_jbDZX0cC3iZ834c4m-5rqgK-41lSS6VudQ/edit?usp=sharing

Here is the CPU graph for a new producer instance where the broker outage happened before instance startup; CPU is fine here.
https://docs.google.com/drawings/d/1NmOdwp79DKHE7kJeskBm411ln6QczAMfmcWeijvZQRQ/edit?usp=sharing

Note that the producer is a 4-core m1.xlarge instance; the x-axis is time, the y-axis is CPU utilization.

Thanks,
Steven
Re: big cpu jump on producer in face of broker outage
Forgot to mention, in case it matters: producer 0.8.2-beta, broker 0.8.1.1.
Re: Consuming a snapshot from log compacted topic
If I consumed up to the log end offset and log compaction happens in between, I would have missed some messages. Compaction actually only runs on the rolled-over segments (not the active, i.e. latest, segment). The log-end-offset will be in the latest segment, which does not participate in compaction. The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point, but it is always later than it, so it would work just as well. Isn't this just roughly the same value as using c.getOffsetsBefore() with a partitionRequestTime of -1? Although it's always later than the cleaner point, surely log compaction is still an issue here: if I consumed up to the log end offset and log compaction happened in between, I would have missed some messages. My thinking was that if you knew the log cleaner point, you could: 1) make a note of the starting offset; 2) consume till the end of the log; 3) check the starting point is ahead of the current cleaner point, otherwise loop. I appreciate there is a chance I misunderstood your point. On 19 February 2015 at 18:02, Jay Kreps jay.kr...@gmail.com wrote: The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well. -Jay On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell w.f.funn...@gmail.com wrote: I'm not sure if I misunderstood Jay's suggestion, but I think it is along these lines: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then when you consume a record you can check whether it has offsets up to the log-end offset. 
If it does then you would know for sure that you have consumed everything for that partition To confirm then, the log-end-offset is the same as the cleaner point? On 19 February 2015 at 03:10, Jay Kreps jay.kr...@gmail.com wrote: Yeah I was thinking either along the lines Joel was suggesting or else adding a logEndOffset(TopicPartition) method or something like that. As Joel says the consumer actually has this information internally (we return it with the fetch request) but doesn't expose it. -Jay On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote: 2. Make the log end offset available more easily in the consumer. Was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the offset-cleaner-checkpoint file as it is? Is it right you would also need to know which offset-cleaner-checkpoint entry related to each active partition? I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition. Yes, was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive. Got it - thanks for clarifying. 
On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote: You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot? On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: If you catch up off a compacted topic and keep consuming then you will become consistent with the log. I think what you are saying is that you want to create a snapshot from the
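The loop Joel suggests (read to the observed log end, then re-check the end offset and keep reading until it is stable) can be sketched against a simulated partition. `SimulatedLog` and its methods are hypothetical stand-ins, not Kafka API; the sketch only shows the control flow of the snapshot loop.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the snapshot loop: consume up to the end offset observed at the
// start, then re-check the end; if writers appended meanwhile, keep reading
// until position and end offset agree. SimulatedLog is an in-memory stand-in.
public class SnapshotLoop {
    static class SimulatedLog {
        final List<String> records = new ArrayList<>();
        long endOffset() { return records.size(); }
        String read(long offset) { return records.get((int) offset); }
    }

    static List<String> snapshot(SimulatedLog log) {
        List<String> out = new ArrayList<>();
        long pos = 0;
        long end = log.endOffset();
        while (pos < end) {
            out.add(log.read(pos++));
            if (pos == end) {
                end = log.endOffset(); // re-check: the end may have moved
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SimulatedLog log = new SimulatedLog();
        log.records.add("k1=v1");
        log.records.add("k2=v2");
        System.out.println(snapshot(log).size()); // prints 2
    }
}
```

Note this covers the "did I reach the end" question only; guarding against compaction racing the reader is the separate cleaner-point check discussed in the thread.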
Re: Broker w/ high memory due to index file sizes
Well, are there any measurement techniques for memory config on brokers? We do have a large load, with a max throughput of 200MB/s. What do you suggest as the recommended memory config for 5 brokers to handle such loads? On Wed, Feb 18, 2015 at 7:13 PM, Jay Kreps jay.kr...@gmail.com wrote: 40G is really huge; generally you would want more like 4G. Are you sure you need that? Not sure what you mean by lsof and index files being too large, but the index files are memory-mapped, so they should be able to grow arbitrarily large and their memory usage is not counted in the Java heap (in fact, by having such a large heap you are taking away OS memory from them). -Jay On Wed, Feb 18, 2015 at 4:13 PM, Zakee kzak...@netzero.net wrote: I am running a cluster of 5 brokers with 40G ms/mx for each. I found one of the brokers is constantly using above ~90% of memory for jvm.heapUsage. I checked from lsof output that the size of the index files for this broker is too large. Not sure what is going on with this one broker in the cluster? Why would the index file sizes be so hugely different on one broker? Any ideas? Regards, Zakee
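Jay's advice (a much smaller heap, leaving the machine's remaining RAM to the OS page cache and the memory-mapped index/log files) might look like this when starting the broker. The 4G figure is his suggestion from the thread; KAFKA_HEAP_OPTS is the standard variable read by Kafka's start scripts, but treat the exact values as a starting point, not a definitive sizing.

```shell
# ~4G heap per Jay's advice; the rest of RAM goes to the OS page cache,
# which is where the mmap'd index files actually live.
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
bin/kafka-server-start.sh config/server.properties
```
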
Broker ID disappears in Zookeeper
Hello, We're having the following issue with Kafka and/or Zookeeper: if a broker (id=1) is running and you start another broker with id=1, the new broker will exit, saying "A broker is already registered on the path /brokers/ids/1". However, I noticed that when I query ZooKeeper afterwards, /brokers/ids/1 has disappeared. This behaviour doesn't make sense to us. The concern is that if we accidentally start up multiple brokers with the same ID (automatic restarts), then we may end up with multiple brokers with the same ID running at the same time. Thoughts? Kafka: 0.8.2, Zookeeper: 3.4.5
Re: Consuming a snapshot from log compacted topic
The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point, but it is always later than it, so it would work just as well. Isn't this just roughly the same value as using c.getOffsetsBefore() with a partitionRequestTime of -1? Although it's always later than the cleaner point, surely log compaction is still an issue here: if I consumed up to the log end offset and log compaction happened in between, I would have missed some messages. My thinking was that if you knew the log cleaner point, you could: 1) make a note of the starting offset; 2) consume till the end of the log; 3) check the starting point is ahead of the current cleaner point, otherwise loop. I appreciate there is a chance I misunderstood your point. On 19 February 2015 at 18:02, Jay Kreps jay.kr...@gmail.com wrote: The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well. -Jay On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell w.f.funn...@gmail.com wrote: I'm not sure if I misunderstood Jay's suggestion, but I think it is along these lines: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then when you consume a record you can check whether it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition. To confirm then, the log-end-offset is the same as the cleaner point? On 19 February 2015 at 03:10, Jay Kreps jay.kr...@gmail.com wrote: Yeah I was thinking either along the lines Joel was suggesting or else adding a logEndOffset(TopicPartition) method or something like that. 
As Joel says the consumer actually has this information internally (we return it with the fetch request) but doesn't expose it. -Jay On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote: 2. Make the log end offset available more easily in the consumer. Was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the offset-cleaner-checkpoint file as it is? Is it right you would also need to know which offset-cleaner-checkpoint entry related to each active partition? I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition. Yes, was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive. Got it - thanks for clarifying. On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote: You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot? 
On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: If you catch up off a compacted topic and keep consuming then you will become consistent with the log. I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file. I agree that this isn't as easy as it could be. As you say the only solution we have is that timeout which doesn't differentiate between GC stall in your process and no more messages left so you would need to tune the timeout. This is admittedly kind
About Symantec's encryption-thru-Kafka proof of concept
Hi Folks, At the recent Kafka Meetup in Mountain View there was interest expressed about the encryption through Kafka proof of concept that Symantec did a few months ago, so I have created a blog post with some details about it. You can find that here: http://goo.gl/sjYGWN Let me know if you have any thoughts or questions. Thanks, Jim -- Jim Hoagland, Ph.D. Sr. Principal Software Engineer Big Data Analytics Team Cloud Platform Engineering Symantec Corporation http://cpe.symantec.com
Re: Default MirrorMaker not copying over from source to target
Looks like you only have 4 messages in your topic and no more messages got sent: [2015-02-19 20:09:34,661] DEBUG initial fetch offset of consolemm:0: fetched offset = 4: consumed offset = 4 is 4 (kafka.consumer.PartitionTopicInfo) You can try sending more messages to the topic, or give the MM a different consumer group id and set auto.offset.reset=smallest On Friday, February 20, 2015, Alex Melville amelvi...@g.hmc.edu wrote: Tao, I updated the mirrorconsumer.properties config file as you suggested, and upped the MM's log level to DEBUG. I have the output of the DEBUG logger here in this pastebin; if you could take a minute to look for anything in its contents that would indicate a problem, that would be extremely helpful. Note that my servers' hostnames are of the form ad-010X or ba-0X, where X is some integer between 1 and 4. http://pastebin.com/rBsxx15A When I run the mirrormaker and then spin up a console consumer to read from the source cluster, I get 0 messages consumed. Alex On Sun, Feb 15, 2015 at 3:00 AM, tao xiao xiaotao...@gmail.com wrote: Alex, Are you sure you have data continually being sent to the topic in the source cluster after you bring up MM? By default auto.offset.reset=largest in the MM consumer config, which means MM only fetches the largest offset if the consumer group has no initial offset in zookeeper. You can have MM print more log by changing the log level in config/tools-log4j.properties On Sun, Feb 15, 2015 at 8:39 AM, Alex Melville amelvi...@g.hmc.edu wrote: Hi Kafka'ers, I am trying to get the Mirrormaker working with two separate clusters, one as the source and the other as the target. The topic I'm trying to copy over exists on both the source and target clusters. 
Here are the relevant entries in my consumer and producer properties files, which I specify in the command I run to start the MM: *mirrorconsumer.properties:* zookeeper.connect=ad-0104:2181 zookeeper.connection.timeout.ms=6000 group.id=test-consumer-group *mirrorproducer.properties:* metadata.broker.list=ba-02:9092,ba-03:9092 producer.type=sync compression.codec=none serializer.class=kafka.serializer.DefaultEncoder Then I run the following command: bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config ../config/mirrorconsumer.properties --producer.config ../config/mirrorproducer.properties --whitelist consolemm so consolemm is the topic I'm trying to copy over. I've created consolemm and have used the console-consumer to verify that there are messages in the topic. When I run this command... nothing happens. The process keeps running and prints nothing to the Terminal. If I look in the output of the zookeeper on the source cluster I get only the following: [2015-02-15 00:34:06,102] INFO Accepted socket connection from /10.7.162.75:42819 (org.apache.zookeeper.server.NIOServerCnxnFactory) [2015-02-15 00:34:06,104] INFO Client attempting to establish new session at /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer) [2015-02-15 00:34:06,106] INFO Established session 0x14b668b0fbe0033 with negotiated timeout 6000 for client /10.7.162.75:42819 (org.apache.zookeeper.server.ZooKeeperServer) and when I look at the output of one of the brokers on the source cluster I get: [2015-02-15 00:32:14,382] INFO Closing socket connection to /10.7.162.75. (kafka.network.Processor) and there is no output on the zookeeper on the target cluster. Any advice on what is causing MM to not properly copy over data to the target cluster would be extremely helpful. -Alex -- Regards, Tao -- Regards, Tao
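Tao's two suggestions (a fresh group id so no stored offsets apply, and auto.offset.reset=smallest so MM starts from the beginning of the topic) would look like this in mirrorconsumer.properties. The group name here is an example, not from the thread; the other lines mirror Alex's existing config.

```properties
zookeeper.connect=ad-0104:2181
zookeeper.connection.timeout.ms=6000
# a new group id (example name), so no stale offsets in zookeeper apply
group.id=mm-fresh-group
# consume from the earliest available offset instead of only new messages
auto.offset.reset=smallest
```
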
RE: Simple Consumer and offsets
If I may use the same thread to discuss the exact same issue: assuming one can store the offset in an external location (redis/db etc.), along with the rest of the state that a program requires, wouldn't it be possible to manage things such that you use the High Level API with auto commit turned off, do your custom offset management, and then follow with the Kafka commit API call (probably delayed to give a breather to zookeeper)? That way, in the failure scenario, the high-level consumer offset would ALWAYS be smaller than what is actually valid, and you can skip forward and avoid using the simple consumer. I assume one needs the simple consumer in the offset management use case only if we want to skip back to an older offset or use Kafka for storing offsets? I was trying to handle the consumer failure scenario while avoiding the simple consumer and all the complexities it ensues. Does this work, or is there anything wrong with this picture? Thanks, Arun On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2. -Suren On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote: Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this? On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: Haven't used either one now. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking to use the SimpleConsumer on failure recovery to set the offsets. 
Is that the recommended approach for this use case? Thanks. -Suren On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote: Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: Hi, I am still using 0.8.1.1 because of the CPU use concerns. I'm confused about why the SimpleConsumer has: OffsetCommitResponse commitOffsets(OffsetCommitRequest request) and OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) but no way that I can see to issue a ConsumerMetadataRequest, which is what I think when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while then restarted some time later, and new messages had come in). 
The fetchOffsets() works on time: usually it looks like you send it Earliest or Latest (the beginning or end of what's currently in the stream). I realize the documentation says this: *Downsides of using SimpleConsumer*: The SimpleConsumer does require a significant amount of work not needed in the Consumer Groups: 1. You must keep track of the offsets in your application to know where you left off consuming. But that's not really quite true... not as long as commitOffsets() has been provided. It seems the SimpleConsumer provides you with a solution to only one half of the problem of offset management. Using some zookeeper python scripts I
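Arun's ordering (write the offset to the external store first, then commit to Kafka/ZooKeeper, so the Kafka-side offset can only ever lag the external one) can be sketched with plain maps standing in for the two stores. All names here are hypothetical; this shows only the invariant, not real client API.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the dual-commit ordering: external store first, Kafka/ZK second.
// If the process crashes between the two writes, the Kafka offset is behind,
// so on restart the consumer can trust the external value and skip forward.
public class DualOffsetStore {
    final Map<String, Long> external = new HashMap<>(); // stand-in for redis/db
    final Map<String, Long> kafka = new HashMap<>();    // stand-in for the ZK/Kafka commit

    void commit(String topicPartition, long offset) {
        external.put(topicPartition, offset); // 1. durable external state
        kafka.put(topicPartition, offset);    // 2. may be lost if we crash after step 1
    }

    // On restart: the external offset is always >= the Kafka one under this
    // ordering, so taking the max resolves any crash-induced gap.
    long restartOffset(String topicPartition) {
        long ext = external.getOrDefault(topicPartition, 0L);
        long kaf = kafka.getOrDefault(topicPartition, 0L);
        return Math.max(ext, kaf);
    }
}
```

This matches the claim in the thread: under a crash, the high-level consumer's committed offset is only ever smaller than the externally recorded one, so skipping forward (not back) suffices.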
Re: New Consumer Offset management in 0.8.2
Yes it is supported in 0.8.2-beta. It is documented on the site - you will need to set offsets.storage to kafka. On Thu, Feb 19, 2015 at 03:57:31PM -0500, Matthew Butt wrote: I'm having a hard time figuring out if the new Kafka-based offset management in the high-level Scala Consumer is implemented in the current version of 0.8.2-beta. If I implement a high-level consumer, will it use the new system, or will it still be storing in zookeeper? Do I need to wait for the Java consumer to take advantage of it? -- - Matt
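Joel's answer in consumer.properties form; offsets.storage and dual.commit.enabled are the documented 0.8.2 consumer configs for this:

```properties
# store consumer offsets in Kafka rather than ZooKeeper (0.8.2+)
offsets.storage=kafka
# during migration, also commit to ZooKeeper; set to false once fully moved
dual.commit.enabled=true
```
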
Re: Simple Consumer and offsets
Haven't used either one now. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking to use the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks. -Suren On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote: Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: Hi, I am still using 0.8.1.1 because of the CPU use concerns. 
I'm confused about why the SimpleConsumer has: OffsetCommitResponse commitOffsets(OffsetCommitRequest request) and OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) but no way that I can see to issue a ConsumerMetadataRequest, which is what I think I need when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while, then restarted some time later, and new messages had come in). The fetchOffsets() works on time: usually it looks like you send it Earliest or Latest (the beginning or end of what's currently in the stream). I realize the documentation says this: *Downsides of using SimpleConsumer*: The SimpleConsumer does require a significant amount of work not needed in the Consumer Groups: 1. You must keep track of the offsets in your application to know where you left off consuming. But that's not really quite true... not as long as commitOffsets() has been provided. It seems the SimpleConsumer provides you with a solution to only one half of the problem of offset management. Using some zookeeper python scripts I wrote, I can see that the commitOffsets() is doing its job and writing to /consumers/myGroupId/offsets/myTopic/0 That has this value: ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, mtime=1424122117397, version=12568262, cversion=0, aversion=0, ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) Now the question is just how to retrieve that - do I really have to have my client connect to ZK directly? If that's the case, future upgrades would break (e.g. 0.8.2 having its own storage for commit watermarks). What was the intent here, and what's the advice on how to proceed, being that 0.8.2 is in an iffy state right now? --Chris
Re: Simple Consumer and offsets
Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: Hi, I am still using 0.8.1.1 because of the CPU use concerns. I'm confused about why the SimpleConsumer has: OffsetCommitResponse commitOffsets(OffsetCommitRequest request) and OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) but no way that I can see to issue a ConsumerMetadataRequest, which is what I think when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while then restarted some time later, and new messages had come in). The fetchOffsets() works on time, usually it looks like you send it Earliest or Latest (beginning or end of what's currently in the stream). 
I realize the documentation says this: *Downsides of using SimpleConsumer*: The SimpleConsumer does require a significant amount of work not needed in the Consumer Groups: 1. You must keep track of the offsets in your application to know where you left off consuming. But that's not really quite true... not as long as commitOffsets() has been provided. It seems the SimpleConsumer provides you with a solution to only one half of the problem of offset management. Using some zookeeper python scripts I wrote, I can see that the commitOffsets() is doing its job and writing to /consumers/myGroupId/offsets/myTopic/0 That has this value: ('32757408', ZnodeStat(czxid=2211679, mzxid=14779964, ctime=1423777630972, mtime=1424122117397, version=12568262, cversion=0, aversion=0, ephemeralOwner=0, dataLength=8, numChildren=0, pzxid=2211679)) Now the question is just how to retrieve that - do I really have to have my client connect to ZK directly? If that's the case, future upgrades would break (e.g. 0.8.2 having its own storage for commit watermarks). What was the intent here, and what's the advice on how to proceed, being that 0.8.2 is in an iffy state right now? --Chris
Re: Consuming a snapshot from log compacted topic
I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition To confirm then, the log-end-offset is the same as the cleaner point? On 19 February 2015 at 03:10, Jay Kreps jay.kr...@gmail.com wrote: Yeah I was thinking either along the lines Joel was suggesting or else adding a logEndOffset(TopicPartition) method or something like that. As Joel says the consumer actually has this information internally (we return it with the fetch request) but doesn't expose it. -Jay On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote: 2. Make the log end offset available more easily in the consumer. Was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the offset-cleaner-checkpoint file as it is? Is it right you would also need to know which offset-cleaner-checkpoint entry related to each active partition? I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition. 
Yes, was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive. Got it - thanks for clarifying. On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote: You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot? On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: If you catch up off a compacted topic and keep consuming then you will become consistent with the log. I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file. I agree that this isn't as easy as it could be. As you say the only solution we have is that timeout which doesn't differentiate between GC stall in your process and no more messages left so you would need to tune the timeout. This is admittedly kind of a hack. You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. I think there are two features we could add that would make this easier: 1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up. 2. 
Make the log end offset available more easily in the consumer. -Jay On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com wrote: We are currently using Kafka 0.8.1.1 with log compaction in order to provide streams of messages to our clients. As well as constantly consuming the stream, one of our use cases is to provide a snapshot, meaning the user will receive a copy of every message at least once. Each one of these messages represents an item of content in our system. The problem comes when determining if the client has actually reached the end of the topic. The
Re: Consuming a snapshot from log compacted topic
The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well. -Jay On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell w.f.funn...@gmail.com wrote: I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition To confirm then, the log-end-offset is the same as the cleaner point? On 19 February 2015 at 03:10, Jay Kreps jay.kr...@gmail.com wrote: Yeah I was thinking either along the lines Joel was suggesting or else adding a logEndOffset(TopicPartition) method or something like that. As Joel says the consumer actually has this information internally (we return it with the fetch request) but doesn't expose it. -Jay On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote: 2. Make the log end offset available more easily in the consumer. Was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the offset-cleaner-checkpoint file as it is? Is it right you would also need to know which offset-cleaner-checkpoint entry related to each active partition? I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. 
However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition. Yes, was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive. Got it - thanks for clarifying. On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote: You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. Isn't it sufficient to just repeat the check at the end after reading the log and repeat until you are truly done? At least for the purposes of a snapshot? On Wed, Feb 18, 2015 at 02:21:49PM -0800, Jay Kreps wrote: If you catch up off a compacted topic and keep consuming then you will become consistent with the log. I think what you are saying is that you want to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file. I agree that this isn't as easy as it could be. As you say the only solution we have is that timeout which doesn't differentiate between GC stall in your process and no more messages left so you would need to tune the timeout. This is admittedly kind of a hack. 
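Joel's "repeat the check at the end" idea can be sketched without any consumer timeout at all. The loop below runs against a plain Python list standing in for a partition; `snapshot` and the list-backed log are illustrative stand-ins, not Kafka API calls:

```python
# A minimal simulation of the re-check approach: read up to the
# log-end offset, then re-check the end offset and keep reading until
# no new messages arrived while we were consuming. Under a constant
# write load the loop can chase the end for a while, which is exactly
# the concern raised in the thread.

def snapshot(partition):
    """Consume a snapshot: every message seen at least once, no timeout hack."""
    snapshot_msgs = []
    position = 0
    while True:
        end = len(partition)          # stand-in for the log-end offset
        if position >= end:
            return snapshot_msgs      # truly caught up: the end didn't move
        while position < end:
            snapshot_msgs.append(partition[position])
            position += 1
        # loop: re-check the end offset in case producers appended
        # messages while we were reading

log = [("k1", "v1"), ("k2", "v2"), ("k1", "v3")]
print(snapshot(log))
```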
You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already kicked in (if the reading takes a while) and hence you might have an incomplete snapshot. I think there are two features we could add that would make this easier: 1. Make the cleaner point configurable on a per-topic basis. This feature would allow you to control how long the full log is retained and when compaction can kick in. This would give a configurable SLA for the reader process to catch up. 2. Make the log end offset available more easily in the consumer. -Jay On Wed, Feb 18, 2015 at 10:18 AM, Will Funnell w.f.funn...@gmail.com wrote: We are currently using Kafka 0.8.1.1 with log compaction in order to provide streams of
Re: data corruption like behavior
[2015-02-05 14:21:09,708] ERROR [ReplicaFetcherThread-2-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 147301; ClientId: ReplicaFetcherThread-2-1; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [site.db.people,6] -> PartitionFetchInfo(0,1048576),[site.db.main,4] -> PartitionFetchInfo(0,1048576),[site.db.school,7] -> PartitionFetchInfo(0,1048576),[site.db.people,2] -> PartitionFetchInfo(0,1048576),[k3.hydra,6] -> PartitionFetchInfo(3,1048576),[site.db.school,3] -> PartitionFetchInfo(0,1048576),[site.db.main,0] -> PartitionFetchInfo(0,1048576),[site.db.cmphotos,2] -> PartitionFetchInfo(2245,1048576),[site.db.cmphotos,6] -> PartitionFetchInfo(2220,1048576) (kafka.server.ReplicaFetcherThread) java.net.ConnectException: Connection refused These were some of the errors from the server log. Didn't find any on the producer side of things. On Thu, Feb 19, 2015 at 4:30 PM, Jun Rao j...@confluent.io wrote: Is there any error in the producer log? Is there any pattern in the messages being lost? Thanks, Jun On Thu, Feb 19, 2015 at 4:20 PM, Karts kartad...@gmail.com wrote: yes i did. On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao j...@confluent.io wrote: Did you consume the messages from the beginning of the log? Thanks, Jun On Thu, Feb 19, 2015 at 12:18 PM, Karts kartad...@gmail.com wrote: but they have always been up. I mean when i was testing, all the zookeepers were up. and all the kafka nodes were up. its just that I changed the number of zookeeper nodes in my first test iteration. second and third were still the same. not sure why the topics were losing some messages. On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao j...@confluent.io wrote: Zookeeper requires a majority of the nodes to be up for the service to be available. Kafka relies on Zookeeper to be always available.
Thanks, Jun On Thu, Feb 19, 2015 at 11:15 AM, Karts kartad...@gmail.com wrote: I have noticed some strange patterns when testing with the 0.8.1 build and the 0.8.2 builds; they are listed below. 1. So I set up a brand new cluster [3 kafka nodes with 3 zookeepers], created 2 topics via the API calls, everything went fine and I was successfully able to view my messages in my consumers. There were no messages lost. All is happy. Now, I change my setup to just have 1 zookeeper, and do my test again, and I lose some messages. I have checked that all my configs are pointing to just 1 zookeeper and there was no mention of the other 2 offline zookeepers. Any idea why? 2. I revert back my settings to the original config, all 3 nodes are online, no errors, send messages to the same old topic, and I am still losing some messages. I deleted all the old topic files [to follow the 'cleanup' process], create a new topic, and I am successfully able to receive all messages. No loss whatsoever. 3. Now in this state, I upgrade to 0.8.2, and try sending messages to the topic that was made after the above cleanup, and I am losing messages again. Am I making sense? I mean this is very strange behavior, and if anyone can comment on this [please correct me if I have done something 'very' wrong].. Thanks..
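Jun's point about ZooKeeper availability is simple arithmetic: an ensemble of n nodes is available only while a strict majority is up, so a single-node setup tolerates no failures at all. The helper functions below just spell that out; the names are ours, not part of any ZooKeeper or Kafka API:

```python
# ZooKeeper availability depends on a majority (quorum) of the ensemble
# being up. quorum_size is the minimum number of live nodes; the rest
# can fail without losing the service.

def quorum_size(ensemble_size):
    return ensemble_size // 2 + 1

def tolerated_failures(ensemble_size):
    return ensemble_size - quorum_size(ensemble_size)

for n in (1, 3, 5):
    print(n, quorum_size(n), tolerated_failures(n))
# a 3-node ensemble needs 2 nodes up and tolerates 1 failure;
# a 1-node "ensemble" tolerates none
```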
Re: Simple Consumer and offsets
Yeah that is a good point - will do the update as part of the doc changes in KAFKA-1729 On Thu, Feb 19, 2015 at 09:26:30PM -0500, Evan Huus wrote: On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy jjkosh...@gmail.com wrote: If you are using v0 of OffsetCommit/FetchRequest then you can issue that to any broker. For version 1 you will need to issue it to the coordinator. You can discover the coordinator by sending a ConsumerMetadataRequest to any broker. The protocol spec [1] still says "Currently the supported version for all APIs is 0." Based on your message above that is no longer true, so could somebody familiar with the changes please update the spec appropriately? Thanks, Evan [1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote: Joel/All, The SimpleConsumer constructor requires a specific host and port. Can this be any broker? If it needs to be a specific broker, for 0.8.2, should this be the offset coordinator? For 0.8.1, does it matter? -Suren On Thursday, February 19, 2015 10:43 AM, Joel Koshy jjkosh...@gmail.com wrote: I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shutdown while you do that (otherwise they may auto-commit while you are resetting offsets). Thanks, Joel On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2.
-Suren On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote: Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this? On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: Haven't used either one now. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking to use the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks. -Suren On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote: Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: Hi, I am still using 0.8.1.1 because of the CPU use concerns. 
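The version-dependent behaviour Joel describes - version 0 commits are stored in ZooKeeper and can go to any broker, version 1 commits are stored in Kafka and must go to the offset coordinator found with a ConsumerMetadataRequest - can be modelled as a small routing rule. Everything below is simulated (the broker names and the pre-resolved coordinator are made up); only the decision logic reflects the protocol:

```python
# Toy model of OffsetCommitRequest routing in 0.8.2: the request
# version decides both where the offsets are stored and which broker
# may receive the request.

def commit_target(version, brokers, coordinator):
    if version == 0:
        return brokers[0]   # any broker works; offsets land in ZooKeeper
    elif version == 1:
        return coordinator  # must be the group's offset coordinator (Kafka storage)
    raise ValueError("unknown OffsetCommitRequest version: %d" % version)

brokers = ["broker1:9092", "broker2:9092", "broker3:9092"]
coordinator = "broker2:9092"  # pretend a ConsumerMetadataRequest returned this

print(commit_target(0, brokers, coordinator))  # -> broker1:9092
print(commit_target(1, brokers, coordinator))  # -> broker2:9092
```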
I'm confused about why the SimpleConsumer has: OffsetCommitResponse commitOffsets(OffsetCommitRequest request) and OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) but no way that I can see to issue a ConsumerMetadataRequest, which is what I think I need when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while then restarted some time later, and new messages had come in). The fetchOffsets() works on time, usually it looks like you send it
Re: NetworkProcessorAvgIdlePercent
Jun, I am already using the latest release 0.8.2.1. -Zakee On Thu, Feb 19, 2015 at 2:46 PM, Jun Rao j...@confluent.io wrote: Could you try the 0.8.2.1 release being voted on now? It fixes a CPU issue and should reduce the CPU load in the network thread. Thanks, Jun On Thu, Feb 19, 2015 at 11:54 AM, Zakee kzak...@netzero.net wrote: Kafka documentation recommends 0.3 for the above metric. I assume the processor is busier if this goes below 0.3, and it staying below 0.3 for long does not seem to be a good sign. What should be our criteria to raise an alert? I thought it should be when its value goes below 0.3. However, the value seems to be below 0.3 a lot of the time, almost always if we take samples every five mins. What should be the threshold to raise an alarm? What would be the impact of having this below 0.3, or even zero, most of the time? -Zakee
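For what it's worth, NetworkProcessorAvgIdlePercent is an idle fraction (1.0 means the network threads are fully idle, 0.0 means saturated), so lower is busier. One way to avoid alarming on every five-minute dip is to require several consecutive samples below the threshold; the 0.3 threshold comes from the docs, but the window size below is just an illustrative choice:

```python
# Sketch of a sustained-below-threshold alert on an idle-fraction
# metric. Alarming on a single sample would flap; requiring a run of
# consecutive low samples smooths that out. Threshold and run length
# are our assumptions, not an official recommendation.

def should_alert(samples, threshold=0.3, consecutive=3):
    run = 0
    for value in samples:
        run = run + 1 if value < threshold else 0
        if run >= consecutive:
            return True
    return False

print(should_alert([0.8, 0.25, 0.9, 0.2]))         # brief dips: no alarm
print(should_alert([0.6, 0.28, 0.25, 0.1, 0.05]))  # sustained low idle: alarm
```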
Re: Simple Consumer and offsets
If you are using v0 of OffsetCommit/FetchRequest then you can issue that to any broker. For version 1 you will need to issue it to the coordinator. You can discover the coordinator by sending a ConsumerMetadataRequest to any broker. On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote: Joel/All, The SimpleConsumer constructor requires a specific host and port. Can this be any broker? If it needs to be a specific broker, for 0.8.2, should this be the offset coordinator? For 0.8.1, does it matter? -Suren On Thursday, February 19, 2015 10:43 AM, Joel Koshy jjkosh...@gmail.com wrote: I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shutdown while you do that (otherwise they may auto-commit while you are resetting offsets). Thanks, Joel On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2. -Suren On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote: Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this? On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: Haven't used either one now. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking to use the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks. -Suren On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote: Are you using it from Java or Scala?
i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer? In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: Hi, I am still using 0.8.1.1 because of the CPU use concerns. I'm confused about why the SimpleConsumer has: OffsetCommitResponse commitOffsets(OffsetCommitRequest request) and OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) but no way that I can see to issue a ConsumerMetadataRequest, which is what I think I need when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while then restarted some time later, and new messages had come in). The fetchOffsets() works on time, usually it looks like you send it Earliest or Latest (beginning or end of what's currently in the stream).
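The Earliest/Latest values Chris mentions are protocol sentinels (-2 and -1) rather than real timestamps. A toy lookup against an in-memory list of offsets, assuming the usual convention that "latest" means the log-end offset (one past the last message); the function itself is ours, not a Kafka API:

```python
# Offset requests usually carry one of two sentinel "times":
# -1 (latest) and -2 (earliest). Real timestamp lookups need segment
# index information, so this sketch handles only the sentinels.

LATEST_TIME = -1
EARLIEST_TIME = -2

def offsets_before(partition_offsets, time):
    if time == LATEST_TIME:
        return [partition_offsets[-1] + 1]  # log-end offset (next to be written)
    if time == EARLIEST_TIME:
        return [partition_offsets[0]]       # first offset still available
    raise NotImplementedError("real timestamps need segment index lookups")

offsets = [100, 101, 102, 103]
print(offsets_before(offsets, EARLIEST_TIME))  # -> [100]
print(offsets_before(offsets, LATEST_TIME))    # -> [104]
```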
I realize the documentation says this: *Downsides of using SimpleConsumer* The SimpleConsumer does require a significant amount of work not needed in the Consumer Groups: 1. You must keep track of the offsets in your application to know where you left off consuming. But that's not really quite true ... not as long as commitOffsets() has been provided. It seems the SimpleConsumer provides you with a solution to only one half of the problem of offset management. Using some zookeeper python scripts I wrote I can see that the commitOffsets() is doing its
Re: Consuming a snapshot from log compacted topic
The log end offset (of a partition) changes when messages are appended to the partition. (It is not correlated with the consumer's offset). On Thu, Feb 19, 2015 at 08:58:10PM +, Will Funnell wrote: So at what point does the log end offset change? When you commit? On 19 February 2015 at 18:47, Joel Koshy jjkosh...@gmail.com wrote: If I consumed up to the log end offset and log compaction happens in between, I would have missed some messages. Compaction actually only runs on the rolled over segments (not the active - i.e., latest segment). The log-end-offset will be in the latest segment which does not participate in compaction. The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well. Isn't this just roughly the same value as using c.getOffsetsBefore() with a partitionRequestTime of -1? Although it's always later than the cleaner point, surely log compaction is still an issue here. If I consumed up to the log end offset and log compaction happens in between, I would have missed some messages. My thinking was that if you knew the log cleaner point, you could: 1. Make a note of the starting offset. 2. Consume till the end of the log. 3. Check my starting point is ahead of the current cleaner point, otherwise loop. I appreciate there is a chance I misunderstood your point. On 19 February 2015 at 18:02, Jay Kreps jay.kr...@gmail.com wrote: The log end offset is just the end of the committed messages in the log (the last thing the consumer has access to). It isn't the same as the cleaner point but is always later than it so it would work just as well. -Jay On Thu, Feb 19, 2015 at 8:54 AM, Will Funnell w.f.funn...@gmail.com wrote: I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response.
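Joel's segment argument can be simulated directly: compaction deduplicates by key only across rolled-over segments, while the active segment (which holds the log-end offset) is left alone. Segments here are plain lists of (offset, key, value) tuples, purely illustrative:

```python
# Simulate compaction over closed segments only: for each key, keep
# the entry with the highest offset. The active segment never enters
# this function, mirroring the fact that the log-end offset always
# survives compaction.

def compact(closed_segments):
    entries = [e for seg in closed_segments for e in seg]
    latest = {}
    for off, key, val in entries:   # iterating in offset order: later wins
        latest[key] = (off, key, val)
    return sorted(latest.values())

closed = [[(0, "a", "1"), (1, "b", "2")], [(2, "a", "3")]]
active = [(3, "b", "4"), (4, "c", "5")]  # untouched by compaction

print(compact(closed))  # the older entry for key "a" (offset 0) is removed
print(active)           # log-end offset 4 still sits in the active segment
```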
However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition To confirm then, the log-end-offset is the same as the cleaner point? On 19 February 2015 at 03:10, Jay Kreps jay.kr...@gmail.com wrote: Yeah I was thinking either along the lines Joel was suggesting or else adding a logEndOffset(TopicPartition) method or something like that. As Joel says the consumer actually has this information internally (we return it with the fetch request) but doesn't expose it. -Jay On Wed, Feb 18, 2015 at 4:51 PM, Joel Koshy jjkosh...@gmail.com wrote: 2. Make the log end offset available more easily in the consumer. Was thinking something would need to be added in LogCleanerManager, in the updateCheckpoints function. Where would be best to publish the information to make it more easily available, or would you just expose the offset-cleaner-checkpoint file as it is? Is it right you would also need to know which offset-cleaner-checkpoint entry related to each active partition? I'm not sure if I misunderstood Jay's suggestion, but I think it is along the lines of: we expose the log-end-offset (actually the high watermark) of the partition in the fetch response. However, this is not exposed to the consumer (either in the new ConsumerRecord class or the existing MessageAndMetadata class). If we did, then if you were to consume a record you can check that it has offsets up to the log-end offset. If it does then you would know for sure that you have consumed everything for that partition. Yes, was looking at this initially, but as we have 100-150 writes per second, it could be a while before there is a pause long enough to check it has caught up. 
Even with the consumer timeout set to -1, it takes some time to query the max offset values, which is still long enough for more messages to arrive. Got it - thanks for clarifying. On 18 February 2015 at 23:16, Joel Koshy jjkosh...@gmail.com wrote: You are also correct and perceptive to notice that if you check the end of the log then begin consuming and read up to that point compaction may have already
Re: Simple Consumer and offsets
On Thu, Feb 19, 2015 at 8:43 PM, Joel Koshy jjkosh...@gmail.com wrote: If you are using v0 of OffsetCommit/FetchRequest then you can issue that to any broker. For version 1 you will need to issue it to the coordinator. You can discover the coordinator by sending a ConsumerMetadataRequest to any broker. The protocol spec [1] still says "Currently the supported version for all APIs is 0." Based on your message above that is no longer true, so could somebody familiar with the changes please update the spec appropriately? Thanks, Evan [1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol On Thu, Feb 19, 2015 at 07:55:16PM +, Suren wrote: Joel/All, The SimpleConsumer constructor requires a specific host and port. Can this be any broker? If it needs to be a specific broker, for 0.8.2, should this be the offset coordinator? For 0.8.1, does it matter? -Suren On Thursday, February 19, 2015 10:43 AM, Joel Koshy jjkosh...@gmail.com wrote: I see - yes, you can use the SimpleConsumer for that. However, your high-level consumers need to be shutdown while you do that (otherwise they may auto-commit while you are resetting offsets). Thanks, Joel On Thu, Feb 19, 2015 at 03:29:19PM +, Suren wrote: We are using the High Level Consumer API to interact with Kafka for our normal use cases. However, on consumer restart in the case of consumer failures, we want to be able to manually reset offsets in certain situations. And ideally we'd like to use the same api in 0.8.1 and 0.8.2. :-) It looked like instantiating a SimpleConsumer just to reset offsets on restart was a viable option, while continuing to use the High Level Consumer for our normal operations. Not sure if there is a better way that is compatible across 0.8.1 and 0.8.2. -Suren On Thursday, February 19, 2015 10:25 AM, Joel Koshy jjkosh...@gmail.com wrote: Not sure what you mean by using the SimpleConsumer on failure recovery. Can you elaborate on this?
On Thu, Feb 19, 2015 at 03:04:47PM +, Suren wrote: Haven't used either one now. Sounds like 0.8.2.1 will help. We are using the High Level Consumer generally but are thinking to use the SimpleConsumer on failure recovery to set the offsets. Is that the recommended approach for this use case? Thanks. -Suren On Thursday, February 19, 2015 9:40 AM, Joel Koshy jjkosh...@gmail.com wrote: Are you using it from Java or Scala? i.e., are you using the javaapi.SimpleConsumer or kafka.consumer.SimpleConsumer In 0.8.2 javaapi we explicitly set version 0 of the OffsetCommitRequest/OffsetFetchRequest which means it will commit/fetch to/from ZooKeeper only. If you use the scala API you can create an OffsetCommitRequest with version set to 1 (which will allow you to commit to Kafka). Since we are doing an 0.8.2.1 release we will make the above more consistent. i.e., you can create OffsetCommitRequests with version 1 even from the javaapi. I will be updating the documentation on this to make it clearer. Thanks, Joel On Thu, Feb 19, 2015 at 02:28:32PM +, Suren wrote: Joel, Looking at SimpleConsumer in the 0.8.2 code, it is using OffsetCommitRequest and sending that over to a broker. Is the broker storing that in ZK? -Suren On Tuesday, February 17, 2015 12:22 PM, Joel Koshy jjkosh...@gmail.com wrote: Hi Chris, In 0.8.2, the simple consumer Java API supports committing/fetching offsets that are stored in ZooKeeper. You don't need to issue any ConsumerMetadataRequest for this. Unfortunately, the API currently does not support fetching offsets that are stored in Kafka. Thanks, Joel On Mon, Feb 16, 2015 at 05:02:08PM -0500, Christopher Piggott wrote: Hi, I am still using 0.8.1.1 because of the CPU use concerns. 
I'm confused about why the SimpleConsumer has: OffsetCommitResponse commitOffsets(OffsetCommitRequest request) and OffsetFetchResponse fetchOffsets(OffsetFetchRequest request) but no way that I can see to issue a ConsumerMetadataRequest, which is what I think I need when restarting my consumers so that they can begin working where they last left off (in the event that they were stopped for a while then restarted some time later, and new messages had come in). The fetchOffsets() works on time, usually it looks like you send it Earliest or Latest (beginning or end of what's currently in the stream). I realize the documentation says this: *Downsides of using SimpleConsumer* The SimpleConsumer does require a significant amount of work not needed in the Consumer Groups:
Re: data corruption like behavior
Is there any error in the producer log? Is there any pattern in the messages being lost? Thanks, Jun On Thu, Feb 19, 2015 at 4:20 PM, Karts kartad...@gmail.com wrote: yes i did. On Thu, Feb 19, 2015 at 2:42 PM, Jun Rao j...@confluent.io wrote: Did you consume the messages from the beginning of the log? Thanks, Jun On Thu, Feb 19, 2015 at 12:18 PM, Karts kartad...@gmail.com wrote: but they have always been up. I mean when i was testing, all the zookeepers were up. and all the kafka nodes were up. its just that I changed the number of zookeeper nodes in my first test iteration. second and third were still the same. not sure why the topics were losing some messages. On Thu, Feb 19, 2015 at 11:39 AM, Jun Rao j...@confluent.io wrote: Zookeeper requires a majority of the nodes to be up for the service to be available. Kafka relies on Zookeeper to be always available. Thanks, Jun On Thu, Feb 19, 2015 at 11:15 AM, Karts kartad...@gmail.com wrote: I have noticed some strange patterns when testing with the 0.8.1 build and the 0.8.2 builds; they are listed below. 1. So I set up a brand new cluster [3 kafka nodes with 3 zookeepers], created 2 topics via the API calls, everything went fine and I was successfully able to view my messages in my consumers. There were no messages lost. All is happy. Now, I change my setup to just have 1 zookeeper, and do my test again, and I lose some messages. I have checked that all my configs are pointing to just 1 zookeeper and there was no mention of the other 2 offline zookeepers. Any idea why? 2. I revert back my settings to the original config, all 3 nodes are online, no errors, send messages to the same old topic, and I am still losing some messages. I deleted all the old topic files [to follow the 'cleanup' process], create a new topic, and I am successfully able to receive all messages. No loss whatsoever. 3.
Now in this state, I upgrade to 0.8.2, and try sending messages to the topic that was made after the above cleanup, and I am losing messages again. Am I making sense? I mean this is very strange behavior, and if anyone can comment on this [please correct me if I have done something 'very' wrong].. Thanks..
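One way to make tests like the ones above more diagnostic is to tag every produced message with a unique sequence number and then diff the produced and consumed sets, which shows exactly which messages went missing rather than just how many. A minimal sketch with simulated data (the sequence IDs and the "loss" are made up for illustration):

```python
# Diff produced vs consumed sequence numbers to pinpoint lost messages.
# In a real test the produced IDs would be embedded in message payloads
# and the consumed IDs collected from the consumer output.

def missing_messages(produced_ids, consumed_ids):
    return sorted(set(produced_ids) - set(consumed_ids))

produced = range(100)
consumed = [i for i in range(100) if i not in (17, 42)]  # simulate two lost messages

print(missing_messages(produced, consumed))  # -> [17, 42]
```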