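You can set "consumer.timeout.ms" in the consumer properties so that the
iterator throws a ConsumerTimeoutException if no new message becomes
available within that time period (with the default of -1 it blocks
indefinitely). Catching that exception tells you the topic is drained: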
    import kafka.consumer.ConsumerTimeoutException

    val consumerIterator = initConsumer()
    var done = false
    while (!done) {
      try {
        // Blocks until a new message is available or the timeout is reached
        val messageAndMetadata = consumerIterator.next()
        val message = messageAndMetadata.message
        val offset = messageAndMetadata.offset
        println(new String(message, "UTF-8")) // For debugging purposes
        parseMessage(message, offset)
      } catch {
        // Nothing arrived within consumer.timeout.ms: stop consuming
        case t: ConsumerTimeoutException => done = true
        case e: Throwable =>
          println("unexpected exception: ")
          e.printStackTrace()
      }
    } // End while loop
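
For completeness, here is a rough sketch of how the two settings mentioned
in this thread might be put into the properties for a 0.8 high-level
consumer. The object name, the ZooKeeper address, the group id and the
5000 ms timeout are placeholders I made up for illustration; only the
property keys come from 0.8 itself.

```scala
import java.util.Properties

object ConsumerProps {
  // Builds properties for a 0.8 high-level consumer (hypothetical helper).
  def build(timeoutMs: Int): Properties = {
    val props = new Properties()
    props.setProperty("zookeeper.connect", "localhost:2181") // placeholder address
    props.setProperty("group.id", "drain-example")           // placeholder group id
    // Start from the earliest available offset when the group has no stored offset
    props.setProperty("auto.offset.reset", "smallest")
    // Make the iterator throw ConsumerTimeoutException after this many ms of silence
    props.setProperty("consumer.timeout.ms", timeoutMs.toString)
    props
  }
}

// Assumed usage with the Kafka 0.8 jar on the classpath:
//   val config = new kafka.consumer.ConsumerConfig(ConsumerProps.build(5000))
```

Pick the timeout with some margin: too short a value ends the loop while
producers are still writing, so "drained" here only means "quiet for that
long".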
HTH,
Florin
On Jul 25, 2013, at 10:56 AM, Rob Withers <[email protected]> wrote:
> Oh boy, is my mind slow today. The tamasic cells woke up but the rajasic
> ones stayed asleep, which is rather ironic, if you know what I mean. My
> only hope is the sattvasic few.
>
> The issue of threading is secondary to the blocking api. How can I know the
> traffic is drained from a topic/partition, cleanup, and return the response
> to the REST call?
>
> thanks,
> rob
>
> On Jul 25, 2013, at 11:49 AM, Rob Withers <[email protected]> wrote:
>
>> Thanks, Joe, I also see the answer to my other question, that the
>> KafkaStream is not on a different thread, but I automatically expect it to
>> be since all other uses we have had of the KafkaStream are stuffed in a
>> Runnable. duh.
>>
>> thanks,
>> rob
>>
>> On Jul 25, 2013, at 11:41 AM, Joe Stein <[email protected]> wrote:
>>
>>> in 0.8 you can set the property "auto.offset.reset" = "smallest" when
>>> creating your ConsumerConfig ... this will override the default value of
>>> "largest"
>>>
>>> take a look at ConsoleConsumer.scala for more example if need be
>>>
>>>
>>> /*******************************************
>>>
>>> Joe Stein
>>> Founder, Principal Consultant
>>> Big Data Open Source Security LLC
>>> http://www.stealth.ly <http://www.stealth.ly>
>>> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>>>
>>> ********************************************/
>>>
>>>
>>> On Thu, Jul 25, 2013 at 1:24 PM, James A. Robinson <
>>> [email protected]> wrote:
>>>
>>>> On Thu, Jul 25, 2013 at 9:11 AM, Withers, Robert
>>>> <[email protected]> wrote:
>>>>> We are creating a consumer with properties and I did not see a
>>>>> property that screamed that it was to start at the beginning of a
>>>>> topic. Is there such a property?
>>>>
>>>> In v0.7, set 'autooffset.reset' to 'smallest'.
>>>>
>>>> Jim
>>>>
>>>> - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
>>>> James A. Robinson [email protected]
>>>> HighWire | Stanford University http://highwire.stanford.edu/
>>>> +1 650 7237294 (Work) +1 650 7259335 (Fax)
>>>>
>>
>