Re: Reading messages offset in Apache Kafka
Thanks Guozhang!! Below is the code for iterating over log messages:

    for (final KafkaStream<byte[], byte[]> stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext()) {
            System.out.println("Message from Topic :: " + new String(consumerIte.next().message()));
        }
    }

As far as I understand, the statement while (consumerIte.hasNext()) runs in an effectively infinite loop and returns true whenever a message is published. How should I fit the piece of code you suggested into this loop?

Regards,
Anand

On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Anand,

You can use the high-level consumer, turn off auto.offset.commit, and do something like:

    message = consumer.iter.next();
    boolean acked = false;
    while (!acked) {
        process(message);
        acked = writeToDB();
    }
    consumer.commit();

Guozhang

On Fri, Aug 1, 2014 at 3:30 AM, anand jain <anandjain1...@gmail.com> wrote:

I am very new to Kafka, and we are using Kafka 0.8.1. What I need to do is consume messages from a topic. For that, I will write a consumer in Java which consumes a message from the topic and then saves that message to a database. After a message is saved, an acknowledgement is sent back to the Java consumer. If the acknowledgement is true, the next message should be consumed from the topic. If the acknowledgement is false (meaning that, due to some error, the message read from the topic couldn't be saved to the database), that same message should be read again.

I think I need to use the SimpleConsumer, to have control over the message offset, and I have gone through the SimpleConsumer example given at https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example. In this example, the offset is evaluated in the run method as 'readOffset'. Do I need to play with that? For example, I could use LatestTime() instead of EarliestTime(), and on a false acknowledgement reset the offset to the previous one using offset - 1. Is this how I should proceed?

Or can the same be done using the high-level API?

--
Guozhang
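[Editorial note] Guozhang's suggestion slots directly into the body of the hasNext() loop: process each message and retry until the database write acknowledges, and only then commit. Below is a minimal, self-contained sketch of that control flow; the Kafka consumer and the database are replaced by stand-ins (writeToDB here is a hypothetical method that fails once per message and then succeeds, to exercise the retry path).

```java
import java.util.Arrays;
import java.util.List;

public class AckLoopSketch {
    // Stand-in for a database write with transient failures: it fails
    // on the first attempt for each message and succeeds on the second.
    static int attempts = 0;
    static boolean writeToDB(String message) {
        attempts++;
        return attempts % 2 == 0;
    }

    // Process each message, retrying until the DB write is acknowledged,
    // and only then move on (in real code: only then commit the offset).
    static int processAll(List<String> messages) {
        int committed = 0;
        for (String message : messages) {   // stands in for consumerIte.next()
            boolean acked = false;
            while (!acked) {
                acked = writeToDB(message); // retry the same message until saved
            }
            committed++;                    // stands in for consumer.commit()
        }
        return committed;
    }

    public static void main(String[] args) {
        int n = processAll(Arrays.asList("m1", "m2", "m3"));
        // prints: committed 3 messages after 6 write attempts
        System.out.println("committed " + n + " messages after " + attempts + " write attempts");
    }
}
```

The key point is that the offset commit happens after the write is acknowledged, so a crash mid-retry means the message is redelivered rather than lost (at-least-once delivery).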
Re: offset commit api
Hi,

It seems to me that 0.8.1.1 doesn't have the ConsumerMetadata API. So which broker should I choose in order to commit and fetch offset information? Should I use ZooKeeper instead to manage offsets manually?

Thanks,
Weide

On Sun, Aug 3, 2014 at 4:34 PM, Weide Zhang <weo...@gmail.com> wrote:

Hi,

I'm reading about offset management in the protocol guide: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

I have a couple of questions regarding the offset fetch and commit APIs in 0.8.1.1:

1. Are the new offset commit and fetch APIs usable in 0.8.1.1? Does 0.8.1.1 already support the offset coordinator?

2. What's the difference between the old OffsetRequest and the new OffsetFetchRequest? It seems to me that the new API supports per-consumer-group offset fetches while the old API doesn't. Also, what's the purpose of the timestamp parameter in the fetch request?

3. In 0.8.1.1, OffsetCommitRequest uses OffsetMetadataAndError; what is the purpose of the error parameter and the metadata parameter in the request?

4. Can I assume that offset management is completely independent of message consumption? In other words, if I use a SimpleConsumer to fetch messages with a random client id, can I still manually set a consumer group along with the offset in the offset commit message? Is that allowed?

Thanks,
Weide
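[Editorial note] For 0.8.1.1, where the broker-side offset manager is not yet available, the usual fallback is the ZooKeeper layout the high-level consumer itself uses: committed offsets live at /consumers/[group]/offsets/[topic]/[partition]. A small sketch of that path convention follows, with a plain Map standing in for the ZooKeeper client (real code would issue getData/setData against these same paths).

```java
import java.util.HashMap;
import java.util.Map;

public class ZkOffsetPaths {
    // Znode path where the high-level consumer keeps committed offsets.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // In-memory stand-in for ZooKeeper; a real implementation would
    // read and write these znodes through a ZooKeeper client instead.
    static final Map<String, Long> fakeZk = new HashMap<>();

    static void commitOffset(String group, String topic, int partition, long offset) {
        fakeZk.put(offsetPath(group, topic, partition), offset);
    }

    static long fetchOffset(String group, String topic, int partition) {
        // -1 signals "no offset committed yet" in this sketch.
        return fakeZk.getOrDefault(offsetPath(group, topic, partition), -1L);
    }

    public static void main(String[] args) {
        commitOffset("my-group", "events", 0, 42L);
        System.out.println(offsetPath("my-group", "events", 0)); // /consumers/my-group/offsets/events/0
        System.out.println(fetchOffset("my-group", "events", 0)); // 42
    }
}
```

Because the path encodes the consumer group, this also answers question 4 in spirit: the group used for offset storage is just a namespace, independent of which client actually fetched the messages.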
Re: Uniform Distribution of Messages for Topic Across Partitions Without Affecting Performance
Bhavesh, take a look at https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified — maybe the root cause is something else? Even if some producers produce more or less than others, you should be able to make the distribution random enough with a partitioner and a key. I don't think you should need more than what is in the FAQ, but in case you do, maybe look into http://en.wikipedia.org/wiki/MurmurHash as another hash option.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Mon, Aug 4, 2014 at 9:12 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

How can we achieve uniform distribution of non-keyed messages per topic across all partitions?

We have tried to achieve uniform distribution across partitions using custom partitioning from each producer instance with round robin (count(messages) % number of partitions for the topic). This strategy results in very poor performance, so we have switched back to the random stickiness that Kafka provides out of the box, where the producer sticks to a random partition per topic for some interval (10 minutes, not sure exactly).

The latter strategy sometimes results in consumer-side lag on some partitions, because some of our applications/producers produce more messages for the same topic than others. Could Kafka provide out-of-the-box uniform distribution by coordinating among all producers, relying on a measured rate such as messages per minute or bytes produced per minute, to achieve uniform distribution and coordinate partition stickiness among hundreds of producers for the same topic?

Thanks,
Bhavesh
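[Editorial note] The round-robin scheme Bhavesh describes — count(messages) % number of partitions — can be sketched in a few lines. This is an illustrative stand-alone class, not an implementation of Kafka's Partitioner interface; an AtomicInteger keeps the counter correct when multiple producer threads share one partitioner.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinPartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    // count(messages) % numPartitions, as described in the thread.
    public int partition(int numPartitions) {
        // Math.abs guards against the counter wrapping to negative values.
        return Math.abs(counter.getAndIncrement() % numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinPartitioner p = new RoundRobinPartitioner();
        int[] counts = new int[4];
        for (int i = 0; i < 400; i++) {
            counts[p.partition(4)]++;
        }
        // Perfectly even: 100 messages land on each of the 4 partitions.
        for (int c : counts) {
            System.out.print(c + " ");
        }
        System.out.println();
    }
}
```

The distribution is perfectly even per producer, but as the thread notes, the cost is that every message may go to a different broker (no batching per connection), which is one plausible reason this performed worse than the sticky random assignment.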
Re: [New Feature Request] Ability to Inject Queue Implementation Async Mode
Is it possible there is another solution to the problem? I think if you could better describe the problem(s) you are facing and how you are architected, then you may get responses from others who have perhaps faced the same problems with similar architectures... or maybe folks can chime in with solutions to the problem(s). When presented only with solutions, it is hard to say much about whether it is a problem folks will have and whether this solution will work for them.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

On Mon, Aug 4, 2014 at 8:52 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

Kafka version: 0.8.x

1) Ability to define which messages get dropped (least recent instead of most recent in the queue)
2) Ability to try an unbounded queue, to find out the upper limit without dropping any messages for an application (use case: stress testing)
3) A priority blocking queue (meaning a single producer can send messages to multiple topics, and I would like to give delivery priority to messages for a particular topic)

We have use cases for #3 and #1, since we would like to deliver the application heartbeat first, before any other event in the queue for any topic. To lower the number of TCP connections, we use only one producer for 4 topics, but one of the topics has delivery priority.

Please let me know whether this is a useful feature to have. Thanks in advance for the great support!!

Thanks,
Bhavesh

P.S. Sorry for asking this question again, but last time there was no conclusion.
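[Editorial note] Feature #3 — heartbeats jumping ahead of ordinary events in a shared producer queue — maps directly onto java.util.concurrent.PriorityBlockingQueue. A minimal sketch, assuming a hypothetical Record type where a lower priority number is delivered first:

```java
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityProducerQueue {
    // A queued record destined for some topic; lower priority value
    // means it is handed to the sender thread first.
    static class Record implements Comparable<Record> {
        final int priority;   // 0 = heartbeat, 1 = normal event
        final String topic;
        final String payload;
        Record(int priority, String topic, String payload) {
            this.priority = priority;
            this.topic = topic;
            this.payload = payload;
        }
        public int compareTo(Record other) {
            return Integer.compare(this.priority, other.priority);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Record> queue = new PriorityBlockingQueue<>();
        queue.put(new Record(1, "events", "e1"));
        queue.put(new Record(1, "metrics", "m1"));
        queue.put(new Record(0, "heartbeat", "hb")); // enqueued last...
        System.out.println(queue.take().topic);      // ...dequeued first: heartbeat
    }
}
```

Note that PriorityBlockingQueue is unbounded, which incidentally also covers feature #2 (no drops under load), at the cost of unbounded memory growth if the sender cannot keep up — so a real deployment would still want some cap or back-pressure.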
Conflict stored data in Zookeeper
Hi, everyone.

I'm using 0.8.1.1, and I have 8 brokers and 3 topics, each with 16 partitions and 3 replicas. I'm seeing logs like the ones below, repeating every 5 seconds:

    [2014-08-05 11:11:32,478] INFO conflict in /brokers/ids/2 data: {"jmx_port":9992,"timestamp":"1407204339990","host":"172.25.63.9","version":1,"port":9092} stored data: {"jmx_port":9992,"timestamp":"1407204133312","host":"172.25.63.9","version":1,"port":9092} (kafka.utils.ZkUtils$)
    [2014-08-05 11:11:32,479] INFO I wrote this conflicted ephemeral node [{"jmx_port":9992,"timestamp":"1407204339990","host":"172.25.63.9","version":1,"port":9092}] at /brokers/ids/2 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)

I'd like to know what causes these messages. Since they are INFO rather than ERROR, is this OK? And how can I get rid of them?

Thanks in advance.

--
Sincerely,
Bongyeon Kim

Java Developer Engineer
Seoul, Korea
Mobile: +82-10-9369-1314
Email: bongyeon...@gmail.com
Twitter: http://twitter.com/tigerby
Facebook: http://facebook.com/tigerby
Wiki: http://tigerby.com
high level consumer api blocked forever
Hi, everyone.

I ran into a strange case: my consumer, using the high-level API, worked fine at first, but a couple of days later it blocked in ConsumerIterator.hasNext() while there were pending messages on the topic (with kafka-console-consumer.sh I could see continuous messages).

I then connected to the consumer process's JDWP port with Eclipse and suspended the consumer thread and the ConsumerFetcherThread. I found the ConsumerIterator blocked on channel.take, and the ConsumerFetcherThread blocked on PartitionTopicInfo.chunkQueue.put — but channel and chunkQueue are different objects... So the ConsumerFetcherThread is trying to put into a full LinkedBlockingQueue while the ConsumerIterator is trying to take from an empty LinkedBlockingQueue.

Even stranger: with one topic that has three partitions, the 3 PartitionTopicInfo.chunkQueue instances and ConsumerIterator.channel are 4 different objects. And this case occurs pretty frequently. Has anyone encountered this, or is this a known issue? Any information would be helpful.

Thanks a lot!

--
Zhao Weinan
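[Editorial note] The two stuck states Zhao describes are the two ways a bounded LinkedBlockingQueue can block: put() on a full queue and take() on an empty one. On a single shared queue these states are mutually exclusive, which is why observing both at once implies the fetcher and the iterator are operating on different queue instances. A small demonstration, using the timed offer/poll variants so nothing actually hangs:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue that is already full: put() would block forever,
        // the state the ConsumerFetcherThread was stuck in.
        LinkedBlockingQueue<String> full = new LinkedBlockingQueue<>(1);
        full.put("chunk");
        boolean accepted = full.offer("next-chunk", 100, TimeUnit.MILLISECONDS);
        System.out.println("offer on full queue: " + accepted); // false

        // An empty queue: take() would block forever, the state the
        // ConsumerIterator was stuck in. A put on this queue would
        // immediately unblock the taker -- unless, as in the report,
        // the producer is putting into a *different* queue object.
        LinkedBlockingQueue<String> empty = new LinkedBlockingQueue<>(1);
        String polled = empty.poll(100, TimeUnit.MILLISECONDS);
        System.out.println("poll on empty queue: " + polled); // null
    }
}
```

So the diagnosis in the post is sound: if the debugger really shows four distinct queue objects where the design expects shared ones, the wiring between fetcher and iterator is broken (for example by consumer reconnection creating new streams while old threads keep the old queues).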
Consumer is never shutdown
Hi,

I just started with Apache Kafka and wrote a high-level consumer program following the example given at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example. Though I was able to run the program and consume messages, I have one doubt regarding consumer.shutdown(): it never gets called. I used the piece of code below to verify:

    if (consumer != null) {
        System.out.println("shutting down consumer");
        consumer.shutdown();
    }

Has someone encountered this before? Also, even though the consumer didn't shut down, I didn't notice any bottleneck. Is it really needed?

Regards,
Anand
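[Editorial note] Code placed after a while (it.hasNext()) loop is never reached, because hasNext() blocks waiting for more messages rather than returning false — which is the likely reason the shutdown block above never runs. The usual pattern is to register a JVM shutdown hook so shutdown() fires on Ctrl-C or System.exit. A sketch, with a stand-in class in place of the real ConsumerConnector:

```java
public class ShutdownHookSketch {
    // Stand-in for the Kafka ConsumerConnector; only the shutdown()
    // call matters for this sketch.
    static class FakeConsumer {
        volatile boolean shutDown = false;
        void shutdown() {
            shutDown = true;
            System.out.println("shutting down consumer");
        }
    }

    public static void main(String[] args) {
        final FakeConsumer consumer = new FakeConsumer();
        // The hook runs on normal JVM termination (Ctrl-C, System.exit),
        // so shutdown() is reached even though the consume loop never ends.
        Thread hook = new Thread(() -> {
            if (consumer != null) {
                consumer.shutdown();
            }
        });
        Runtime.getRuntime().addShutdownHook(hook);
        // ... blocking consume loop would go here ...
    }
}
```

Calling shutdown() matters more than it may appear: it lets the consumer commit its final offsets and release its ZooKeeper group registration promptly, rather than waiting for the session timeout to expire before a rebalance can complete.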