Hi Oleg,

   1. The console consumer (the default tool: ./kafka-console-consumer.sh
   --zookeeper xxx:2181 --topic yy) consumes my messages. Only my own
   listener does not consume them. I provided the code in the previous
   thread.
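
For comparison with the console tool, here is a minimal sketch of the settings a listener built on the new Java consumer (subscribe() plus poll()) typically needs. The broker address, group id and the FileDeserializer class name are hypothetical placeholders, not values from this thread. Note that "auto.offset.reset" only applies when the consumer group has no committed offset yet, so it may have no visible effect for a group that has already committed.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds settings for org.apache.kafka.clients.consumer.KafkaConsumer.
    // All argument values passed in by the caller are assumptions for illustration.
    public static Properties consumerProperties(String bootstrapServers,
                                                String groupId,
                                                String valueDeserializerClass) {
        Properties props = new Properties();
        // The new consumer talks to the brokers directly, not to ZooKeeper.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Start from the oldest available message when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Must be able to decode whatever the producer's value serializer wrote.
        props.setProperty("value.deserializer", valueDeserializerClass);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties(
                "localhost:9092",                 // hypothetical broker address
                "file-listener",                  // hypothetical consumer group
                "com.example.FileDeserializer");  // hypothetical custom deserializer
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The value deserializer is the setting most likely to differ from the console tool, which prints the raw message bytes without needing to rebuild an object from them.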


Here is my producer (I believe it works, because kafka-console-consumer.sh
consumes the messages):




import java.io.File;
import java.io.FileNotFoundException;
import java.util.Properties;

import com.xx.core.impl.KafkaConfigurationLoader;
import com.xx.core.model.base.RawFile;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Class to produce some test messages to the Kafka server
 *
 * @author ratha
 *
 */
public class KafkaMessageProducer {
    private Properties properties;
    private String topic;
    private File file;
    private KafkaProducer<String, File> producer;

    public KafkaMessageProducer(String topic, File file) {
        this.topic = topic;
        this.file = file;

        KafkaConfigurationLoader confLoader = new KafkaConfigurationLoader();
        try {
            // The loaded properties must name a value serializer that can handle File.
            properties = confLoader.loadProducerConfig();
            producer = new KafkaProducer<>(properties);
        } catch (FileNotFoundException e) {
            // producer stays null if the configuration file is missing
            e.printStackTrace();
        }
    }

    // Sends the same file ten times, then closes the producer.
    public void generateMessgaes() {
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, File>(topic, file));
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Error in publishing messages to the topic : " + topic);
        } finally {
            // Guard against the case where the constructor failed to create the producer.
            if (producer != null) {
                producer.close();
            }
        }
    }
}
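
One thing that may be worth checking: KafkaProducer<String, File> depends on a "value.serializer" in the producer properties that can turn a File into bytes, and Kafka does not ship one for File. The contents of loadProducerConfig() are not shown, so the following is only a sketch of what such a conversion could look like with plain Java serialization. The class name and the sample path are made up for illustration. It also shows that a serialized File carries only the path, not the file's contents.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class FileBytesSketch {

    // Serializes the File object itself (its path), not the bytes stored on disk.
    public static byte[] toBytes(File file) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(file);
        }
        return buffer.toByteArray();
    }

    // Rebuilds the File object from bytes produced by toBytes().
    public static File fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (File) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical path; the file does not need to exist for this to work.
        File original = new File("data/sample.txt");
        File copy = fromBytes(toBytes(original));
        System.out.println("same path after round trip: " + original.equals(copy));
    }
}
```

A custom serializer and deserializer pair could wrap these two methods. If the consumer runs on another machine, it would receive only a path that may not exist there, so sending the file contents as byte[] might be the better choice.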

Thanks


On 30 March 2016 at 09:31, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
wrote:

> Ratha
>
> "I published messages and started the listener ---> No success".
>
> I would stop right here, as you already see an issue. I am assuming the
> listener you started is the console listener, and I am going to assume
> it works provided a message is on the topic. Since nothing happened, you
> probably have an issue in the producer.
>
> Could you post the producer code?
>
> Oleg
>
> > On Mar 29, 2016, at 6:24 PM, Ratha v <vijayara...@gmail.com> wrote:
> >
> > Hi Oleg;
> >
> > Thanks for the guidance. Here are my troubleshooting steps:
> >
> >
> >   1. I published messages and started the listener --> No success
> >   2. I set the "auto.offset.reset" property to earliest --> No success
> >   3. Set "consumer.timeout" to 1 --> Nothing happens
> >   4. Thought maybe my messages were not being published, so I started the
> >   consumer shell script (the default tool in the Kafka distribution), which
> >   consumes my messages fine.
> >   5. Changed hasNext() to while (true) --> no change in behaviour
> >
> >
> > I'm really confused. :(
> >
> > Here is the code:
> >
> > *executor*
> >
> > public void start() {
> >     List<String> topics = Arrays.asList(topic);
> >     ExecutorService executor = Executors.newFixedThreadPool(CoreConstants.THREAD_SIZE);
> >     ListenerThread lThread = new ListenerThread(topics, properties);
> >     executor.submit(lThread);
> >
> >     Runtime.getRuntime().addShutdownHook(new Thread() {
> >         @Override
> >         public void run() {
> >             lThread.shutdown();
> >             executor.shutdown();
> >             try {
> >                 executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
> >             } catch (InterruptedException e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >     });
> > }
> >
> >
> >
> > *Thread*
> >
> >
> >
> > public void run() {
> >     try {
> >         consumer.subscribe(topics);
> >         while (true) {
> >             ConsumerRecords<String, File> records = consumer.poll(100);
> >             System.out.println("&&&&&&2222 : " + records.count());
> >             // The record type must match ConsumerRecords<String, File> above.
> >             for (ConsumerRecord<String, File> record : records) {
> >                 System.out.println("&&&&&&333");
> >                 FileProcessor processor = new FileProcessor();
> >                 processor.processFile(record.value());
> >                 System.out.println("&&&&&&" + record.value());
> >             }
> >         }
> >     } catch (Throwable e) {
> >         e.printStackTrace();
> >         System.out.println("error in polling");
> >         // ignore for shutdown
> >     } finally {
> >         consumer.close();
> >     }
> > }
> >
> >
> >
> > Do you see any other issues on my end?
> >
> > Thanks
> >
> >
> > On 29 March 2016 at 23:13, Oleg Zhurakousky <ozhurakou...@hortonworks.com>
> > wrote:
> >
> >> Ratha
> >>
> >> It appears you have a couple of issues here, so I’ll start with the
> >> consumer first.
> >> If you search this mailing list for “Consumer deadlock” in the
> >> subject you’ll find a thread where similar symptoms were discussed.
> >> Basically, the hasNext() method you mentioned is implemented as a
> >> blocking call, and while we may all have opinions about that decision
> >> and why Iterator was chosen in the first place, it is what it is. From
> >> what I understand, it simply means that there are no messages to poll
> >> from the topic (yes I know, hasNext()=false seems natural here but...).
> >> What you can do is set the ‘consumer.timeout.ms’ property to a value
> >> such as ‘1’. By doing so you are stating that you are willing to block
> >> for no longer than 1 millisecond.
> >> But you also mention that you are sending messages to the topic and
> >> therefore have a reasonable expectation to poll something from it, yet
> >> you’re blocking. That is strange indeed. What I would suggest is to try
> >> one thing at a time. Use your Java producer in conjunction with the
> >> console consumer. This will help narrow down the problem (e.g., an issue
> >> with a producer that may not actually be sending). Then, if you receive
> >> successfully, you know the problem is in the consumer, and so on.
> >>
> >> Cheers
> >> Oleg
> >>
> >> On Mar 29, 2016, at 12:48 AM, Ratha v <vijayara...@gmail.com> wrote:
> >>
> >> Hi all;
> >> I publish a Java object and try to consume it.
> >> I have a poll method to consume objects, but it never returns any
> >> objects. My program runs forever.
> >>
> >> *ConsumerThread*
> >>
> >> public void run() {
> >>     try {
> >>         consumer.subscribe(topics);
> >>
> >>         Iterator<ConsumerRecord<String, File>> it = consumer.poll(1000).iterator();
> >>         while (it.hasNext()) {
> >>             ConsumerRecord<String, File> record = it.next();
> >>             FileProcessor processor = new FileProcessor();
> >>             processor.processFile(record.value());
> >>             System.out.println("-----" + ": " + record);
> >>
> >>
> >>
> >> When I debug, it never goes inside the while loop. Why is that?
> >>
> >> I publish the object like this:
> >>
> >> producer.send(new ProducerRecord<String, File>(topic, file));
> >>
> >>
> >> Thanks
> >> --
> >> -Ratha
> >> http://vvratha.blogspot.com/
> >>
> >>
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>
>


-- 
-Ratha
http://vvratha.blogspot.com/
