Filed: https://issues.apache.org/jira/browse/KAFKA-602
On Tue, Nov 6, 2012 at 4:06 PM, Joel Koshy <[email protected]> wrote: > That would be a bug. Can you file a jira? > > Thanks, > > Joel > > > On Tue, Nov 6, 2012 at 1:43 PM, Jason Rosenberg <[email protected]> wrote: > > > Ok, > > > > So one variation on this which does not appear to be working correctly on > > trunk, is if I start up a consumer connector, but then never send any > > messages to it, and then shut it down, it's consumer threads never get > > notified of the shut down. > > > > It seems there's a dependency on initially receiving at least one message > > and processing it, before the topicThreadIdAndQueues object gets > > initialized. > > > > Shall I file this one? > > > > Jason > > > > > > > > On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <[email protected]> > wrote: > > > > > Interestingly, I just checked out the latest sources, and did the > build, > > > and it produced jars for 0.7.0! What's that about? > > > > > > Anyway, it does indeed look like this issue is indeed fixed with the > > > latest trunk. > > > > > > Will there be a released version of this, prior to 0.8? Or will there > be > > > a beta for 0.8 upcoming? > > > > > > Thanks, > > > > > > Jason > > > > > > > > > On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <[email protected]> > > wrote: > > > > > >> Hi Joel, > > >> > > >> I'd be happy to try it, but am a bit concerned about porting any other > > >> 0.8 api changes to get everything working (I'd rather not expend the > > effort > > >> unless there's a stable version I can port to). Or should I just be > > able > > >> to drop latest trunk (or 0.8.x) code in place without any changes? > > >> > > >> Also, is there a ready bundled download beyond 0.7.2, or do I need > > >> download sources and build everything locally? > > >> > > >> Jason > > >> > > >> > > >> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <[email protected]> > wrote: > > >> > > >>> Can you try the latest from trunk? This might be related to > > >>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make > it > > >>> into > > >>> 0.7.2 > > >>> > > >>> Thanks, > > >>> > > >>> Joel > > >>> > > >>> > > >>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <[email protected]> > > >>> wrote: > > >>> > > >>> > (I suspect this a bug, but thought I'd check with the group before > > >>> filing) > > >>> > > > >>> > If I create consumer streams using a topic filter, and request more > > >>> threads > > >>> > than are actually allocated (based on the current dynamic topic > event > > >>> > watcher, etc.), the unused threads don't get the shutdown message, > > >>> when the > > >>> > connector is shut down. > > >>> > > > >>> > Specifically, if I have code that looks like: > > >>> > > > >>> > Whitelist topicRegex = new Whitelist("^metrics\\..*$"); > > >>> > List<KafkaStream<Message>> streams = > > >>> > consumerConnector.createMessageStreamsByFilter(topicRegex, > > >>> > consumerThreads); > > >>> > > > >>> > ExecutorService executor = > > >>> > Executors.newFixedThreadPool(consumerThreads); > > >>> > > > >>> > for (final KafkaStream<Message> stream : streams) { > > >>> > executor.submit(new Runnable() { > > >>> > @Override public void run() { > > >>> > for (MessageAndMetadata<Message> msgAndMetadata : > > >>> stream) { > > >>> > // do some processing > > >>> > } > > >>> > } > > >>> > }; > > >>> > } > > >>> > > > >>> > And the number of consumerThreads is say, 20, and only 1 stream > ends > > up > > >>> > receiving messages, then only that 1 stream's thread gets the > > >>> > ZookeeperConsumerConnector.shutdownCommand, which is how the > > >>> KafkaStream > > >>> > iterator gets notified to exit. > > >>> > > > >>> > Looking at ZookeeperConsumerConnector.scala, it looks like the > > >>> > 'topicThreadIdAndQueues' list does not contain entries for all the > > >>> > threadId's, depending on the current state of rebalancing, and > thus, > > >>> the > > >>> > method, sendShutdownToAllQueues() doesn't actually do what it's > > >>> intended to > > >>> > do. > > >>> > > > >>> > The result, is that it's not possible to cleanly shutdown a > consumer. > > >>> > > > >>> > I am using 0.7.2. > > >>> > > > >>> > Jason > > >>> > > > >>> > > >> > > >> > > > > > >
