The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

Please make sure that neither consumer.shutdown() nor executor.shutdown() is
called while your program is still supposed to be consuming.
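
One way to structure that, as a minimal sketch rather than a drop-in fix: leave
the worker threads running and call the shutdown method only from a JVM
shutdown hook. The class below uses only java.util.concurrent. KeepAliveSketch,
finishedWorkers() and the sleeping worker loop are made-up stand-ins for
ConsumerGroupExample and the it.hasNext() loop; no Kafka calls are involved.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class KeepAliveSketch {
    private final ExecutorService executor;
    // Counts workers that have left their loop (hypothetical helper for inspection).
    private final AtomicInteger finished = new AtomicInteger();

    public KeepAliveSketch(int numThreads) {
        executor = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            executor.execute(() -> {
                try {
                    // Stand-in for `while (it.hasNext())`: runs until told to stop.
                    while (!Thread.currentThread().isInterrupted()) {
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.incrementAndGet();
                }
            });
        }
    }

    /** Stops the workers; returns true if they all exited within 5 seconds. */
    public boolean shutdown() {
        executor.shutdownNow(); // interrupts the sleeping workers
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public int finishedWorkers() {
        return finished.get();
    }

    public static void main(String[] args) {
        KeepAliveSketch example = new KeepAliveSketch(4);
        // No sleep-then-shutdown here: shutdown runs only when the JVM is
        // asked to exit (for example on Ctrl+C).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> example.shutdown()));
    }
}
```

The pool created by Executors.newFixedThreadPool uses non-daemon threads, so
the process keeps running after main returns. In the real example the hook
would call example.shutdown(), which as far as I know should still call
consumer.shutdown() before executor.shutdown() so that the iterators stop.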

On Thu, Apr 30, 2015 at 2:23 AM, christopher palm <cpa...@gmail.com> wrote:

> Commenting out the example.shutdown() call did not seem to make a
> difference; I added the print statement below to highlight that.
>
> The other threads still shut down and only one thread lives on; eventually
> that one dies after a few minutes as well.
>
> Could it be that the producer's default partitioner isn't balancing data
> across all partitions?
>
> Thanks,
> Chris
>
> Thread 0: 2015-04-29
> 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
>
> Last Shutdown via example.shutDown called!
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> ZKConsumerConnector shutting down
>
> 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
> scheduler
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Shutting down
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Stopped
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> -leader-finder-thread], Shutdown completed
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1430330968420] Stopping all fetchers
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-consumergroup], Shutting down
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-], Stopped
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-], Shutdown completed
>
> 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-] All connections stopped
>
> 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
> thread.
>
> Shutting down Thread: 2
>
> Shutting down Thread: 1
>
> Shutting down Thread: 3
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
> [consumergroup], ZKConsumerConnector shut down completed
>
> Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
> distance|-73.990215000000035|40.663669999999911
>
> 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
> [consumergroup], stopping watcher executor thread for consumer
> consumergroup
>
> Thread 0: 2015-04-29
> 12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009
>
> On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xiaotao...@gmail.com> wrote:
>
> > example.shutdown(); in ConsumerGroupExample closes all consumer
> > connections to Kafka. Remove this line and the consumer threads will
> > run forever.
> >
> > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cpa...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I am trying to get a multi-threaded high-level (HL) consumer working
> > > against a 2-broker Kafka cluster with a 4-partition, 2-replica topic.
> > >
> > > The consumer code is set to run with 4 threads, one for each partition.
> > >
> > > The producer code uses the default partitioner and loops indefinitely,
> > > feeding events into the topic. (I excluded the while loop in the paste
> > > below.)
> > >
> > > What I see is that the threads eventually all exit, even though the
> > > producer is still sending events into the topic.
> > >
> > > My understanding is that one consumer thread per partition is the
> > > correct setup.
> > >
> > > Any ideas why this code doesn't continue to consume events as they are
> > > pushed to the topic?
> > >
> > > I suspect I am configuring something wrong here, but am not sure what.
> > >
> > > Thanks,
> > >
> > > Chris
> > >
> > >
> > > *Topic Configuration*
> > >
> > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> > > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> > >
> > > *Producer Code:*
> > >
> > >         Properties props = new Properties();
> > >         props.put("metadata.broker.list", args[0]);
> > >         props.put("zk.connect", args[1]);
> > >         props.put("serializer.class", "kafka.serializer.StringEncoder");
> > >         props.put("request.required.acks", "1");
> > >         String TOPIC = args[2];
> > >         ProducerConfig config = new ProducerConfig(props);
> > >         Producer<String, String> producer = new Producer<String, String>(config);
> > >
> > >         finalEvent = new Timestamp(new Date().getTime()) + "|"
> > >                     + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt)]
> > >                     + "|" + getLatLong(arrayroute17[i]);
> > >
> > >         try {
> > >             KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, finalEvent);
> > >             LOG.info("Sending Message #: " + routeName[0] + ": " + i + ", msg:" + finalEvent);
> > >             producer.send(data);
> > >             Thread.sleep(1000);
> > >         } catch (Exception e) {
> > >             e.printStackTrace();
> > >         }
> > >
> > >
> > > *Consumer Code:*
> > >
> > > public class ConsumerTest implements Runnable {
> > >    private KafkaStream m_stream;
> > >    private int m_threadNumber;
> > >
> > >    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> > >        m_threadNumber = a_threadNumber;
> > >        m_stream = a_stream;
> > >    }
> > >
> > >    public void run() {
> > >        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > >        while (it.hasNext()) {
> > >            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
> > >            try {
> > >                Thread.sleep(1000);
> > >            } catch (InterruptedException e) {
> > >                e.printStackTrace();
> > >            }
> > >        }
> > >        System.out.println("Shutting down Thread: " + m_threadNumber);
> > >    }
> > > }
> > >
> > > public class ConsumerGroupExample {
> > >     private final ConsumerConnector consumer;
> > >     private final String topic;
> > >     private ExecutorService executor;
> > >
> > >     public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
> > >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > >                 createConsumerConfig(a_zookeeper, a_groupId));
> > >         this.topic = a_topic;
> > >     }
> > >
> > >     public void shutdown() {
> > >         if (consumer != null) consumer.shutdown();
> > >         if (executor != null) executor.shutdown();
> > >         try {
> > >             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
> > >                 System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
> > >             }
> > >         } catch (InterruptedException e) {
> > >             System.out.println("Interrupted during shutdown, exiting uncleanly");
> > >         }
> > >     }
> > >
> > >     public void run(int a_numThreads) {
> > >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > >         topicCountMap.put(topic, new Integer(a_numThreads));
> > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > >                 consumer.createMessageStreams(topicCountMap);
> > >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> > >
> > >         executor = Executors.newFixedThreadPool(a_numThreads);
> > >
> > >         int threadNumber = 0;
> > >         for (final KafkaStream stream : streams) {
> > >             executor.submit(new ConsumerTest(stream, threadNumber));
> > >             threadNumber++;
> > >         }
> > >     }
> > >
> > >     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
> > >         Properties props = new Properties();
> > >         props.put("zookeeper.connect", a_zookeeper);
> > >         props.put("group.id", a_groupId);
> > >         props.put("zookeeper.session.timeout.ms", "400");
> > >         props.put("zookeeper.sync.time.ms", "200");
> > >         props.put("auto.commit.interval.ms", "1000");
> > >         props.put("consumer.timeout.ms", "-1");
> > >         return new ConsumerConfig(props);
> > >     }
> > >
> > >     public static void main(String[] args) {
> > >         String zooKeeper = args[0];
> > >         String groupId = args[1];
> > >         String topic = args[2];
> > >         int threads = Integer.parseInt(args[3]);
> > >
> > >         ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
> > >         example.run(threads);
> > >
> > >         try {
> > >             Thread.sleep(10000);
> > >         } catch (InterruptedException ie) {
> > >         }
> > >         example.shutdown();
> > >     }
> > > }
> > >
> >
> >
> >
> > --
> > Regards,
> > Tao
> >
>



-- 
Regards,
Tao
