Hi all,

I checked out the Java source and ran the Java examples. They worked well both
in my IDE and on the console. I then tried the threaded example from the
Consumer Group Example page. That example does not work for me: calling
toString() on the stream iterator returns "empty iterator". Below, the run2()
method is the run method from the source code, and it WORKS. The run() method
below is from the Consumer Group Example and DOES NOT WORK.

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

The threaded example simply prints messages like:

Created iterator empty iterator thread number 9
Created iterator empty iterator thread number 1
Shutting down Thread: 1
Created iterator empty iterator thread number 3

It keeps doing so as I produce messages with the console producer, and it never
prints the messages themselves.




I'm not sure whether this is a versioning issue or something else, but any
help is appreciated!



Here is the Consumer class:

import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;

public class Consumer implements Runnable {

    private KafkaStream<byte[], byte[]> kafkaStream;
    private Integer threadNumber;

    public Consumer(KafkaStream<byte[], byte[]> kafkaStream, Integer threadNumber) {
        this.threadNumber = threadNumber;
        this.kafkaStream = kafkaStream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
        System.out.println("Created iterator " + it.toString()
                + " thread number " + threadNumber);
        while(it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": "
                    + new String(it.next().message()));

            // validate
            // enrich
            // dispatch
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }

}




In my ConsumerThreadPool class:


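import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class ConsumerThreadPool {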

    private final ConsumerConnector consumer;
    private final String topic;

    private ExecutorService executor;
    private static ApplicationContext context =
            new AnnotationConfigApplicationContext(AppConfig.class);

    public ConsumerThreadPool(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                (ConsumerConfig) context.getBean("consumerConfig"));
        this.topic = topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(Integer numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // create threads
        executor = Executors.newFixedThreadPool(numThreads);

        // now create an object to consume the messages
        Integer threadNumber = 0;
        for(KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Consumer(stream, threadNumber));
            threadNumber++;
        }
    }


    public void run2() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while(it.hasNext()){
                System.out.println(new String(it.next().message()));

            }
        }
    }

}



The AppConfig is pretty simple:

@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {

    @Bean
    @Named("sharedProducerConsumerConfig")
    private static Properties sharedProducerConsumerConfig() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "127.0.0.1:2181");
        properties.put("group.id", "intelligence");
        properties.put("zookeeper.session.timeout.ms", "400");
        properties.put("zookeeper.sync.time.ms", "200");
        properties.put("auto.commit.interval.ms", "1000");
        return properties;
    }

    @Bean
    @Named("consumerConfig")
    private static ConsumerConfig consumerConfig() {
        Properties properties = sharedProducerConsumerConfig();
        return new ConsumerConfig(properties);
    }

    @Bean
    @Named("producerConfig")
    private static ProducerConfig producerConfig() {
        Properties properties = sharedProducerConsumerConfig();
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("metadata.broker.list", "localhost:9092");
        return new ProducerConfig(properties);
    }

}

