Re: Reading messages offset in Apache Kafka

2014-08-04 Thread anand jain
Thanks Guozhang!!

Below is the code for iterating over log messages:
.
.
for (final KafkaStream<byte[], byte[]> stream : streams) {
    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
    while (consumerIte.hasNext()) {
        System.out.println("Message from Topic :: " + new String(consumerIte.next().message()));
    }
}

.

As far as I understand, the statement while (consumerIte.hasNext()) loops
forever: hasNext() blocks until a message is published and then returns true.

How should I fit your piece of code (the solution you suggested) in here?
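For context, here is a minimal sketch of how I imagine the two pieces fitting
together, with the Kafka iterator replaced by a plain list and
process()/writeToDB() as hypothetical stand-ins (the real loop would run with
auto-commit turned off and call commitOffsets() on the connector):

```java
import java.util.Arrays;
import java.util.List;

public class AckRetryDemo {
    static int attempts = 0;

    // Hypothetical stand-in for saving to the database; fails once, then
    // succeeds, to exercise the retry path.
    static boolean writeToDB(String message) {
        return ++attempts > 1;
    }

    public static void main(String[] args) {
        // Stand-in for consumerIte.hasNext()/next() on the KafkaStream.
        List<String> messages = Arrays.asList("m1", "m2");
        for (String message : messages) {
            boolean acked = false;
            while (!acked) {
                acked = writeToDB(message); // retry the same message until saved
            }
            // With the real high-level consumer (auto.commit.enable=false):
            // consumer.commitOffsets();
            System.out.println("committed after " + message);
        }
    }
}
```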

Regards
Anand

On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Anand,

 You can use the high-level consumer with auto.commit.enable turned off, and
 do something like:

 message = consumer.iter.next();
 boolean acked = false;
 while (!acked) {
     process(message);
     acked = writeToDB();
 }
 consumer.commitOffsets();


 Guozhang


 On Fri, Aug 1, 2014 at 3:30 AM, anand jain anandjain1...@gmail.com
 wrote:

  I am very new to Kafka; we are using Kafka 0.8.1.
 
  What I need to do is consume a message from a topic. For that, I will have
  to write a consumer in Java which consumes a message from the topic and
  then saves it to the database. After the message is saved, an
  acknowledgement will be sent to the Java consumer. If the acknowledgement
  is true, the next message should be consumed from the topic. If the
  acknowledgement is false (meaning the message read from the topic couldn't
  be saved into the database due to some error), that message should be read
  again.
 
  I think I need to use the SimpleConsumer, to have control over message
  offsets, and I have gone through the SimpleConsumer example given at this
  link:

  https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 
  In this example, the offset is evaluated in the run method as 'readOffset'.
  Do I need to play with that? For example, I could use LatestTime() instead
  of EarliestTime(), and in case of a false acknowledgement, reset the offset
  to the previous one using offset - 1.

  Is this how I should proceed? Or can the same be done using the high-level
  API?
 



 --
 -- Guozhang



Re: offset commit api

2014-08-04 Thread Weide Zhang
Hi

It seems to me that 0.8.1.1 doesn't have the ConsumerMetadata API. So which
broker should I choose in order to commit and fetch offset information?
Or should I use ZooKeeper to manage offsets manually instead?
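For reference, the manual-ZooKeeper route would look something like the
sketch below. The path layout mirrors the high-level consumer's own
bookkeeping; the ZkClient calls in the comments are my assumption and are not
exercised here:

```java
public class ZkOffsetPath {
    // Path convention used by the 0.8.x high-level consumer, so tools like
    // the console consumer and offset checker agree on the stored value.
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        String path = offsetPath("my-group", "my-topic", 0);
        System.out.println(path);
        // With a com.101tec ZkClient instance (assumption, not run here):
        //   zkClient.createPersistent(path, true);        // ensure parents exist
        //   zkClient.writeData(path, Long.toString(42L)); // commit offset 42
        //   String stored = zkClient.readData(path);      // fetch it back
    }
}
```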

Thanks,

Weide



On Sun, Aug 3, 2014 at 4:34 PM, Weide Zhang weo...@gmail.com wrote:

 Hi,

 I'm reading the offset management on the API link.

 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

 I have a couple of questions about using the offset fetch and commit
 APIs in 0.8.1.1:

 1. Are the new offset commit and fetch APIs usable in 0.8.1.1? Does
 0.8.1.1 already support the offset coordinator?

 2. What's the difference between the old OffsetRequest and the new
 OffsetFetchRequest? It seems to me that the new API supports per-consumer-group
 offset fetches while the old API doesn't. Also, what's the
 purpose of the timestamp parameter in the fetch request?

 3. In 0.8.1.1, OffsetCommitRequest uses OffsetMetadataAndError; could
 you tell me the purpose of the error and metadata parameters in the
 request?

 4. Can I assume offset management is completely independent of message
 consumption? In other words, if I use a SimpleConsumer to fetch messages
 with a random client id, can I still manually set some consumer group along
 with the offset in the offset commit message? Is that allowed?

 Thanks,

 Weide




Re: Uniform Distribution of Messages for Topic Across Partitions Without Affecting Performance

2014-08-04 Thread Joe Stein
Bhavesh, take a look at
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified
?

Maybe the root cause is something else? Even if some producers produce more
or less than others, you should be able to make the distribution random
enough with a partitioner and a key. I don't think you should need more than
what is in the FAQ, but in case you do, maybe look into
http://en.wikipedia.org/wiki/MurmurHash as another hash option.
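As a sketch of the partitioner route: a simple round-robin partitioner that
ignores the key entirely. In a real 0.8 producer this class would implement
kafka.producer.Partitioner and be wired in via partitioner.class; the
interface is left off here so the distribution logic runs stand-alone:

```java
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinPartitioner {
    private final AtomicLong counter = new AtomicLong(0);

    // Same shape as kafka.producer.Partitioner#partition in 0.8.
    public int partition(Object key, int numPartitions) {
        // Ignore the key and just cycle, so distribution stays uniform even
        // when some producers send far more than others.
        return (int) (counter.getAndIncrement() % numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinPartitioner p = new RoundRobinPartitioner();
        int[] counts = new int[4];
        for (int i = 0; i < 400; i++) {
            counts[p.partition(null, 4)]++;
        }
        for (int c : counts) {
            System.out.println(c); // each of the 4 partitions gets 100
        }
    }
}
```

Note this evens out traffic per producer instance only; it does not
coordinate across producers, which is the harder part of the question here.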

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Mon, Aug 4, 2014 at 9:12 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 How can we achieve uniform distribution of non-keyed messages per topic
 across all partitions?

 We have tried to achieve uniform distribution across partitions using a
 custom partitioner in each producer instance doing round robin
 (count(messages) % number of partitions for the topic). This strategy
 results in very poor performance, so we have switched back to the random
 stickiness that Kafka provides out of the box per some interval (10
 minutes, not sure exactly) per topic.

 That strategy sometimes results in consumer-side lag on some partitions,
 because some of our applications/producers produce more messages for the
 same topic than other servers do.

 Could Kafka provide out-of-the-box uniform distribution by coordinating
 among all producers, relying on a measured rate such as messages per minute
 or bytes produced per minute, to achieve uniform distribution and
 coordinate partition stickiness among hundreds of producers for the same
 topic?

 Thanks,

 Bhavesh



Re: [New Feature Request] Ability to Inject Queue Implementation Async Mode

2014-08-04 Thread Joe Stein
Is it possible there is another solution to the problem? I think if you
could better describe the problem(s) you are facing and how your system is
architected, then you may get responses from others who have perhaps faced
the same problem with similar architectures, or maybe folks can chime in
with solution(s) to the problem(s). When presented only with solutions, it
is hard to say whether it is a problem folks will have and whether this
solution will work for them.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Mon, Aug 4, 2014 at 8:52 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Kafka Version:  0.8.x

 1) Ability to define which messages get dropped (least recent instead of
 most recent in the queue)
 2) An unbounded queue, to find the upper limit without dropping any
 messages for the application (use case: stress testing)
 3) A priority blocking queue (meaning a single producer can send messages to
 multiple topics, and I would like to give delivery priority to messages for
 a particular topic)

 We have use cases for #3 and #1, since we would like to deliver the
 application heartbeat first, ahead of any other event in the queue for any
 topic. To lower the number of TCP connections, we use only one producer for
 4 topics, but one of the topics has delivery priority.
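To make #3 concrete, here is a sketch of the kind of queue we would like to
be able to inject (the topic names and capacity are made up, and the current
async producer exposes no such hook; this only demonstrates the ordering):

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityProducerQueue {
    // Minimal stand-in for the async producer's queued (topic, payload) pair.
    static class QueuedMessage {
        final String topic;
        final String payload;
        QueuedMessage(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Heartbeats sort ahead of everything else; ties between other
        // topics break arbitrarily (PriorityBlockingQueue is not FIFO-stable).
        Comparator<QueuedMessage> heartbeatFirst =
                Comparator.comparingInt(m -> "heartbeat".equals(m.topic) ? 0 : 1);
        PriorityBlockingQueue<QueuedMessage> queue =
                new PriorityBlockingQueue<>(1024, heartbeatFirst);

        queue.put(new QueuedMessage("metrics", "cpu=80"));
        queue.put(new QueuedMessage("logs", "warn: disk"));
        queue.put(new QueuedMessage("heartbeat", "app-1 alive"));

        // A sender thread draining the queue sees the heartbeat first.
        for (int i = 0; i < 3; i++) {
            System.out.println(queue.take().topic);
        }
    }
}
```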

 Please let me know whether this would be a useful feature to have.

 Thanks in advance for great support !!

 Thanks,

 Bhavesh

 P.S.  Sorry for asking this question again, but last time there was no
 conclusion.



Conflict stored data in Zookeeper

2014-08-04 Thread Bongyeon Kim
Hi, everyone.

I'm using 0.8.1.1, and I have 8 brokers and 3 topics, each with 16
partitions and 3 replicas.

I'm seeing logs like the ones below that I haven't seen before; they occur
every 5 seconds.


[2014-08-05 11:11:32,478] INFO conflict in /brokers/ids/2 data:
{"jmx_port":9992,"timestamp":"1407204339990","host":"172.25.63.9","version":1,"port":9092}
stored data:
{"jmx_port":9992,"timestamp":"1407204133312","host":"172.25.63.9","version":1,"port":9092}
(kafka.utils.ZkUtils$)
[2014-08-05 11:11:32,479] INFO I wrote this conflicted ephemeral node
[{"jmx_port":9992,"timestamp":"1407204339990","host":"172.25.63.9","version":1,"port":9092}]
at /brokers/ids/2 a while back in a different session, hence I will backoff
for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)


I'd like to know what causes these messages.
Since the level is INFO rather than ERROR, is it OK? And how can I get rid
of the message?


Thanks in advance.

-- 
Sincerely,
Bongyeon Kim

Java Developer & Engineer
Seoul, Korea
Mobile:  +82-10-9369-1314
Email:  bongyeon...@gmail.com
Twitter:  http://twitter.com/tigerby
Facebook:  http://facebook.com/tigerby
Wiki: http://tigerby.com


high level consumer api blocked forever

2014-08-04 Thread zhao weinan
Hi, every one.

I've run into a strange case: my consumer using the high-level API worked
fine at first, but a couple of days later blocked in
ConsumerIterator.hasNext() while there were pending messages on the topic:
with kafka-console-consumer.sh I can see continuous messages.

Then I connected to the consumer process's JDWP port with Eclipse, suspended
the consumer thread and the ConsumerFetcherThread, and found the
ConsumerIterator blocked on channel.take and the ConsumerFetcherThread
blocked on PartitionTopicInfo.chunkQueue.put, but channel and chunkQueue are
different objects... So the ConsumerFetcherThread is trying to put into a
full LinkedBlockingQueue while the ConsumerIterator is trying to take from
an empty LinkedBlockingQueue. Even stranger: with one topic that has three
partitions, the 3 PartitionTopicInfo.chunkQueues and
ConsumerIterator.channel are 4 different objects.
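To illustrate, the stuck state can be reproduced in miniature with two
unrelated LinkedBlockingQueues standing in for chunkQueue and channel (not
Kafka code; offer/poll with timeouts replace put/take so the demo terminates
instead of hanging):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StuckQueuesDemo {
    public static void main(String[] args) throws InterruptedException {
        // After the suspected mix-up, the fetcher's queue is full while the
        // iterator reads from a different, permanently empty queue.
        LinkedBlockingQueue<String> chunkQueue = new LinkedBlockingQueue<>(1);
        LinkedBlockingQueue<String> channel = new LinkedBlockingQueue<>(1);

        chunkQueue.put("chunk-0"); // fetcher-side queue is now at capacity

        // With put()/take() both sides would block forever, as in the report.
        boolean fetcherCouldPut = chunkQueue.offer("chunk-1", 100, TimeUnit.MILLISECONDS);
        String consumerGot = channel.poll(100, TimeUnit.MILLISECONDS);

        System.out.println("fetcher put succeeded: " + fetcherCouldPut);
        System.out.println("consumer got: " + consumerGot);
    }
}
```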

This case happens pretty frequently. Has anyone encountered it, or is it a
known issue? Any information would be helpful.

Thanks a lot!




-- 
Zhao Weinan


Consumer is never shutdown

2014-08-04 Thread anand jain
Hi,

I just started with Apache Kafka and wrote a high-level consumer program
following the example given here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Though I was able to run the program and consume messages, I have one doubt
regarding consumer.shutdown(): it is never called. I used the piece of code
below to verify:
 if (consumer != null) {
     System.out.println("shutting down consumer");
     consumer.shutdown();
 }

Has someone encountered this before? Also, even though the consumer never
shut down, I didn't notice any problems. Is the call really needed?
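For what it's worth, here is a sketch of why the call may never execute:
hasNext() can block indefinitely, so a statement placed after the loop is
unreachable, and only a finally block (or a JVM shutdown hook, as in the
wiki example) reliably reaches shutdown(). A stub stands in for the real
ConsumerConnector:

```java
public class ShutdownDemo {
    // Stand-in for kafka.javaapi.consumer.ConsumerConnector; my assumption
    // is that the real shutdown() also unblocks an iterator stuck in hasNext().
    static class StubConsumer {
        volatile boolean isShutdown = false;
        void shutdown() { isShutdown = true; }
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        try {
            // ... consume messages; with the real API, hasNext() can block
            // here forever, so code placed after the loop would never run ...
        } finally {
            // finally (or a shutdown hook) guarantees the cleanup runs
            if (consumer != null) {
                System.out.println("shutting down consumer");
                consumer.shutdown();
            }
        }
        System.out.println("isShutdown: " + consumer.isShutdown);
    }
}
```

Skipping the call is usually survivable, but the group's ephemeral ZooKeeper
nodes then linger until the session times out, so wiring it in is still
worthwhile.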

Regards
Anand