Hi,

I am using the high-level consumer on 0.8.2-beta. I'm trying to close a
ConsumerConnector (actually a handful of connectors), but I can't get it
to close cleanly. When I call shutdown() on the connector, I see an error
in my application's log. These are always IOExceptions in
kafka.network.Processor: either "Broken pipe" in
FileDispatcherImpl.write0 or "Connection reset by peer" in
FileDispatcherImpl.read0. The shutdown() call itself does not return until
socket.timeout.ms has expired. I've tested this with a range of values for
that setting and confirmed that shutdown() always returns once the timeout
elapses, never before.

I don't know if it matters, but the code that works with each connector
runs on a separate thread via an ExecutorService. Essentially I'm consuming
with one thread per group/topic combination (yes, one thread for all
partitions within the topic).
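
To make the arrangement concrete, here is a minimal, self-contained sketch of
the pattern: one worker thread per connector, submitted to an ExecutorService,
with the shutdown call timed. It is not the real application code.
FakeConnector is a hypothetical stand-in for
kafka.javaapi.consumer.ConsumerConnector (so the snippet runs without Kafka on
the classpath), and all class and method names here are illustrative
assumptions.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    /** Hypothetical stand-in for a ConsumerConnector plus its single stream. */
    static class FakeConnector {
        // An empty Optional on the queue marks "connector has been shut down".
        private final BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();

        void publish(String message) { queue.add(Optional.of(message)); }

        /** Blocks for the next message; returns null once shutdown() was called. */
        String next() throws InterruptedException { return queue.take().orElse(null); }

        void shutdown() { queue.add(Optional.empty()); }
    }

    /** One worker per group/topic: drains the stream until the connector closes. */
    static Future<Integer> startWorker(FakeConnector connector, ExecutorService pool) {
        return pool.submit(() -> {
            int consumed = 0;
            while (connector.next() != null) {
                consumed++;
            }
            return consumed;
        });
    }

    /** Shuts down connector and pool; returns how long that took in milliseconds. */
    static long timedShutdown(FakeConnector connector, ExecutorService pool, long waitMs)
            throws InterruptedException {
        long start = System.nanoTime();
        connector.shutdown();            // unblocks the worker's read loop
        pool.shutdown();                 // accept no new tasks
        if (!pool.awaitTermination(waitMs, TimeUnit.MILLISECONDS)) {
            pool.shutdownNow();          // interrupt a worker that is still blocked
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Publishes three messages, shuts down, and returns the worker's message count. */
    static int demo() {
        try {
            FakeConnector connector = new FakeConnector();
            ExecutorService pool = Executors.newSingleThreadExecutor();
            Future<Integer> result = startWorker(connector, pool);
            connector.publish("a");
            connector.publish("b");
            connector.publish("c");
            long elapsedMs = timedShutdown(connector, pool, 5_000);
            System.out.println("shutdown took " + elapsedMs + " ms");
            return result.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("consumed " + demo() + " messages");
    }
}
```

With the real connector, the elapsed time measured around shutdown() is the
number that tracks socket.timeout.ms in my tests.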

FWIW, everything else seems to work fine - I can connect, set up the
KafkaStream, pull down messages etc. It's just the shutting down that
doesn't seem to be working. The reason I need this to work cleanly is that
my use case requires me to shut down specific connectors and re-create them
later, potentially numerous times during the running of my application. I
could potentially redesign things to keep each connector around after it is
no longer needed, cache it and re-use it later, but this still doesn't
solve the problem of how I eventually shut everything down cleanly.

Thanks,
Shannon
