Hi,

I am trying to consume messages from Kafka through a web service. For example, 
when a user calls the web service, the consumers should read all messages from 
the partitions, and the web service should return those Kafka messages to the 
caller.

I used the code below for that purpose, but ConsumerIterator's hasNext() never 
returns false and next() blocks unless an error occurs, so the web service 
method never finishes.

I also tried setting props.put("consumer.timeout.ms", "1000"); so that an 
exception would be thrown, but then the threads were blocked in the following 
method:

/**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.
     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }
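
To make the intended behaviour concrete, here is a minimal sketch of the "read until nothing arrives for a while, then return" pattern. It is only an illustration under stated assumptions: a java.util.concurrent.BlockingQueue stands in for the KafkaStream, and the class name DrainUntilIdle and the method drain are hypothetical, not part of Kafka. With the high-level consumer, the corresponding signal would be the kafka.consumer.ConsumerTimeoutException that the iterator throws once consumer.timeout.ms has elapsed, which the task would need to catch in order to end its loop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Kafka: the queue stands in for a KafkaStream.
class DrainUntilIdle {

    // Reads payloads until none arrives within idleTimeoutMs, then returns
    // everything read so far, one message per line.
    static String drain(BlockingQueue<byte[]> source, long idleTimeoutMs)
            throws InterruptedException {
        List<String> messages = new ArrayList<>();
        while (true) {
            // poll() returns null when the timeout elapses with no element
            byte[] payload = source.poll(idleTimeoutMs, TimeUnit.MILLISECONDS);
            if (payload == null) {
                break; // idle long enough: stop instead of blocking forever
            }
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return String.join("\n", messages);
    }
}
```

A web service method could call DrainUntilIdle.drain(queue, 1000) and return the result. The timeout is what bounds the call, so it trades completeness for a guaranteed return: messages arriving after the idle window are left for the next call.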

Has anybody managed to consume messages on demand, by invoking a function such 
as a web service call?

Thanks.

Code:

package org.gt.kafka.lastvers;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.Callable;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class gtKafkaConsumerTask implements Callable<String> {
      private final KafkaStream<byte[], byte[]> stream;
      private final int threadNumber;

      public gtKafkaConsumerTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
            this.threadNumber = threadNumber;
            this.stream = stream;
      }

      @Override
      public String call() throws Exception {
            String currMessage = "";
            String retMessages = "";

            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // next() blocks until a message arrives (or throws
            // ConsumerTimeoutException when consumer.timeout.ms is set)
            currMessage = new String(it.next().message(), "UTF-8");
            retMessages += currMessage + "\n";

            // currMessage is never null here, so this loop never exits on its own
            while (currMessage != null) {
                  try {
                        currMessage = new String(it.next().message(), "UTF-8");
                        /*System.out.println("Thread " + threadNumber + ": "
                                          + currMessage);*/
                        retMessages += currMessage + "\n";
                  } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                  }
            }
            System.out.println("Shutting down Thread: " + threadNumber);

            // Drop the trailing newline before returning
            return retMessages.substring(0, retMessages.length() - 1);
      }

}



Barış Akgün
Analytical Data Warehouse and Big Data Management
Specialist

