Hi all;
Im publishing to the remote kafka server and try to consume messages from
that remote server. (Kafka v 0.90.1)
Publishing works fine but nor the consuming.

*Publisher*

package org.test;

*import java.io.IOException;*
*import java.util.Properties;*

*import org.apache.kafka.clients.producer.KafkaProducer;*
*import org.apache.kafka.clients.producer.ProducerRecord;*


*public class Producer {*

* private void generateMessgaes() throws IOException {*
* String topic = "MY_TOPIC";*

* Properties props = new Properties();*
* props.put("bootstrap.servers", "kafka.xx.com:9092
<http://kafka.xx.com:9092>");*
* props.put("acks", "all");*
* props.put("retries", 0);*
* props.put("batch.size", 16384);*
* props.put("linger.ms <http://linger.ms>", 1);*
* props.put("buffer.memory", 33554432);*
* props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");*
* props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");*
* props.put("serializer.class",
"org.apache.kafka.common.serialization.StringSerializer");*


* KafkaProducer<String, String> producer = null;*
* try {*
* producer = new KafkaProducer<>(props);*
* for (int i = 0; i < 10; i++) {*
* producer.send(new ProducerRecord<String, String>(topic, "test msg"));*
* System.out.println("producing---");*
* }*

* } catch (Throwable e) {*
* e.printStackTrace();*
* System.out.println("Error in publishing messages to the topic : " +
topic);*

* } finally {*
* producer.close();*
* }*
* }*

* public static void main(String[] args) throws IOException {*
* Producer producer = new Producer();*
* producer.generateMessgaes();*
* System.out.println("$$$$$");*
* }*
*}*


I can see "*producing--- and $$$$ *prints. But when i try to consume, i do
not see "polling " print messages.. It got stuck at poll(timeout).

Any clue? (I have asked this question before, asking again :()

*Consumer*

*package org.test;*


*import java.util.Arrays;*

*import java.util.List;*

*import java.util.Properties;*

*import org.apache.kafka.clients.consumer.ConsumerRecord;*

*import org.apache.kafka.clients.consumer.ConsumerRecords;*

*import org.apache.kafka.clients.consumer.KafkaConsumer;*



*public class Listener {*


* public void start() throws CoreException {*


* String topic = "MY_TOPIC";*


* List<String> topics = Arrays.asList(topic);*


* Properties props = new Properties();*

* props.put("bootstrap.servers", "kafka.xx.com:9092
<http://kafka.xx.com:9092>");*

* props.put("enable.auto.commit", true);*

* props.put("receive.buffer.bytes", 262144);*

* props.put("consumer.timeout.ms <http://consumer.timeout.ms>", 10000);*

* props.put("session.timeout.ms <http://session.timeout.ms>", 7000);*

* props.put("heartbeat.interval.ms <http://heartbeat.interval.ms>", 1000);*

* props.put("auto.offset.reset", "earliest");*

* props.put("group.id <http://group.id>", "test");*

* props.put("fetch.min.bytes", 1);*

* props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");*

* props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");*

* props.put("serializer.class",
"org.apache.kafka.common.serialization.StringDeserializer");*

* KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);*

* consumer.subscribe(topics);*


* try {*

* while (true) {*

* ConsumerRecords<String, String> records = consumer.poll(100);*

* System.out.println("polling msges : " + records.count());*

* for (ConsumerRecord<String, String> record : records) {*

* System.out.println("kafka record : " + record.value());*

* }*

* }*

* } catch (Throwable e) {*

* e.printStackTrace();*

* System.out.println("eror in polling");*

* } finally {*

* consumer.close();*

* }*

* }*


* public static void main(String args[]) throws CoreException {*


* Listener listener = new Listener();*

* listener.start();*


* }*

*}*

Thanks
-- 
-Ratha
http://vvratha.blogspot.com/

Reply via email to