Thanks Guozhang!!
Below is the code for iterating over log messages:
.
.
for (final KafkaStream<byte[], byte[]> stream : streams) {
    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
    while (consumerIte.hasNext()) {
        System.out.println("Message from Topic :: " + new String(consumerIte.next().message()));
    }
}
.
As far as I understand, while (consumerIte.hasNext()) blocks until a new
message is available and then returns true, so the loop effectively runs
forever. How should I fit the code you suggested into this loop?
Regards
Anand
On Fri, Aug 1, 2014 at 8:46 PM, Guozhang Wang wangg...@gmail.com wrote:
Hi Anand,
You can use the high-level consumer with auto commit turned off
(auto.commit.enable=false), and do something like:
message = consumer.iter.next();
boolean acked = false;
while (!acked) {
    process(message);
    acked = writeToDB();
}
consumer.commit();
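Putting this together with the iterator loop from the previous mail, here is a
minimal self-contained sketch of the process-until-acked-then-commit pattern.
The Kafka stream is stubbed with a plain Iterator<String> and writeToDB is
simulated, so the commented-out consumer.commitOffsets() call and the
auto.commit.enable setting are assumptions about the 0.8 high-level API, not
tested code:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class AckLoopSketch {
    // Simulated DB write: fails every odd-numbered attempt, so each
    // message needs exactly two tries before it is acknowledged.
    private static int attempts = 0;

    static boolean writeToDB(String message) {
        attempts++;
        return attempts % 2 == 0;
    }

    // Stand-in for the ConsumerIterator loop: with a real 0.8 high-level
    // consumer you would set auto.commit.enable=false and call
    // consumer.commitOffsets() where marked below.
    static List<String> consumeAll(Iterator<String> consumerIte) {
        List<String> committed = new ArrayList<>();
        while (consumerIte.hasNext()) {
            String message = consumerIte.next();
            boolean acked = false;
            while (!acked) {
                // Retry the same message until the DB write succeeds.
                acked = writeToDB(message);
            }
            // consumer.commitOffsets(); // commit only after a successful write
            committed.add(message);
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(java.util.Arrays.asList("m1", "m2").iterator())); // prints [m1, m2]
    }
}
```

Note the commit happens once per message, after the write is acknowledged; if
the consumer crashes before committing, the message is redelivered on restart,
which matches the at-least-once behavior you want here.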
Guozhang
On Fri, Aug 1, 2014 at 3:30 AM, anand jain anandjain1...@gmail.com
wrote:
I am quite new to Kafka and we are using Kafka 0.8.1.
What I need to do is consume messages from a topic. For that, I will have to
write a Java consumer which consumes a message from the topic and then saves
it to a database. After a message is saved, an acknowledgement will be sent
back to the consumer. If the acknowledgement is true, the next message should
be consumed from the topic. If the acknowledgement is false (meaning that,
due to some error, the message read from the topic couldn't be saved into the
database), then that same message should be read again.
I think I need to use the SimpleConsumer, to have control over the message
offset, and I have gone through the SimpleConsumer example given at this link:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
In this example, the offset is computed in the run method as 'readOffset'. Do
I need to play with that? For example, I could use LatestTime() instead of
EarliestTime(), and in the failure case reset the offset to the previous one
using offset - 1.
Is this how I should proceed? Or can the same be done using the high-level
API?
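For what it's worth, the offset bookkeeping being described can be sketched
without the SimpleConsumer plumbing (the FetchRequest/PartitionMetadata parts
of the linked example are omitted; fetch() and the SaveFn interface below are
hypothetical stand-ins): advance readOffset only after a successful save,
which has the same effect as resetting to offset - 1 after a failed one.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetRetrySketch {
    // Hypothetical functional interface for the "save to DB" callback.
    interface SaveFn { boolean apply(String message); }

    // Stand-in for fetching the message at a given offset; the real
    // SimpleConsumer would issue a FetchRequest for (topic, partition, offset).
    static String fetch(List<String> log, long offset) {
        return log.get((int) offset);
    }

    // Process the log starting at readOffset. The offset advances only
    // after a successful save, so a failed save re-fetches the same
    // message (retrying forever in this sketch).
    static List<String> processFrom(List<String> log, long readOffset, SaveFn save) {
        List<String> saved = new ArrayList<>();
        while (readOffset < log.size()) {
            String message = fetch(log, readOffset);
            if (save.apply(message)) {
                saved.add(message);
                readOffset++; // advance only on success
            }
        }
        return saved;
    }
}
```

The point is that "rewinding" never has to touch Kafka itself: the consumer
simply declines to advance its own read position until the database write
succeeds.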
--
-- Guozhang