Hi Oleg,

1. The consumer shell (the default tool: ./kafka-console-consumer.sh --zookeeper xxx:2181 --topic yy) consumes my messages. Only my programmed listener is not consuming. I provided the code in the previous thread.
Here is my producer (I believe this works, because the kafka consumer.sh consumes):

import java.io.FileNotFoundException;
import java.util.Properties;

import com.xx.core.impl.KafkaConfigurationLoader;
import com.xx.core.model.base.RawFile;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Class to produce some test messages to the Kafka server
 *
 * @author ratha
 */
public class KafkaMessageProducer {

    private Properties properties;
    private String topic;
    private File file;
    private KafkaProducer<String, File> producer;

    public KafkaMessageProducer(String topic, File file) {
        this.topic = topic;
        this.file = file;
        KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
        try {
            properties = confLoader.loadProducerConfig();
            producer = new KafkaProducer<>(properties);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    public void generateMessgaes() {
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, File>(topic, file));
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Error in publishing messages to the topic : " + topic);
        } finally {
            producer.close();
        }
    }
}

Thanks

On 30 March 2016 at 09:31, Oleg Zhurakousky <ozhurakou...@hortonworks.com> wrote:

> Ratha
>
> "I published messages and started the listener ---> No success".
>
> I would stop right here as you already see an issue. So I am assuming the
> listener you started is console listener and with that I am going to assume
> it works providing message is on the topic. And since nothing happened then
> you probably have issue in producer.
>
> Could you post producer code.
>
> Oleg
>
> > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Hi Oleg;
> >
> > Thanks for the guide..Here is my troubleshooting steps;
> >
> > 1. I published messages and started the listener ---> No success
> > 2. I set the "auto.offset.reset" property to earliest --> No success
> > 3. Set the "consumer.timeout" to 1 --> Nothing happens
> > 4. Thought may be my messages are not published, so started the
> >    consumer shell script (default tool in the kafka distribution), that
> >    consumes my messages well.
> > 5. Changed hasnext() to while (true) --> no change in the behaviour
> >
> > Im really confused..:(
> >
> > Here is the code;
> >
> > *executor*
> >
> > public void start() {
> >     List<String> topics = Arrays.asList(topic);
> >     ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
> >     ListenerThread lThread = new ListenerThread(topics, properties);
> >     executor.submit(lThread);
> >
> >     Runtime.getRuntime().addShutdownHook(new Thread() {
> >         @Override
> >         public void run() {
> >             lThread.shutdown();
> >             executor.shutdown();
> >             try {
> >                 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
> >             } catch (InterruptedException e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >     });
> > }
> >
> > *Thread*
> >
> > public void run() {
> >     try {
> >         consumer.subscribe(topics);
> >         while (true) {
> >             ConsumerRecords<String, File> records = consumer.poll(100);
> >             System.out.println("&&&&&&2222 : "+records.count());
> >             for (ConsumerRecord<String, RawFile> record : records) {
> >                 System.out.println("&&&&&&333");
> >                 FileProcessor processor = new FileProcessor();
> >                 processor.processFile(record.value());
> >
> >                 System.out.println("&&&&&&"+record.value());
> >             }
> >         }
> >     } catch (Throwable e) {
> >         e.printStackTrace();
> >         System.out.println("eror in polling");
> >         // ignore for shutdown
> >     } finally {
> >         consumer.close();
> >     }
> >
> > Do you think any other issues from my end?
> >
> > Thanks
> >
> >
> > On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> > wrote:
> >
> >> Ratha
> >>
> >> It appears you have couple of issues here, so I’ll start with the consumer
> >> first.
> >> If you do a search on this mailing list on “Consumer deadlock” in the
> >> subject you’ll find a thread where similar symptoms were discussed.
> >> Basically the hasNext() method you mentioned is implemented as a blocking
> >> call and while we may all have opinion about that decision and why Iterator
> >> was chosen in the first place, it is what it is. But from what I understand
> >> it simply means that there are no messages to poll from the topic (yes I
> >> know, hasNext()=false seems natural here but. . .). What you can do is set
> >> ‘consumer.timeout.ms’ property to value such as ‘1’. By doing so you are
> >> stating that you are willing to block for no longer then 1 millisecond.
> >> But you also mention that you are sending message to the topic and
> >> therefore have reasonable expectation to poll something from it but yet
> >> you’re blocking. That is strange indeed. What I would suggest is to try one
> >> thing at the time. Use your Java producer, in conjunction with console
> >> consumer. This will help to narrow down the problem (e.g., issue with
> >> producer that may not be actually sending). Then hopefully if you receive
> >> successfully then you know the problem is in the consumer and so on.
> >>
> >> Cheers
> >> Oleg
> >>
> >> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
> >>
> >> Hi all;
> >> I publish a java object and try to consume it..
> >> I have a poll method to consume objects, but it never returns any
> >> objects..My program runs forever.(?)
> >>
> >> *ConsumerThread*
> >>
> >> public void run() {
> >>     try {
> >>         consumer.subscribe(topics);
> >>
> >>         Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
> >>         while (it.hasNext()) {
> >>             ConsumerRecord<String, File> record = it.next();
> >>             FileProcessor processor = new FileProcessor();
> >>             processor.processFile(record.value());
> >>             System.out.println("-----" + ": " + record);
> >>
> >> When i debug it never goes inside the while loop. Why is that?
> >>
> >> I publish object like this;
> >>
> >> producer.send(new ProducerRecord<String, File>(topic, file));
> >>
> >> Thanks
> >> --
> >> -Ratha
> >> http://vvratha.blogspot.com/
> >>
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>

--
-Ratha
http://vvratha.blogspot.com/
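
The two consumer versions quoted in this thread differ in one respect that can be shown without a broker: the first iterates over the result of a single poll call, the second polls repeatedly. The sketch below is only an illustration of that difference. FakeConsumer is a hypothetical stand-in, not the Kafka API, and the assumption that the first poll may come back empty (for example while the consumer is still joining its group) is mine, not something confirmed in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class PollLoopSketch {

    /** Hypothetical stand-in for a consumer; this is NOT the Kafka API. */
    static final class FakeConsumer {
        private final int emptyPolls;        // assumed number of initial polls that return nothing
        private final List<String> pending;  // records waiting to be delivered
        private int calls = 0;

        FakeConsumer(int emptyPolls, List<String> records) {
            this.emptyPolls = emptyPolls;
            this.pending = new ArrayList<>(records);
        }

        /** Returns an empty batch for the first emptyPolls calls, then all pending records once. */
        List<String> poll() {
            calls++;
            if (calls <= emptyPolls || pending.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }
    }

    /** Mirrors the first version in the thread: one poll, then iterate over its result. */
    static int consumeOnce(FakeConsumer consumer) {
        int processed = 0;
        Iterator<String> it = consumer.poll().iterator();
        while (it.hasNext()) {
            it.next();
            processed++;
        }
        return processed;
    }

    /** Mirrors the later version: keep polling (bounded here so the sketch terminates). */
    static int consumeInLoop(FakeConsumer consumer, int maxPolls) {
        int processed = 0;
        for (int i = 0; i < maxPolls; i++) {
            processed += consumer.poll().size();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "c");
        // First poll is empty, so the single-poll version processes nothing.
        System.out.println("single poll: " + consumeOnce(new FakeConsumer(1, records)));      // 0
        // The loop reaches the second poll and sees all three records.
        System.out.println("poll loop:   " + consumeInLoop(new FakeConsumer(1, records), 3)); // 3
    }
}
```

This only shows why a single poll can leave the while loop unentered. It does not explain why the looped version in this thread still printed a record count of zero, so that cause has to be looked for elsewhere.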