Hi there,
First let me describe the desired result.
I would like to have multiple threads subsribe to a particular topic under
one group_id.
I know that the following example demonstrates how to do this:
// create 4 partitions of the stream for topic “test”, to allow 4 threads
> to consume
> Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
> List<KafkaMessageStream<Message>> streams =
> topicMessageStreams.get("test");
> // create list of 4 threads to consume from each of the partitions
> ExecutorService executor = Executors.newFixedThreadPool(4);
> // consume the messages in the threads
> for(final KafkaMessageStream<Message> stream: streams) {
> executor.submit(new Runnable() {
> public void run() {
> for(Message message: stream) {
> // process message
> }
> }
> });
BUT I would like to be able to create the connection to the broker from
WITHIN each thread. Is this possible?
For example, each thread does the following.
creates createJavaConsumerConnector(consumerConfig);
> Map<String, Integer> topic_count = new HashMap<String, Integer>();
> topic_count.put(topic, new Integer(1));
> Map<String, List<KafkaMessageStream<Message>>> consumer_map =
> consumerConnector
> .createMessageStreams(topic_count);
>
> KafkaMessageStream<Message> stream =
> consumer_map.get(topic).get(0);
>
> ConsumerIterator<Message> iter = stream.iterator();
>
> while(iter.hasNext())
> {
> System.out.println(KafkaUtils.getString(iter.next()));
> }
Rather than partitioning before the threads are started?
Thank you so much
--
Juan Wellington Moreno
*Software Engineer*