On May 9, 2012, at 2:03pm, Jun Rao wrote:
> Are you starting an embedded broker in your unit test?
That's what I'm trying to do, yes.
> If so, you need to
> call shutdown() on the broker.
I think I do - here's the code from the run() method of my KafkaRunnable that I
use to run the Kafka broker in my unit test:
    public void run() {
        LOGGER.info("Starting KafkaRunnable...");
        KafkaServerStartable kafkaServerStartable = new KafkaServerStartable(_serverConfig);
        kafkaServerStartable.startup();
        _alive = true;

        while (!Thread.interrupted()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        LOGGER.info("Stopping KafkaRunnable");
        kafkaServerStartable.shutdown();
        _alive = false;
        LOGGER.info("Exiting KafkaRunnable");
    }
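
A minimal sketch of a variant of this runnable, in case it helps when comparing approaches. It waits on a latch instead of polling with sleep(), and it calls shutdown() with the thread's interrupt flag cleared, restoring the flag afterwards. Whether a still-set interrupt flag affects the broker's shutdown is an assumption that nothing in this thread confirms. Startable and ServerRunnable are hypothetical names: Startable only stands in for the startup() and shutdown() calls on KafkaServerStartable shown above, so that the sketch compiles without Kafka on the classpath.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the startup()/shutdown() pair used above.
interface Startable {
    void startup();
    void shutdown();
}

// Runs a server on its own thread and calls shutdown() once startup() has returned.
final class ServerRunnable implements Runnable {
    private final Startable server;
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    private final CountDownLatch stopped = new CountDownLatch(1);

    ServerRunnable(Startable server) {
        this.server = server;
    }

    @Override
    public void run() {
        try {
            server.startup();
            started.countDown();

            boolean interrupted = false;
            try {
                // Block until the test asks for a stop or interrupts this thread.
                stopRequested.await();
            } catch (InterruptedException e) {
                // await() cleared the flag; remember it so shutdown() runs uninterrupted.
                interrupted = true;
            }

            try {
                server.shutdown();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt(); // restore for the caller
                }
            }
        } finally {
            stopped.countDown();
        }
    }

    // Returns true once startup() has returned, false on timeout.
    boolean awaitStarted(long timeout, TimeUnit unit) throws InterruptedException {
        return started.await(timeout, unit);
    }

    // Asks the runnable to shut the server down without using interrupts.
    void requestStop() {
        stopRequested.countDown();
    }

    // Returns true once run() has finished, false on timeout.
    boolean awaitStopped(long timeout, TimeUnit unit) throws InterruptedException {
        return stopped.await(timeout, unit);
    }
}

final class EmbeddedServerDemo {
    public static void main(String[] args) throws InterruptedException {
        Startable fake = new Startable() {
            public void startup() { System.out.println("server started"); }
            public void shutdown() { System.out.println("server stopped"); }
        };
        ServerRunnable runnable = new ServerRunnable(fake);
        Thread thread = new Thread(runnable, "embedded-server");
        thread.start();
        runnable.awaitStarted(5, TimeUnit.SECONDS);
        runnable.requestStop();
        runnable.awaitStopped(5, TimeUnit.SECONDS);
        thread.join();
    }
}
```

In a test, the setup method would start the thread and wait on awaitStarted(), and the teardown would call requestStop() followed by awaitStopped(), so the test only moves on after shutdown() has returned.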
-- Ken
> On Wed, May 9, 2012 at 12:01 PM, Ken Krugler
> <[email protected]> wrote:
>
>> Hi Jun,
>>
>> On May 8, 2012, at 5:40pm, Jun Rao wrote:
>>
>>> The problem is probably that you didn't shut down the broker cleanly (use
>>> kill -15 instead of kill -9).
>>
>> Thanks - though this is for unit tests. So it needs to be something I can
>> do via standard Java code.
>>
>> Is that possible, or does ZK require the Heavy Hammer to get it to
>> terminate?
>>
>> Thanks,
>>
>> -- Ken
>>
>>> On Tue, May 8, 2012 at 5:34 PM, Ken Krugler <[email protected]> wrote:
>>>
>>>>
>>>> On May 7, 2012, at 11:10am, Jun Rao wrote:
>>>>
>>>>> Ken,
>>>>>
>>>>> Yes, you need to call ConsumerConnector#shutdown to cleanly shut down
>>>>> the consumer.
>>>>
>>>> Thanks for the confirmation.
>>>>
>>>>> Clearing ZK data and kafka log should be enough if you want to
>>>>> start from clean. The ZK NoNode exceptions that you saw can happen when
>>>>> some of the ZK paths are created for the very first time. They should
>>>>> only show up once though.
>>>>
>>>> But if I delete ZK data at the start of my unit test (to avoid getting
>>>> "broker already registered" errors), then the ZK paths are gone, right?
>>>>
>>>> So these exceptions would show up every time my test runs, in that case.
>>>>
>>>> Is there a way to avoid the "broker already registered" error and these
>>>> exceptions?
>>>>
>>>> Thanks,
>>>>
>>>> -- Ken
>>>>
>>>>
>>>>> On Sun, May 6, 2012 at 9:53 AM, Ken Krugler <[email protected]> wrote:
>>>>>
>>>>>> I may have answered my own question…
>>>>>>
>>>>>> Looks like if I call ConsumerConnector#shutdown before interrupting my
>>>>>> consumer Runnable, it works because then
>>>>>> KafkaMessageStream#iterator#hasNext will return false, rather than
>>>>>> blocking.
>>>>>>
>>>>>> I'm still interested in any examples for the right way to set up/tear
>>>>>> down a very temporary Kafka setup for testing.
>>>>>>
>>>>>> For example, I clear out the ZooKeeper data & log dirs before starting
>>>>>> it up, in an attempt to avoid occasional errors with "broker already
>>>>>> registered".
>>>>>>
>>>>>> But that in turn seems to trigger Kafka logging about not finding ZK
>>>>>> nodes for sessions:
>>>>>>
>>>>>> 12/05/06 09:35:51 INFO server.PrepRequestProcessor: Got user-level
>>>>>> KeeperException when processing sessionid:0x1372301d2120001
>>>>>> type:create cxid:0x1 zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
>>>>>> Error Path:/consumers/bixo-storm/ids Error:KeeperErrorCode = NoNode for
>>>>>> /consumers/bixo-storm/ids
>>>>>>
>>>>>> So I assume there's also Kafka state I should be clearing out before
>>>>>> each run, right?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> -- Ken
>>>>>>
>>>>>> On May 6, 2012, at 8:21am, Ken Krugler wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm trying to run Kafka in a minimal local test environment, but
>>>>>>> having issues gracefully shutting down.
>>>>>>>
>>>>>>> I can start up ZooKeeper/Kafka, and it's running fine.
>>>>>>>
>>>>>>> But when I try to shut it all down, I'm having trouble cleanly
>>>>>>> terminating the consumers.
>>>>>>>
>>>>>>> I think the issue is that they're blocking on
>>>>>>> ConsumerIterator.makeNext(), which doesn't seem to be paying attention
>>>>>>> to being interrupted.
>>>>>>>
>>>>>>> So then I proceed with cleaning up everything else, and shutting down
>>>>>>> the Kafka broker.
>>>>>>>
>>>>>>> Which in turn triggers a kafka.consumer.ConsumerTimeoutException from
>>>>>>> the pending hasNext() call in my consumer Runnable.
>>>>>>>
>>>>>>> What's the clean way to set up/tear down a ZooKeeper/Kafka setup
>>>>>>> that's being used indirectly by the test of another project?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> -- Ken
>>>>>>>
>>>>>>> --------------------------
>>>>>>> Ken Krugler
>>>>>>> http://www.scaleunlimited.com
>>>>>>> custom big data solutions & training
>>>>>>> Hadoop, Cascading, Mahout & Solr
--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378
--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr